class PikaQue::Broker

Public Class Methods

new(processor = nil, opts = {}) click to toggle source
# File lib/pika_que/broker.rb, line 7
def initialize(processor = nil, opts = {})
  @opts = PikaQue.config.merge(opts)
  @processor = processor
  @handlers = {}
end

Public Instance Methods

channel() click to toggle source
# File lib/pika_que/broker.rb, line 64
def channel
  @channel ||= init_channel
end
cleanup(force = false) click to toggle source
# File lib/pika_que/broker.rb, line 74
def cleanup(force = false)
  if (@processor && force) || !@processor
    @channel.close unless @channel.closed?
    @channel = nil
    @exchange = nil
    if @default_handler
      @default_handler.close
      @default_handler = nil
    end
    @handlers.values.each(&:close)
  end
end
default_handler() click to toggle source
# File lib/pika_que/broker.rb, line 56
def default_handler
  @default_handler ||= @opts[:handler_class] ? PikaQue::Util.constantize(@opts[:handler_class]).new(@opts[:handler_options].merge({ connection: @connection })) : PikaQue::Handlers::DefaultHandler.new
end
exchange() click to toggle source
# File lib/pika_que/broker.rb, line 60
def exchange
  @exchange ||= channel.exchange(@opts[:exchange], @opts[:exchange_options])
end
handler(handler_class, handler_opts = {}) click to toggle source
# File lib/pika_que/broker.rb, line 42
def handler(handler_class, handler_opts = {})
  if handler_class
    h_key = "#{handler_class}-#{handler_opts.hash}"
    _handler = @handlers[h_key]
    unless _handler
      _handler = handler_class.new(handler_opts.merge({ connection: @connection }))
      @handlers[h_key] = _handler
    end
    _handler
  else
    default_handler
  end
end
init_channel() click to toggle source
# File lib/pika_que/broker.rb, line 68
def init_channel
  @connection.create_channel(nil, @opts[:channel_options][:consumer_pool_size]).tap do |ch|
    ch.prefetch(@opts[:channel_options][:prefetch])
  end
end
local_connection?() click to toggle source
# File lib/pika_que/broker.rb, line 22
def local_connection?
  @opts[:connection_options] || @processor.nil?
end
queue(queue_name, queue_opts = {}) click to toggle source
# File lib/pika_que/broker.rb, line 26
def queue(queue_name, queue_opts = {})
  begin
    queue = channel.queue(queue_name, queue_opts)
    routing_key = queue_opts[:routing_key] || queue_name
    routing_keys = [routing_key, *queue_opts[:routing_keys]]

    routing_keys.each do |key|
      queue.bind(exchange, routing_key: key)
    end
    queue
  rescue => e
    PikaQue.logger.fatal e.message
    raise SetupError.new e.message
  end
end
start() click to toggle source
# File lib/pika_que/broker.rb, line 13
def start
  @connection ||= @opts[:connection_options] ? PikaQue::Connection.create(@opts[:connection_options]) : PikaQue.connection
  @connection.ensure_connection
end
stop() click to toggle source
# File lib/pika_que/broker.rb, line 18
def stop
  @connection.disconnect! if local_connection?
end