class Banter::Server::RabbitMQSubscriber
Attributes
channel[R]
exchange[R]
listener[R]
pool_size[R]
routing_key[R]
topic[R]
ttl[R]
Public Class Methods
new(config)
click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 15
# Stores the subscriber settings read from +config+ and derives the routing
# key used when the queue is bound to the exchange.
#
# Routing key derivation:
# * blank topic                      -> "#"
# * topic plus a routing key         -> "<topic>.<routing_key>.#"
# * topic only                       -> "<topic>.#"
#
# @param config [#topic, #exchange, #dead_letter_exchange, #routing_key,
#   #queue_name, #ttl, #durable, #pool_size] subscriber settings
def initialize(config)
  # Exchange-side settings.
  @exchange_name             = config.exchange
  @dead_letter_exchange_name = config.dead_letter_exchange
  @durable                   = config.durable

  # Queue-side settings; a missing pool size falls back to 0.
  @queue_name = config.queue_name
  @ttl        = config.ttl
  @pool_size  = config.pool_size || 0

  # Inputs of the routing key.
  @topic       = config.topic
  @initial_key = config.routing_key

  @routing_key =
    if @topic.blank?
      "#"
    elsif @initial_key.present?
      "#{@topic}.#{@initial_key}.#"
    else
      # Reached only when the topic is present (it is not blank).
      "#{@topic}.#"
    end
end
Public Instance Methods
environment()
click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 110
# Name of the runtime environment passed along with error notifications.
#
# RAILS_ENV takes precedence over RACK_ENV. A variable that is unset, empty
# or whitespace-only is skipped, so an exported-but-empty variable cannot
# produce "" as the environment name.
#
# @return [String] the environment name; 'development' when neither
#   variable carries a value
def environment
  %w[RAILS_ENV RACK_ENV].each do |key|
    value = ENV[key]
    return value unless value.nil? || value.strip.empty?
  end
  'development'
end
notify_error(error, consumer)
click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 95
# Reports an error raised while handling a message.
#
# Writes two WARN entries (the error message together with the consumer,
# then the call stack of this method) and forwards the error to the
# notifier along with the current environment name.
#
# @param error [Exception] the error to report
# @param consumer [Object] the consumer associated with the error
def notify_error(error, consumer)
  summary = "Error with the message: #{error.message}: #{consumer}"
  Banter::RabbitLogger.log(Logger::WARN, summary)

  # Evaluated in the method body so the recorded stack is this method's caller.
  call_stack = caller.inspect
  Banter::RabbitLogger.log(Logger::WARN, call_stack)

  details = { consumer: consumer, error: error.message }
  ::Banter::Notifier.notify(error, parameters: details, environment_name: environment)
