class Rack::App::Worker::Observer
Public Class Methods
new()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 5 def initialize @shutdown_signal_received = false @ready_for_shutdown = false end
Public Instance Methods
start()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 10 def start logger.info(__method__.to_s) loop do break if shutdown_signal_received logger.debug(Rack::App::Worker::Register.worker_definitions.keys.inspect) Rack::App::Worker::Register.worker_definitions.values.each do |definition| queue = rabbitmq.send_queue(definition[:name]) status = check_status(queue) if need_more_worker?(status) create_consumer(definition) end if need_less_worker?(status) signal_shutdown_for_a_consumer(definition) end end sleep(heartbeat_interval) end end
stop()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 36 def stop @shutdown_signal_received = true sleep(0.1) until @ready_for_shutdown end
Protected Instance Methods
check_status(queue)
click to toggle source
# File lib/rack/app/worker/observer.rb, line 43 def check_status(queue) queue.status rescue Timeout::Error retry end
consumers()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 86 def consumers Rack::App::Worker::Register.worker_definitions.values.map do |definition| Rack::App::Worker::Consumer.new(definition) end end
create_consumer(definition)
click to toggle source
# File lib/rack/app/worker/observer.rb, line 49 def create_consumer(definition) logger.info("#{__method__}(#{definition[:name]})") Rack::App::Worker::Consumer.new(definition).start end
heartbeat_interval()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 59 def heartbeat_interval Rack::App::Worker::Environment.heartbeat_interval end
logger()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 96 def logger @logger ||= Rack::App::Worker::Logger.new end
message_count_limit()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 63 def message_count_limit Rack::App::Worker::Environment.message_count_limit end
need_less_worker?(status)
click to toggle source
# File lib/rack/app/worker/observer.rb, line 77 def need_less_worker?(status) (status[:message_count] < message_count_limit) and (status[:consumer_count] > 1) end
need_more_worker?(status)
click to toggle source
# File lib/rack/app/worker/observer.rb, line 81 def need_more_worker?(status) ((status[:consumer_count] == 0) or (status[:message_count] >= message_count_limit)) and status[:consumer_count] <= Rack::App::Worker::Environment.max_consumer_number end
rabbitmq()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 92 def rabbitmq @rabbitmq ||= Rack::App::Worker::RabbitMQ.new end
shutdown_signal_received()
click to toggle source
# File lib/rack/app/worker/observer.rb, line 67 def shutdown_signal_received if @shutdown_signal_received logger.info(__method__.to_s) consumers.each { |c| c.stop_all } rabbitmq.session.close @ready_for_shutdown = true end (!!@ready_for_shutdown) end
signal_shutdown_for_a_consumer(definition)
click to toggle source
# File lib/rack/app/worker/observer.rb, line 54 def signal_shutdown_for_a_consumer(definition) logger.info("#{__method__}(#{definition[:name]})") Rack::App::Worker::Consumer.new(definition).stop end