class Sneakers::Queue

Attributes

channel[R]
exchange[R]
name[R]
opts[R]

Public Class Methods

new(name, opts) click to toggle source
# File lib/sneakers/queue.rb, line 5
def initialize(name, opts)
  @name = name
  @opts = opts
  @handler_klass = Sneakers::CONFIG[:handler]
end

Public Instance Methods

subscribe(worker) click to toggle source

:exchange :heartbeat_interval :prefetch :durable :ack

# File lib/sneakers/queue.rb, line 18
def subscribe(worker)
  # If we've already got a bunny object, use it.  This allows people to
  # specify all kinds of options we don't need to know about (e.g. for ssl).
  @bunny = @opts[:connection]
  @bunny ||= create_bunny_connection
  @bunny.start

  @channel = @bunny.create_channel
  @channel.prefetch(@opts[:prefetch])

  exchange_name = @opts[:exchange]
  @exchange = @channel.exchange(exchange_name, @opts[:exchange_options])

  routing_key = @opts[:routing_key] || @name
  routing_keys = [*routing_key]

  handler_klass = worker.opts[:handler] || Sneakers::CONFIG.fetch(:handler)
  # Configure options if needed
  if handler_klass.respond_to?(:configure_queue)
    @opts[:queue_options] = handler_klass.configure_queue(@name, @opts[:queue_options])
  end

  queue = @channel.queue(@name, @opts[:queue_options])

  if exchange_name.length > 0
    routing_keys.each do |key|
      if @opts[:bind_arguments]
        queue.bind(@exchange, routing_key: key, arguments: @opts[:bind_arguments])
      else
        queue.bind(@exchange, routing_key: key)
      end
    end
  end

  # NOTE: we are using the worker's options. This is necessary so the handler
  # has the same configuration as the worker. Also pass along the exchange and
  # queue in case the handler requires access to them (for things like binding
  # retry queues, etc).
  handler = handler_klass.new(@channel, queue, worker.opts)

  @consumer = queue.subscribe(block: false, manual_ack: @opts[:ack]) do | delivery_info, metadata, msg |
    worker.do_work(delivery_info, metadata, msg, handler)
  end
  nil
end
unsubscribe() click to toggle source
# File lib/sneakers/queue.rb, line 64
def unsubscribe
  return unless @consumer

  # TODO: should we simply close the channel here?
  Sneakers.logger.info("Queue: will try to cancel consumer #{@consumer.inspect}")
  cancel_ok = @consumer.cancel
  if cancel_ok
    Sneakers.logger.info "Queue: consumer #{cancel_ok.consumer_tag} cancelled"
    @consumer = nil
  else
    Sneakers.logger.warn "Queue: could not cancel consumer #{@consumer.inspect}"
    sleep(1)
    unsubscribe
  end
end

Private Instance Methods

create_bunny_connection() click to toggle source
# File lib/sneakers/queue.rb, line 80
def create_bunny_connection
  Bunny.new(@opts[:amqp], vhost: @opts[:vhost],
                          heartbeat: @opts[:heartbeat],
                          properties: @opts.fetch(:properties, {}),
                          logger: Sneakers::logger)
end