class Banter::Server::RabbitMQSubscriber

Attributes

channel[R]
exchange[R]
listener[R]
pool_size[R]
routing_key[R]
topic[R]
ttl[R]

Public Class Methods

new(config) click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 15
# Copies the subscriber settings out of +config+ and derives the routing key
# used when the queue is bound.
#
# +config+ must respond to: topic, exchange, dead_letter_exchange,
# routing_key, queue_name, ttl, durable and pool_size.
#
# Routing key selection:
# * blank topic                 -> "#"
# * topic and a routing key     -> "<topic>.<routing_key>.#"
# * topic without a routing key -> "<topic>.#"
#
# NOTE(review): +blank?+ / +present?+ are presumably the ActiveSupport
# extensions; they are not defined anywhere in the visible source.
def initialize(config)
  @exchange_name             = config.exchange
  @dead_letter_exchange_name = config.dead_letter_exchange
  @queue_name                = config.queue_name
  @durable                   = config.durable
  @ttl                       = config.ttl
  # A missing pool size is stored as 0.
  @pool_size                 = config.pool_size || 0
  @topic                     = config.topic
  @initial_key               = config.routing_key

  @routing_key =
    if @topic.blank?
      "#"
    elsif @initial_key.present?
      "#{@topic}.#{@initial_key}.#"
    else
      "#{@topic}.#"
    end
end

Public Instance Methods

environment() click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 110
# Name of the runtime environment reported alongside notifications.
#
# Returns the value of RAILS_ENV when that variable is set, otherwise the
# value of RACK_ENV, otherwise the literal 'development'.
def environment
  ENV.fetch('RAILS_ENV') { ENV.fetch('RACK_ENV', 'development') }
end
notify_error(error, consumer) click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 95
# Reports an error raised while a consumer was handling a message.
# +start+ registers this method as the channel's uncaught-exception handler.
#
# Logs two WARN lines (the error message with the consumer, then the call
# stack of this method) and forwards the error to Banter::Notifier.
#
# error    - the exception; only +message+ is read from it here
# consumer - the consumer involved; interpolated into the log line and
#            passed along in the notification parameters
def notify_error(error, consumer)
  summary = "Error with the message: #{error.message}: #{consumer}"
  Banter::RabbitLogger.log(Logger::WARN, summary)
  # This is the stack that led into notify_error, not the error's backtrace.
  Banter::RabbitLogger.log(Logger::WARN, caller.inspect)

  report = { consumer: consumer, error: error.message }
  ::Banter::Notifier.notify(error, parameters: report, environment_name: environment)
end
process_message(delivery_info, properties, contents) click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 64
# Handles a single delivery: parses the payload, hands it to the callback
# given to +start+, and acknowledges it. If anything in that sequence raises,
# the failure is logged, the message is rejected without requeueing, and the
# error is sent to Banter::Notifier.
#
# delivery_info - delivery metadata; +delivery_tag+, +[:routing_key]+ and
#                 +[:redelivered]+ are read from it
# properties    - message properties, passed through to the callback and to
#                 the failure reports
# contents      - raw payload; +encoding+ is called on it, so presumably a String
def process_message(delivery_info, properties, contents)
  tag = delivery_info.delivery_tag

  begin
    envelope = ::Banter::Message.new.parse(contents)
    Banter::RabbitLogger.log(Logger::DEBUG, "Message delivery with contents: #{tag}: #{delivery_info[:routing_key]}, #{contents}, #{contents.encoding}")

    # A redelivered message is reported, then processed like any other.
    if delivery_info[:redelivered]
      ::Banter::Notifier.notify(
        StandardError.new("PubSub Message redelivery"),
        parameters: { info: delivery_info, props: properties, contents: contents },
        backtrace: caller
      )
    end

    Banter::RabbitLogger.log_receive(delivery_info[:routing_key], envelope)
    @callback_block.call(delivery_info, properties, envelope)
    Banter::RabbitLogger.log_complete(delivery_info[:routing_key], envelope)

    # The ack is sent only after the callback has finished; the original
    # author notes it is needed for the next message to come down.
    Banter::RabbitLogger.log(Logger::DEBUG, "Message acknowledged with tag #{tag}")
    @channel.ack(tag)
  rescue StandardError => failure
    Banter::RabbitLogger.log(Logger::WARN, "Error in message: #{failure}")
    failure.backtrace.each do |frame|
      Banter::RabbitLogger.log(Logger::WARN, "-- #{frame}")
    end
    Banter::RabbitLogger.log(Logger::WARN, "contents: #{contents}")
    # envelope is nil here when parsing itself was what failed.
    Banter::RabbitLogger.log_subscriber_failed(delivery_info[:routing_key], delivery_info, properties, envelope, contents)

    # requeue = false: the message is not put back on the queue. Per the
    # original author it has to be picked up later by the log parser or by
    # the dead letter exchange.
    @channel.reject(tag, false)

    report = { delivery_info: delivery_info, properties: properties, contents: contents, error: failure.message }
    ::Banter::Notifier.notify(failure, parameters: report, environment_name: environment)
  end
