class Rack::App::Worker::Observer

Public Class Methods

new() click to toggle source
# File lib/rack/app/worker/observer.rb, line 5
# Resets the two flags used for the shutdown handshake; both start as false.
# @shutdown_signal_received is switched on by #stop, and @ready_for_shutdown
# is switched on by #shutdown_signal_received once teardown has been run.
def initialize
  @shutdown_signal_received = @ready_for_shutdown = false
end

Public Instance Methods

start() click to toggle source
# File lib/rack/app/worker/observer.rb, line 10
# Runs the observer loop until #shutdown_signal_received returns a truthy value.
#
# Each round:
# 1. logs the registered worker definition keys at debug level,
# 2. for every definition, fetches its queue via rabbitmq.send_queue and reads
#    the queue status through #check_status,
# 3. creates a consumer when #need_more_worker? says so, and asks one consumer
#    to shut down when #need_less_worker? says so,
# 4. sleeps for #heartbeat_interval before the next round.
#
# The shutdown check happens only at the top of a round, so a stop request is
# noticed after the current round and its sleep have finished.
def start
  logger.info(__method__.to_s)
  loop do
    break if shutdown_signal_received

    registry = Rack::App::Worker::Register
    logger.debug(registry.worker_definitions.keys.inspect)
    registry.worker_definitions.values.each do |definition|
      queue_status = check_status(rabbitmq.send_queue(definition[:name]))

      create_consumer(definition) if need_more_worker?(queue_status)
      signal_shutdown_for_a_consumer(definition) if need_less_worker?(queue_status)
    end

    sleep(heartbeat_interval)
  end
end
stop() click to toggle source
# File lib/rack/app/worker/observer.rb, line 36
# Requests shutdown and blocks until it has been acknowledged.
#
# Raises @shutdown_signal_received, then polls every 0.1 seconds until
# @ready_for_shutdown becomes truthy (it is set by #shutdown_signal_received).
def stop
  @shutdown_signal_received = true
  until @ready_for_shutdown
    sleep(0.1)
  end
end

Protected Instance Methods

check_status(queue) click to toggle source
# File lib/rack/app/worker/observer.rb, line 43
# Reads the status of the given queue.
#
# A Timeout::Error raised by queue.status is retried immediately and without
# any limit; every other exception propagates to the caller.
#
# @param queue an object responding to #status
# @return whatever queue.status returns
def check_status(queue)
  begin
    current_status = queue.status
  rescue Timeout::Error
    retry
  end
  current_status
end
consumers() click to toggle source
# File lib/rack/app/worker/observer.rb, line 86
# Builds a fresh Rack::App::Worker::Consumer for every registered worker
# definition. Nothing is cached: each call constructs new consumer objects.
#
# @return [Array] one consumer per registered definition
def consumers
  definitions = Rack::App::Worker::Register.worker_definitions.values
  definitions.map { |definition| Rack::App::Worker::Consumer.new(definition) }
end
create_consumer(definition) click to toggle source
# File lib/rack/app/worker/observer.rb, line 49
# Logs the request at info level and starts a new consumer for the definition.
#
# @param definition a worker definition; its :name entry is used in the log line
# @return whatever Consumer#start returns
def create_consumer(definition)
  log_line = "#{__method__}(#{definition[:name]})"
  logger.info(log_line)

  consumer = Rack::App::Worker::Consumer.new(definition)
  consumer.start
end
heartbeat_interval() click to toggle source
# File lib/rack/app/worker/observer.rb, line 59
# Looks up the heartbeat interval from Rack::App::Worker::Environment.
# #start passes this value to sleep between polling rounds.
def heartbeat_interval
  environment = Rack::App::Worker::Environment
  environment.heartbeat_interval
end
logger() click to toggle source
# File lib/rack/app/worker/observer.rb, line 96
# Returns the observer's logger, creating a Rack::App::Worker::Logger on
# first use and reusing that same instance afterwards.
def logger
  return @logger if @logger

  @logger = Rack::App::Worker::Logger.new
end
message_count_limit() click to toggle source
# File lib/rack/app/worker/observer.rb, line 63
# Looks up the message count limit from Rack::App::Worker::Environment.
# The scaling predicates compare a queue's :message_count against this value.
def message_count_limit
  environment = Rack::App::Worker::Environment
  environment.message_count_limit
end
need_less_worker?(status) click to toggle source
# File lib/rack/app/worker/observer.rb, line 77
# Tells whether one consumer of the queue should be asked to stop.
#
# True only when the queue's backlog is below #message_count_limit and more
# than one consumer is attached, so the last consumer is never removed here.
#
# @param status a hash-like object with :message_count and :consumer_count
# @return [Boolean]
def need_less_worker?(status)
  backlog_is_small = status[:message_count] < message_count_limit
  backlog_is_small && status[:consumer_count] > 1
end
need_more_worker?(status) click to toggle source
# File lib/rack/app/worker/observer.rb, line 81
# Tells whether another consumer should be started for the queue.
#
# There is demand when the queue has no consumer at all, or when its backlog
# has reached #message_count_limit. Demand is honoured only while the current
# consumer count is strictly below Environment.max_consumer_number: starting
# one more at exactly the maximum would push the pool past the cap.
#
# @param status a hash-like object with :message_count and :consumer_count
# @return [Boolean]
def need_more_worker?(status)
  consumer_count = status[:consumer_count]
  demand = consumer_count == 0 || status[:message_count] >= message_count_limit

  demand && consumer_count < Rack::App::Worker::Environment.max_consumer_number
end
rabbitmq() click to toggle source
# File lib/rack/app/worker/observer.rb, line 92
# Returns the observer's RabbitMQ wrapper, creating a
# Rack::App::Worker::RabbitMQ on first use and reusing it afterwards.
def rabbitmq
  return @rabbitmq if @rabbitmq

  @rabbitmq = Rack::App::Worker::RabbitMQ.new
end
shutdown_signal_received() click to toggle source
# File lib/rack/app/worker/observer.rb, line 67
# Checks for a pending shutdown request and, if there is one, tears down.
#
# When @shutdown_signal_received is set this logs the method name, calls
# stop_all on every consumer returned by #consumers and closes the RabbitMQ
# session. @ready_for_shutdown is then raised from an ensure clause, so it is
# set even when teardown raises; otherwise #stop, which polls that flag,
# would wait forever. A teardown exception still propagates to the caller.
#
# @return [Boolean] true once shutdown has been acknowledged, false otherwise
def shutdown_signal_received
  if @shutdown_signal_received
    begin
      logger.info(__method__.to_s)
      consumers.each { |consumer| consumer.stop_all }
      rabbitmq.session.close
    ensure
      # Release any caller blocked in #stop, whether or not teardown succeeded.
      @ready_for_shutdown = true
    end
  end

  !!@ready_for_shutdown
end
signal_shutdown_for_a_consumer(definition) click to toggle source
# File lib/rack/app/worker/observer.rb, line 54
# Logs the request at info level and calls #stop on a newly built consumer
# for the definition.
#
# @param definition a worker definition; its :name entry is used in the log line
# @return whatever Consumer#stop returns
def signal_shutdown_for_a_consumer(definition)
  log_line = "#{__method__}(#{definition[:name]})"
  logger.info(log_line)

  consumer = Rack::App::Worker::Consumer.new(definition)
  consumer.stop
end