class Rack::App::Worker::Consumer
Bunny::Consumer
Public Class Methods
new(definition)
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 5 def initialize(definition) @definition = definition @instance = @definition[:class].new @subscriptions = [] @shutdown_requested = false end
Public Instance Methods
start()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 12 def start daemonizer.spawn do |d| d.process_title("rack-app-worker/#{@definition[:name]}/#{d.id}") start_working end end
stop()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 19 def stop daemonizer.send_signal('HUP', 1) end
stop_all()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 23 def stop_all daemonizer.send_signal('HUP') end
Protected Instance Methods
at_shutdown()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 50 def at_shutdown logger.info 'cancel subscriptions' @subscriptions.each { |c| c.cancel } @shutdown_requested = true end
daemonizer()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 56 def daemonizer @daemonizer ||= proc { daemonizer_instance = Rack::App::Worker::Daemonizer.new(@definition[:name]) daemonizer_instance.on_shutdown { at_shutdown } daemonizer_instance.on_halt { at_shutdown } daemonizer_instance }.call end
handle_message(queue, delivery_info, properties, payload)
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 41 def handle_message(queue, delivery_info, properties, payload) method_name = properties[:headers]['method_name'] args = YAML.load(payload) @instance.public_send(method_name, *args) queue.channel.ack(delivery_info.delivery_tag, false) rescue Exception queue.channel.nack(delivery_info.delivery_tag, false, true) end
logger()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 72 def logger @logger ||= Rack::App::Worker::Logger.new end
start_working()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 29 def start_working logger.info "consumer start working for #{@definition[:name]}" rabbit = Rack::App::Worker::RabbitMQ.new subscribe(rabbit.send_queue(@definition[:name])) subscribe(rabbit.create_broadcast_queue(@definition[:name])) wait_for_shutdown end
subscribe(queue)
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 65 def subscribe(queue) logger.info "creating subscription for #{queue.name}" @subscriptions << queue.subscribe(:manual_ack => true) do |delivery_info, properties, payload| handle_message(queue, delivery_info, properties, payload) end end
wait_for_shutdown()
click to toggle source
# File lib/rack/app/worker/consumer.rb, line 37 def wait_for_shutdown sleep(1) until @shutdown_requested end