class PikaQue::Subscriber

Attributes

broker[RW]
handler[RW]
pool[RW]
queue[RW]

Public Class Methods

new(opts = {}) click to toggle source
# File lib/pika_que/subscriber.rb, line 13
def initialize(opts = {})
  @opts = PikaQue.config.merge(opts)
  @codec = PikaQue::Util.constantize(@opts[:codec])
  @broker = @opts[:broker] || PikaQue::Broker.new(nil, @opts).tap{ |b| b.start }
  @pool = @opts[:worker_pool] || Concurrent::FixedThreadPool.new(@opts[:concurrency] || 1)
end

Public Instance Methods

handle_message(worker, delivery_info, metadata, msg) click to toggle source
# File lib/pika_que/subscriber.rb, line 40
def handle_message(worker, delivery_info, metadata, msg)
  res = nil
  error = nil
  begin
    decoded_msg = @codec.decode(msg)
    metrics.measure("work.#{worker.class.name}.time") do
      PikaQue.middleware.invoke(worker, delivery_info, metadata, decoded_msg) do
        res = worker.work(delivery_info, metadata, decoded_msg)
      end
    end
    logger.debug "done processing #{res} <#{msg}>"
  rescue => worker_err
    res = :error
    error = worker_err
    notify_reporters(worker_err, worker.class, msg)
  end

  if @opts[:ack]
    begin
      handler.handle(res, broker.channel, delivery_info, metadata, msg, error)
      metrics.increment("work.#{worker.class.name}.handled.#{res}") 
    rescue => handler_err
      notify_reporters(handler_err, handler.class, msg)
      metrics.increment("work.#{worker.class.name}.handler.error") 
    end
  else
    metrics.increment("work.#{worker.class.name}.handled.noop") 
  end
  metrics.increment("work.#{worker.class.name}.processed")
end
setup_handler(handler_class, handler_opts) click to toggle source
# File lib/pika_que/subscriber.rb, line 24
def setup_handler(handler_class, handler_opts)
  @handler = broker.handler(handler_class, @opts[:handler_options].merge(handler_opts || {}))
  # TODO use routing keys?
  logger.info "binding queue #{@queue.name} to handler #{@handler.class}"
  @handler.bind_queue(@queue, @queue.name)
end
setup_queue(queue_name, queue_opts) click to toggle source
# File lib/pika_que/subscriber.rb, line 20
def setup_queue(queue_name, queue_opts)
  @queue = broker.queue(queue_name, @opts[:queue_options].merge(queue_opts))
end
subscribe(worker) click to toggle source
# File lib/pika_que/subscriber.rb, line 31
def subscribe(worker)
  @consumer = queue.subscribe(:block => false, :manual_ack => @opts[:ack], :arguments => worker.consumer_arguments) do | delivery_info, metadata, msg |
    # TODO make idletime configurable on thread pool? default is 60.
    pool.post do
      handle_message(worker, delivery_info, metadata, msg)
    end
  end
end
teardown() click to toggle source
# File lib/pika_que/subscriber.rb, line 76
def teardown
  unless @opts[:worker_pool]
    @pool.shutdown
    @pool.wait_for_termination 12
  end
  broker.cleanup
  broker.stop
end
unsubscribe() click to toggle source
# File lib/pika_que/subscriber.rb, line 71
def unsubscribe
  @consumer.cancel if @consumer
  @consumer = nil
end