class Watership::Consumer

Public Class Methods

new(consumer, url, channel_options = {}, queue_options = {}) click to toggle source
# File lib/watership/consumer.rb, line 7
def initialize(consumer, url, channel_options = {}, queue_options = {})
  @consumer = consumer
  @url = url
  @prefetch = channel_options.delete(:prefetch) || Integer(ENV.fetch("RABBIT_CONSUMER_PREFETCH", 200))
  @concurrency = channel_options.delete(:concurrency) || 1
  @forever = channel_options.delete(:forever) || true
  @channel_opts = { durable: true }.merge(channel_options)
  @queue_opts = { block: false, manual_ack: true }.merge(queue_options)
end
sleep_forever() click to toggle source
# File lib/watership/consumer.rb, line 102
def self.sleep_forever
  sleepy_thread = Thread.new { sleep }
  Signal.trap("TERM") do
    sleepy_thread.terminate
  end
  sleepy_thread.join
rescue Interrupt
end

Public Instance Methods

ack_message(tag) click to toggle source
# File lib/watership/consumer.rb, line 57
def ack_message(tag)
  logger.info "Acking message"
  channel.acknowledge(tag, false)
end
bind(name, opts = {}) click to toggle source
# File lib/watership/consumer.rb, line 53
def bind(name, opts = {})
  create_queue.bind(name, opts)
end
channel() click to toggle source
# File lib/watership/consumer.rb, line 75
def channel
  @channel ||= begin
    created_channel = connection.create_channel(nil, @concurrency)
    created_channel.prefetch(@prefetch)
    created_channel
  end
end
clear_active_record_connections() click to toggle source
# File lib/watership/consumer.rb, line 96
def clear_active_record_connections
  if defined?(::ActiveRecord::Base) && ::ActiveRecord::Base.respond_to?(:clear_active_connections!)
    ::ActiveRecord::Base.clear_active_connections!
  end
end
connection() click to toggle source
# File lib/watership/consumer.rb, line 71
def connection
  @connection ||= Bunny.new(@url).tap { |bunny| bunny.start }
end
consume() click to toggle source
# File lib/watership/consumer.rb, line 17
def consume
  queue = create_queue
  queue.subscribe(@queue_opts.dup) do |delivery_info, properties, payload|
    begin
      data = JSON.parse(payload)
      @consumer.new.call(data)
      success = true
    rescue StandardError => exception
      logger.error "Error thrown in subscribe block"
      logger.error exception.message
      logger.error exception.backtrace.join("\n")

      retries = data["retries"] || 0
      notify(exception, { payload: data, retries: retries })
      enqueue(data.merge(retries: (retries + 1)))
      success = true
    rescue Interrupt => exception
      logger.error "Interrupt in subscribe block"
      logger.warn "Stopped gracefully."
    ensure
      if success
        ack_message(delivery_info.delivery_tag)
      else
        reject_message(delivery_info.delivery_tag)
      end

      clear_active_record_connections
    end
  end

  self.class.sleep_forever if @forever
ensure
  logger.info "Closing Channel"
  channel.close
end
create_queue() click to toggle source
# File lib/watership/consumer.rb, line 67
def create_queue
  channel.queue(@consumer::QUEUE, @channel_opts)
end
enqueue(message) click to toggle source
# File lib/watership/consumer.rb, line 83
def enqueue(message)
  create_queue.publish(JSON.generate(message))
end
logger() click to toggle source
# File lib/watership/consumer.rb, line 92
def logger
  @logger ||= defined?(Rails) ? Rails.logger : Logger.new(STDOUT)
end
notify(exception, data) click to toggle source
# File lib/watership/consumer.rb, line 87
def notify(exception, data)
  Airbrake.notify(exception) if defined?(Airbrake)
  Bugsnag.notify(exception, data: data) if defined?(Bugsnag)
end
reject_message(tag, requeue = true) click to toggle source
# File lib/watership/consumer.rb, line 62
def reject_message(tag, requeue = true)
  logger.info "Rejecting message"
  channel.reject(tag, requeue)
end