class Drnbench::PublishSubscribe::Runner

Public Class Methods

new(config) click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 24
# Builds a runner around the given benchmark configuration.
#
# @param config the configuration object; it is stored as-is and queried
#   later by the other methods of this class.
def initialize(config)
  # Messages delivered to subscribers are collected in this queue.
  @published_messages = Queue.new
  @config = config
end

Public Instance Methods

increase_subscribers() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 51
# Grows the pool of subscribers and then waits until they are ready.
#
# When no subscriber exists yet, @config.start_n_subscribers clients are
# added; otherwise as many clients as currently exist are added again,
# i.e. the pool doubles.
#
# @return whatever ensure_subscribers_ready returns
def increase_subscribers
  batch_size = @subscribers.empty? ? @config.start_n_subscribers : @subscribers.size
  add_subscribers(batch_size)
  ensure_subscribers_ready
end
n_subscribers() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 29
# Reports how many subscriber clients are currently held in @subscribers.
#
# @return [Integer] the current number of subscribers
def n_subscribers
  @subscribers.length
end
run() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 43
# Performs one benchmark round: feeds @config.n_publishings messages and
# then waits for every delivery those feeds are expected to cause (one per
# subscriber per feed).
#
# @return whatever receive_messages returns for the expected delivery count
def run
  feed_count = @config.n_publishings
  # The expectation is computed before feeding, from the current pool size.
  expected_deliveries = @subscribers.size * feed_count

  do_feed(feed_count)
  receive_messages(expected_deliveries)
end
setup() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 33
# Prepares the runner for use: sets up the server side first, then the
# (initially empty) subscriber list.
#
# Returns the value of setup_subscribers.
def setup
  setup_server
  setup_subscribers
end
teardown() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 38
# Shuts everything down: subscribers first, then the server side.
#
# teardown_server is attempted exactly once, even when closing the
# subscribers raises, so a failing subscriber cannot leave the engine and
# the protocol adapter running. The exception from teardown_subscribers
# still propagates to the caller in that case.
#
# @return whatever teardown_server returns (when nothing raised)
def teardown
  server_teardown_attempted = false
  teardown_subscribers
  # Set before the call so a failure inside teardown_server itself does not
  # trigger a second attempt from the ensure clause.
  server_teardown_attempted = true
  teardown_server
ensure
  # Only reached with the flag unset when teardown_subscribers raised.
  teardown_server unless server_teardown_attempted
end

Private Instance Methods

add_subscribers(n_subscribers) click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 85
# Opens additional subscriber clients against the protocol adapter.
#
# For each new client a fresh subscribe request is obtained from
# @config.new_subscribe_request, an HTTP Droonga client is created for the
# host/port in @config.protocol_adapter, and every message delivered to the
# subscription is appended to @published_messages. The client is then
# recorded in @subscribers.
#
# @param n_subscribers [Integer] how many clients to add
# @return the result of Integer#times, i.e. n_subscribers
def add_subscribers(n_subscribers)
  n_subscribers.times do
    request = @config.new_subscribe_request
    subscriber = Droonga::Client.new(protocol: :http,
                                     host: @config.protocol_adapter.host,
                                     port: @config.protocol_adapter.port)
    subscriber.subscribe(request) { |delivered| @published_messages << delivered }
    @subscribers.push(subscriber)
  end
end
do_feed(count) click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 110
# Feeds the given number of messages to the engine through one client
# opened with the tag/host/port taken from @config.engine.
#
# @param count [Integer] how many messages to feed
# @return whatever Droonga::Client.open returns
def do_feed(count)
  Droonga::Client.open(tag: @config.engine.tag,
                       host: @config.engine.host,
                       port: @config.engine.port) do |connection|
    count.times { do_one_feed(connection) }
  end
end
do_one_feed(feeder) click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 120
# Sends a single feed message, freshly obtained from @config.new_feed,
# through the given client.
#
# @param feeder the client to send with; it must respond to #send(message)
# @return whatever feeder.send returns
def do_one_feed(feeder)
  feeder.send(@config.new_feed)
end
ensure_subscribers_ready() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 98
# Warm-up step run after subscribers were added: feeds a couple of probe
# messages, drains what the subscribers receive for them, and leaves
# @published_messages empty.
#
# NOTE(review): presumably this gives new subscriptions time to become
# active before a measured run -- confirm against the callers.
def ensure_subscribers_ready
  # Fixed one-second pause before probing.
  sleep(1)
  2.times do
    do_feed(1)
    # Take at most one delivery per subscriber. Queue#pop blocks while the
    # queue is empty, so at least one delivery is awaited for each probe
    # (when there is at least one subscriber).
    n_subscribers.times do
      @published_messages.pop
      # Stop early once nothing else is queued right now instead of
      # blocking on deliveries that may never arrive.
      break if @published_messages.empty?
    end
  end
  # Discard any probe deliveries that are still queued.
  @published_messages.clear
end
receive_messages(count) click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 125
# Takes the given number of messages off @published_messages, blocking on
# each pop until a message is available.
#
# @param count [Integer] how many messages to wait for
# @return [Integer] the number of messages actually popped
def receive_messages(count)
  count.times.inject(0) do |received, _attempt|
    # TODO: we should implement "timeout" for too slow cases
    @published_messages.pop
    received + 1
  end
end
setup_server() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 62
# Creates and starts the two server-side components, in this order: the
# engine (built from @config.engine), then the protocol adapter (built from
# @config.protocol_adapter). Both are kept in instance variables so that
# teardown_server can stop them later.
def setup_server
  # The instance variable is assigned before #start is called, so the
  # object stays reachable even if starting it raises.
  @engine = Engine.new(@config.engine)
  @engine.start

  @protocol_adapter = ProtocolAdapter.new(@config.protocol_adapter)
  @protocol_adapter.start
end
setup_subscribers() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 75
# Resets the subscriber list to an empty array. Clients are added to it
# later by add_subscribers.
def setup_subscribers
  @subscribers = []
end
teardown_server() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 70
# Stops the server-side components in the reverse of the order in which
# setup_server started them: protocol adapter first, then the engine.
#
# Returns the value of @engine.stop.
def teardown_server
  @protocol_adapter.stop
  @engine.stop
end
teardown_subscribers() click to toggle source
# File lib/drnbench/publish-subscribe/runner.rb, line 79
# Closes every subscriber client held in @subscribers, in insertion order.
# The list itself is left untouched.
#
# @return [Array] @subscribers
def teardown_subscribers
  @subscribers.each(&:close)
end