class Rack::App::Worker::Consumer

Parent: Bunny::Consumer

Public Class Methods

new(definition) click to toggle source
# File lib/rack/app/worker/consumer.rb, line 5
# Sets up the consumer state for a single worker definition.
#
# @param definition [#[]] worker definition; its :class entry is instantiated
#   here with no arguments and kept as the object that will handle messages.
def initialize(definition)
  # No shutdown has been requested and nothing is subscribed yet.
  @shutdown_requested = false
  @subscriptions = []

  @definition = definition
  # One handler instance is created up front and reused afterwards.
  @instance = definition[:class].new
end

Public Instance Methods

start() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 12
# Asks the daemonizer to spawn a worker. Inside the spawn block the process
# title is set to "rack-app-worker/<definition name>/<spawned id>" and the
# consumer begins working.
#
# @return whatever the daemonizer's #spawn returns
def start
  daemonizer.spawn do |child|
    worker_name = @definition[:name]
    child.process_title("rack-app-worker/#{worker_name}/#{child.id}")
    start_working
  end
end
stop() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 19
# Sends a HUP signal through the daemonizer with a second argument of 1.
# NOTE(review): the 1 presumably limits the signal to a single worker
# process -- confirm against Daemonizer#send_signal.
def stop
  signal_name = 'HUP'
  daemonizer.send_signal(signal_name, 1)
end
stop_all() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 23
# Sends a HUP signal through the daemonizer without any further argument.
# NOTE(review): presumably this reaches every worker process of this
# definition -- confirm against Daemonizer#send_signal.
def stop_all
  signal_name = 'HUP'
  daemonizer.send_signal(signal_name)
end

Protected Instance Methods

at_shutdown() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 50
# Shutdown hook: logs, cancels every recorded subscription, then raises the
# flag that wait_for_shutdown polls.
def at_shutdown
  logger.info('cancel subscriptions')
  @subscriptions.each(&:cancel)
  # Set last so the waiting loop only exits after the cancellations ran.
  @shutdown_requested = true
end
daemonizer() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 56
# Memoized Daemonizer for this definition's name. On first use it is created
# and at_shutdown is registered for both its shutdown and halt callbacks.
#
# @return [Rack::App::Worker::Daemonizer]
def daemonizer
  return @daemonizer if @daemonizer

  fresh = Rack::App::Worker::Daemonizer.new(@definition[:name])
  fresh.on_shutdown { at_shutdown }
  fresh.on_halt { at_shutdown }
  @daemonizer = fresh
end
handle_message(queue, delivery_info, properties, payload) click to toggle source
# File lib/rack/app/worker/consumer.rb, line 41
# Dispatches one delivered message to the worker instance and settles it on
# the queue's channel.
#
# The method to call is read from the 'method_name' message header and its
# arguments are the YAML-decoded payload. On success the delivery is acked;
# if anything in that sequence raises a StandardError the failure is logged
# and the delivery is nacked with the requeue flag set.
#
# Only StandardError is rescued: SignalException, SystemExit, NoMemoryError
# and similar must propagate so the process can still be interrupted or
# terminated while a message is being handled.
#
# @param queue         queue the message arrived on (must respond to #channel)
# @param delivery_info delivery metadata (must respond to #delivery_tag)
# @param properties    message properties; properties[:headers]['method_name'] is read
# @param payload       [String] YAML document holding the argument list
def handle_message(queue, delivery_info, properties, payload)
  method_name = properties[:headers]['method_name']
  # NOTE(review): YAML.load may instantiate arbitrary objects on older Psych
  # versions. If the queue can receive payloads from untrusted producers,
  # switch to YAML.safe_load with an explicit class allow-list.
  args = YAML.load(payload)
  @instance.public_send(method_name, *args)
  # Second argument false: settle only this delivery tag.
  queue.channel.ack(delivery_info.delivery_tag, false)
rescue StandardError => e
  logger.info "message handling failed, requeueing: #{e.class}: #{e.message}"
  # false: only this delivery tag; true: ask the broker to requeue it.
  queue.channel.nack(delivery_info.delivery_tag, false, true)
end
logger() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 72
# Lazily created, memoized logger used by this consumer.
#
# @return [Rack::App::Worker::Logger]
def logger
  @logger = Rack::App::Worker::Logger.new unless @logger
  @logger
end
start_working() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 29
# Main body of a worker: logs the start, subscribes to the queue returned by
# RabbitMQ#send_queue and then to the one returned by
# RabbitMQ#create_broadcast_queue (both keyed by the definition name), and
# finally blocks in wait_for_shutdown.
def start_working
  logger.info "consumer start working for #{@definition[:name]}"
  rabbit = Rack::App::Worker::RabbitMQ.new

  direct_queue = rabbit.send_queue(@definition[:name])
  subscribe(direct_queue)

  broadcast_queue = rabbit.create_broadcast_queue(@definition[:name])
  subscribe(broadcast_queue)

  wait_for_shutdown
end
subscribe(queue) click to toggle source
# File lib/rack/app/worker/consumer.rb, line 65
# Subscribes to +queue+ with manual acknowledgements and records whatever
# the queue's #subscribe returns so at_shutdown can cancel it later.
# Every delivery is forwarded to handle_message together with the queue.
#
# @param queue queue object responding to #name and #subscribe
# @return [Array] the list of recorded subscriptions
def subscribe(queue)
  logger.info "creating subscription for #{queue.name}"
  consumer = queue.subscribe(manual_ack: true) { |delivery_info, properties, payload|
    handle_message(queue, delivery_info, properties, payload)
  }
  @subscriptions.push(consumer)
end
wait_for_shutdown() click to toggle source
# File lib/rack/app/worker/consumer.rb, line 37
# Blocks the caller, sleeping in one-second steps, until at_shutdown has set
# the shutdown flag.
#
# @return [nil]
def wait_for_shutdown
  loop do
    break if @shutdown_requested

    sleep(1)
  end
end