class PikaQue::Handlers::ErrorHandler
Constants
- DEFAULT_ERROR_OPTS
Public Class Methods
new(opts = {})
click to toggle source
# File lib/pika_que/handlers/error_handler.rb, line 9 def initialize(opts = {}) @opts = PikaQue.config.merge(DEFAULT_ERROR_OPTS).merge(opts) @connection = @opts[:connection] || PikaQue.connection @channel = @connection.create_channel error_ex_name = error_q_name = "#{@opts[:error_prefix]}-error" if @opts[:queue] # handle deprecated options error_ex_name = @opts[:exchange] error_q_name = @opts[:queue] end @exchange = @channel.exchange(error_ex_name, type: :topic, durable: exchange_durable?) @queue = @channel.queue(error_q_name, durable: queue_durable?) @queue.bind(@exchange, routing_key: '#') @monitor = Monitor.new end
Public Instance Methods
bind_queue(queue, routing_key)
click to toggle source
# File lib/pika_que/handlers/error_handler.rb, line 25 def bind_queue(queue, routing_key) end
close()
click to toggle source
# File lib/pika_que/handlers/error_handler.rb, line 46 def close @channel.close unless @channel.closed? end
handle(response_code, channel, delivery_info, metadata, msg, error = nil)
click to toggle source
# File lib/pika_que/handlers/error_handler.rb, line 28 def handle(response_code, channel, delivery_info, metadata, msg, error = nil) case response_code when :ack PikaQue.logger.debug "ErrorHandler acknowledge <#{msg}>" channel.acknowledge(delivery_info.delivery_tag, false) when :reject PikaQue.logger.debug "ErrorHandler reject <#{msg}>" channel.reject(delivery_info.delivery_tag, false) when :requeue PikaQue.logger.debug "ErrorHandler requeue <#{msg}>" channel.reject(delivery_info.delivery_tag, true) else PikaQue.logger.debug "ErrorHandler publishing <#{msg}> to [#{@queue.name}]" publish(delivery_info, msg) channel.acknowledge(delivery_info.delivery_tag, false) end end
Private Instance Methods
exchange_durable?()
click to toggle source
# File lib/pika_que/handlers/error_handler.rb, line 56 def exchange_durable? @opts.fetch(:exchange_options, {}).fetch(:durable, false) end
publish(delivery_info, msg)
click to toggle source
# File lib/pika_que/handlers/error_handler.rb, line 60 def publish(delivery_info, msg) @monitor.synchronize do @exchange.publish(msg, routing_key: delivery_info.routing_key) end end
queue_durable?()
click to toggle source
# File lib/pika_que/handlers/error_handler.rb, line 52 def queue_durable? @opts.fetch(:queue_options, {}).fetch(:durable, false) end