class Watership::Consumer
Public Class Methods
new(consumer, url, channel_options = {}, queue_options = {})
click to toggle source
# File lib/watership/consumer.rb, line 7 def initialize(consumer, url, channel_options = {}, queue_options = {}) @consumer = consumer @url = url @prefetch = channel_options.delete(:prefetch) || Integer(ENV.fetch("RABBIT_CONSUMER_PREFETCH", 200)) @concurrency = channel_options.delete(:concurrency) || 1 @forever = channel_options.delete(:forever) || true @channel_opts = { durable: true }.merge(channel_options) @queue_opts = { block: false, manual_ack: true }.merge(queue_options) end
sleep_forever()
click to toggle source
# File lib/watership/consumer.rb, line 102 def self.sleep_forever sleepy_thread = Thread.new { sleep } Signal.trap("TERM") do sleepy_thread.terminate end sleepy_thread.join rescue Interrupt end
Public Instance Methods
ack_message(tag)
click to toggle source
# File lib/watership/consumer.rb, line 57 def ack_message(tag) logger.info "Acking message" channel.acknowledge(tag, false) end
bind(name, opts = {})
click to toggle source
# File lib/watership/consumer.rb, line 53 def bind(name, opts = {}) create_queue.bind(name, opts) end
channel()
click to toggle source
# File lib/watership/consumer.rb, line 75 def channel @channel ||= begin created_channel = connection.create_channel(nil, @concurrency) created_channel.prefetch(@prefetch) created_channel end end
clear_active_record_connections()
click to toggle source
# File lib/watership/consumer.rb, line 96 def clear_active_record_connections if defined?(::ActiveRecord::Base) && ::ActiveRecord::Base.respond_to?(:clear_active_connections!) ::ActiveRecord::Base.clear_active_connections! end end
connection()
click to toggle source
# File lib/watership/consumer.rb, line 71 def connection @connection ||= Bunny.new(@url).tap { |bunny| bunny.start } end
consume()
click to toggle source
# File lib/watership/consumer.rb, line 17 def consume queue = create_queue queue.subscribe(@queue_opts.dup) do |delivery_info, properties, payload| begin data = JSON.parse(payload) @consumer.new.call(data) success = true rescue StandardError => exception logger.error "Error thrown in subscribe block" logger.error exception.message logger.error exception.backtrace.join("\n") retries = data["retries"] || 0 notify(exception, { payload: data, retries: retries }) enqueue(data.merge(retries: (retries + 1))) success = true rescue Interrupt => exception logger.error "Interrupt in subscribe block" logger.warn "Stopped gracefully." ensure if success ack_message(delivery_info.delivery_tag) else reject_message(delivery_info.delivery_tag) end clear_active_record_connections end end self.class.sleep_forever if @forever ensure logger.info "Closing Channel" channel.close end
create_queue()
click to toggle source
# File lib/watership/consumer.rb, line 67 def create_queue channel.queue(@consumer::QUEUE, @channel_opts) end
enqueue(message)
click to toggle source
# File lib/watership/consumer.rb, line 83 def enqueue(message) create_queue.publish(JSON.generate(message)) end
logger()
click to toggle source
# File lib/watership/consumer.rb, line 92 def logger @logger ||= defined?(Rails) ? Rails.logger : Logger.new(STDOUT) end
notify(exception, data)
click to toggle source
# File lib/watership/consumer.rb, line 87 def notify(exception, data) Airbrake.notify(exception) if defined?(Airbrake) Bugsnag.notify(exception, data: data) if defined?(Bugsnag) end
reject_message(tag, requeue = true)
click to toggle source
# File lib/watership/consumer.rb, line 62 def reject_message(tag, requeue = true) logger.info "Rejecting message" channel.reject(tag, requeue) end