class Cuniculus::Consumer
Constants
- JOB_REQUIRED_KEYS
- POLL_TIME
Attributes
channel[R]
exchange[R]
job_queue[R]
queue_config[R]
Public Class Methods
new(queue_config, channel)
click to toggle source
# File lib/cuniculus/consumer.rb 13 def initialize(queue_config, channel) 14 @channel = channel 15 @queue_config = queue_config 16 end
Public Instance Methods
constantize(str)
click to toggle source
# File lib/cuniculus/consumer.rb 80 def constantize(str) 81 return Object.const_get(str) unless str.include?("::") 82 83 names = str.split("::") 84 names.shift if names.empty? || names.first.empty? 85 86 names.inject(Object) do |constant, name| 87 constant.const_get(name, false) 88 end 89 end
handle_error(e)
click to toggle source
# File lib/cuniculus/consumer.rb 75 def handle_error(e) 76 Cuniculus.logger.error("#{e.class.name}: #{e.message}") 77 Cuniculus.logger.error(e.backtrace.join("\n")) unless e.backtrace.nil? 78 end
maybe_retry(delivery_info, item)
click to toggle source
# File lib/cuniculus/consumer.rb 57 def maybe_retry(delivery_info, item) 58 retry_count = item["_cun_retries"].to_i 59 retry_queue_name = job_queue.retry_queue(retry_count) 60 unless retry_queue_name 61 channel.nack(delivery_info.delivery_tag, false, false) 62 return 63 end 64 payload = Cuniculus.dump_job(item.merge("_cun_retries" => retry_count + 1)) 65 exchange.publish( 66 payload, 67 { 68 routing_key: retry_queue_name, 69 persistent: true 70 } 71 ) 72 channel.ack(delivery_info.delivery_tag, false) 73 end
parse_job(payload)
click to toggle source
# File lib/cuniculus/consumer.rb 46 def parse_job(payload) 47 msg = Cuniculus.load_job(payload) 48 raise Cuniculus::BadlyFormattedPayload, "Consumed message with missing information: #{payload}\nIt should have keys [#{JOB_REQUIRED_KEYS.join(', ')}]" unless (JOB_REQUIRED_KEYS - msg.keys).empty? 49 50 msg 51 rescue Cuniculus::BadlyFormattedPayload 52 raise 53 rescue StandardError => ex 54 raise Cuniculus.convert_exception_class(ex, Cuniculus::BadlyFormattedPayload), "Badly formatted consumed message: #{payload}" 55 end
run_job(delivery_info, _properties, payload)
click to toggle source
# File lib/cuniculus/consumer.rb 31 def run_job(delivery_info, _properties, payload) 32 item = parse_job(payload) 33 klass = Object.const_get(item["class"]) 34 worker = klass.new 35 worker.perform(*item["args"]) 36 channel.ack(delivery_info.delivery_tag, false) 37 rescue Cuniculus::BadlyFormattedPayload => ex 38 handle_error(ex) 39 # If parse failed, send message straight to DLX 40 channel.nack(delivery_info.delivery_tag, false, false) 41 rescue StandardError => ex 42 handle_error(Cuniculus.convert_exception_class(ex, Cuniculus::Error)) 43 maybe_retry(delivery_info, item) 44 end
start()
click to toggle source
# File lib/cuniculus/consumer.rb 18 def start 19 @exchange = channel.direct(Cuniculus::CUNICULUS_EXCHANGE, { durable: true }) 20 @job_queue = queue_config.declare!(channel) 21 @_consumer = job_queue.subscribe(manual_ack: true, block: false) do |delivery_info, properties, payload| 22 run_job(delivery_info, properties, payload) 23 end 24 end
stop()
click to toggle source
# File lib/cuniculus/consumer.rb 26 def stop 27 @_consumer&.cancel 28 channel.close unless channel.closed? 29 end