end
process_message(delivery_info, properties, contents)
click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 64
# Handles a single delivery: parses the payload, hands it to the callback
# registered through #start, and acknowledges it.
#
# A redelivered message additionally triggers a notification before it is
# processed. Any StandardError raised while parsing, processing or
# acknowledging is logged, the message is rejected without requeue, and the
# error is sent to the notifier.
#
# @param delivery_info [#delivery_tag, #[]] delivery metadata; read for
#   :routing_key and :redelivered
# @param properties [Object] message properties, passed through untouched
# @param contents [String] raw message payload
def process_message(delivery_info, properties, contents)
  token = delivery_info.delivery_tag
  begin
    envelope = ::Banter::Message.new.parse(contents)
    Banter::RabbitLogger.log(Logger::DEBUG, "Message delivery with contents: #{token}: #{delivery_info[:routing_key]}, #{contents}, #{contents.encoding}")

    if delivery_info[:redelivered]
      redelivery_error = StandardError.new("PubSub Message redelivery")
      ::Banter::Notifier.notify(
        redelivery_error,
        parameters: { info: delivery_info, props: properties, contents: contents },
        backtrace: caller
      )
    end

    Banter::RabbitLogger.log_receive(delivery_info[:routing_key], envelope)
    @callback_block.call(delivery_info, properties, envelope)
    Banter::RabbitLogger.log_complete(delivery_info[:routing_key], envelope)

    # Need to acknowledge the message for the next message to come down.
    Banter::RabbitLogger.log(Logger::DEBUG, "Message acknowledged with tag #{token}")
    @channel.ack(token)
  rescue => e
    Banter::RabbitLogger.log(Logger::WARN, "Error in message: #{e}")
    e.backtrace.each { |line| Banter::RabbitLogger.log(Logger::WARN, "-- #{line}") }
    Banter::RabbitLogger.log(Logger::WARN, "contents: #{contents}")
    # envelope is nil here when parsing itself failed.
    Banter::RabbitLogger.log_subscriber_failed(delivery_info[:routing_key], delivery_info, properties, envelope, contents)

    begin
      # Does not get put back on the queue, and instead, will need to be
      # processed either by the log parser later or by dead letter exchange.
      @channel.reject(token, false)
    ensure
      # Runs even when the reject itself raises, so the original failure is
      # always reported; a reject error still propagates to the caller.
      ::Banter::Notifier.notify(
        e,
        parameters: { delivery_info: delivery_info, properties: properties, contents: contents, error: e.message },
        environment_name: environment
      )
    end
  end
end
start(&block)
click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 33
# Connects to RabbitMQ, declares the topic exchange and the queue, binds
# them with the routing key computed in the constructor, and subscribes
# #process_message as the consumer with manual acknowledgement.
#
# A connection failure is logged, sent to the notifier and re-raised.
#
# @yield [delivery_info, properties, envelope] invoked by #process_message
#   for every parsed message
# @return [nil]
# @raise [StandardError] whatever the connection attempt raised
def start(&block)
  @connection = Bunny.new(Configuration.connection)
  begin
    @connection.start
  rescue => e
    Banter::RabbitLogger.log(Logger::ERROR, "Cannot connect to rabbitmq")
    ::Banter::Notifier.notify(e, parameters: { message: e.message, what_happened: "RabbitMQ unreachable!" }, environment_name: environment)
    raise
  end

  @channel = @connection.create_channel
  @exchange = @channel.topic(@exchange_name, :durable => @durable, :auto_delete => false)

  queue_arguments = {}
  queue_arguments["x-dead-letter-exchange"] = @dead_letter_exchange_name if @dead_letter_exchange_name.present?
  # A nil ttl (not configured) means "no expiry", same as a non-positive one.
  # NOTE(review): x-message-ttl is in milliseconds, so ttl is presumably
  # configured in seconds - confirm against the configuration docs.
  queue_arguments["x-message-ttl"] = ttl * 1000 if ttl && ttl > 0

  # A pool size of 0 leaves the channel's prefetch setting untouched.
  @channel.basic_qos(@pool_size) if @pool_size != 0
  @channel.on_uncaught_exception(&method(:notify_error))

  rabbit_queue = @channel.queue(@queue_name, durable: @durable, exclusive: false, arguments: queue_arguments)
  @listener = rabbit_queue.bind(@exchange, routing_key: @routing_key, exclusive: false)

  # Stored before subscribing because #process_message calls it for each delivery.
  @callback_block = block
  @consumer = @listener.subscribe({ consumer_tag: @queue_name, manual_ack: true, block: false }, &method(:process_message))
  nil
end
teardown()
click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 101
# Cancels the consumer and closes the connection, when they exist.
#
# Each step is guarded on its own so that a failure while cancelling the
# consumer cannot prevent the connection from being closed. Errors are sent
# to the notifier and never raised to the caller.
def teardown
  begin
    @consumer.cancel if @consumer.present?
  rescue => e
    ::Banter::Notifier.notify(e, parameters: { error: e.message }, environment_name: environment)
  end

  begin
    @connection.close if @connection.present?
  rescue => e
    ::Banter::Notifier.notify(e, parameters: { error: e.message }, environment_name: environment)
  end
end