class Drnbench::PublishSubscribe::Runner
Public Class Methods
new(config)
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 24 def initialize(config) @config = config @published_messages = Queue.new end
Public Instance Methods
increase_subscribers()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 51 def increase_subscribers if @subscribers.empty? new_n_subscribers = @config.start_n_subscribers else new_n_subscribers = @subscribers.size end add_subscribers(new_n_subscribers) ensure_subscribers_ready end
n_subscribers()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 29 def n_subscribers @subscribers.size end
run()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 43 def run publishing_times = @config.n_publishings n_will_be_published_messages = @subscribers.size * publishing_times do_feed(publishing_times) receive_messages(n_will_be_published_messages) end
setup()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 33 def setup setup_server setup_subscribers end
teardown()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 38 def teardown teardown_subscribers teardown_server end
Private Instance Methods
add_subscribers(n_subscribers)
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 85 def add_subscribers(n_subscribers) n_subscribers.times do message = @config.new_subscribe_request client = Droonga::Client.new(:protocol => :http, :host => @config.protocol_adapter.host, :port => @config.protocol_adapter.port) client.subscribe(message) do |published_message| @published_messages.push(published_message) end @subscribers << client end end
do_feed(count)
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 110 def do_feed(count) Droonga::Client.open(:tag => @config.engine.tag, :host => @config.engine.host, :port => @config.engine.port) do |feeder| count.times do do_one_feed(feeder) end end end
do_one_feed(feeder)
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 120 def do_one_feed(feeder) message = @config.new_feed feeder.send(message) end
ensure_subscribers_ready()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 98 def ensure_subscribers_ready sleep(1) 2.times do do_feed(1) n_subscribers.times do @published_messages.pop break if @published_messages.empty? end end @published_messages.clear end
receive_messages(count)
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 125 def receive_messages(count) n_published_messages = 0 count.times do # we should implement "timeout" for too slow cases @published_messages.pop n_published_messages += 1 end n_published_messages end
setup_server()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 62 def setup_server @engine = Engine.new(@config.engine) @engine.start @protocol_adapter = ProtocolAdapter.new(@config.protocol_adapter) @protocol_adapter.start end
setup_subscribers()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 75 def setup_subscribers @subscribers = [] end
teardown_server()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 70 def teardown_server @protocol_adapter.stop @engine.stop end
teardown_subscribers()
click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 79 def teardown_subscribers @subscribers.each do |subscriber| subscriber.close end end