end
start(&block) click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 33
# Connects to RabbitMQ, declares the topic exchange and the queue, binds the
# queue with the routing key computed in +initialize+, and subscribes
# +process_message+ as the consumer. The given block is stored and called
# by +process_message+ for every message.
#
# Returns nil.
#
# Raises whatever +@connection.start+ raises when the broker cannot be
# reached; the failure is logged and sent to Banter::Notifier before being
# re-raised.
def start(&block)
  @connection = Bunny.new(Configuration.connection)
  begin
    @connection.start
  rescue => e
    Banter::RabbitLogger.log(Logger::ERROR, "Cannot connect to rabbitmq")
    ::Banter::Notifier.notify(e, parameters: { message: e.message, what_happened: "RabbitMQ unreachable!" }, environment_name: environment)
    # Bare raise re-raises the rescued exception unchanged.
    raise
  end

  @channel  = @connection.create_channel

  @exchange = @channel.topic(@exchange_name, :durable => @durable, :auto_delete => false)

  queue_arguments = {}

  queue_arguments["x-dead-letter-exchange"] = @dead_letter_exchange_name if @dead_letter_exchange_name.present?
  # initialize stores config.ttl as-is, so it may be nil; comparing nil with
  # `>` would raise NoMethodError. A nil or non-positive ttl means no TTL
  # argument is set.
  # NOTE(review): the * 1000 suggests ttl is configured in seconds and sent
  # to RabbitMQ in milliseconds — confirm against the configuration docs.
  queue_arguments["x-message-ttl"] = ttl * 1000 if ttl && ttl > 0

  # A pool size of 0 (the default from initialize) leaves QoS untouched.
  @channel.basic_qos(@pool_size) if @pool_size != 0

  @channel.on_uncaught_exception(&method(:notify_error))

  rabbit_queue = @channel.queue(@queue_name, durable: @durable, exclusive: false, arguments: queue_arguments)
  @listener    = rabbit_queue.bind(@exchange, routing_key: @routing_key, exclusive: false)

  # Must be set before subscribing: process_message reads @callback_block.
  @callback_block = block
  @consumer = @listener.subscribe({ consumer_tag: @queue_name, manual_ack: true, block: false }, &method(:process_message))
  nil
end
teardown() click to toggle source
# File lib/banter/server/rabbit_mq_subscriber.rb, line 101
# Cancels the consumer created by +start+ and closes the connection.
#
# The two steps are rescued independently so that a failure while cancelling
# the consumer can no longer skip closing the connection. Errors from either
# step are reported to Banter::Notifier and are not re-raised.
def teardown
  begin
    @consumer.cancel if @consumer.present?
  rescue => e
    ::Banter::Notifier.notify(e, parameters: { error: e.message }, environment_name: environment)
  end

  begin
    @connection.close if @connection.present?
  rescue => e
    ::Banter::Notifier.notify(e, parameters: { error: e.message }, environment_name: environment)
  end
end