class GorgService::Consumer::Listener
Attributes
consumer[RW]
Public Class Methods
new(env: nil, max_attempts: 48, log_routing_key: nil)
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 14
# Builds a listener; nothing is subscribed until #listen is called.
#
# @param env [Object, nil] environment object. The other methods of this
#   class call job_queue, ch, delayed_queue_for and delayed_in_exchange on it.
# @param max_attempts [#to_i] soft-failure count at which a message is
#   discarded instead of being deferred again (coerced with to_i)
# @param log_routing_key [String, nil] routing key that process_logging
#   assigns to every log message it publishes
def initialize(env: nil, max_attempts: 48, log_routing_key: nil)
  @env = env
  @log_routing_key = log_routing_key
  @max_attempts = max_attempts.to_i
end
Public Instance Methods
listen()
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 21
# Subscribes to @env.job_queue with manual acknowledgement and keeps the
# value returned by subscribe in @consumer (which #stop later cancels).
#
# For every delivery the block logs the routing key, the properties and at
# most the first 10000 characters of the payload, hands the delivery to
# process_message, then acknowledges it on @env.ch.
#
# @return [Object] whatever job_queue.subscribe returns
def listen
  @consumer = @env.job_queue.subscribe(manual_ack: true) do |delivery_info, properties, body|
    # Log the incoming delivery.
    key = delivery_info[:routing_key]
    GorgService.logger.info "Received message with routing key #{key}"
    GorgService.logger.debug "Message properties : #{properties.to_s}"
    GorgService.logger.debug "Message payload : #{body.to_s[0, 10000]}"

    # Process it.
    process_message(delivery_info, properties, body)

    # Acknowledge only once process_message has returned; if it raises,
    # this line is never reached.
    @env.ch.ack(delivery_info.delivery_tag)
  end
end
stop()
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 37
# Cancels the subscription created by #listen.
#
# @consumer is only assigned inside #listen, so calling #stop on a listener
# that never started listening used to raise NoMethodError on nil. It is now
# a no-op in that case.
#
# @return [Object, nil] the result of @consumer.cancel, or nil when there is
#   no consumer to cancel
def stop
  @consumer.cancel if @consumer
end
Protected Instance Methods
process_hardfail(e, message)
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 79
# Handles an unrecoverable failure: attaches the message to the error if the
# error does not carry one yet, logs the failure and the discard, then
# publishes a log message through process_logging.
#
# @param e [Object] error responding to gorg_service_message, message,
#   error_raised and (via process_logging) to_log_message
# @param message [Object, nil] the message being processed when the error occurred
def process_hardfail(e, message)
  e.gorg_service_message ||= message

  # error_raised may be nil; in that case nothing is appended after the comma.
  wrapped = e.error_raised
  GorgService.logger.error "HARDFAIL ERROR : #{e.message}, #{wrapped && wrapped.inspect}"
  GorgService.logger.info " DISCARD MESSAGE"

  process_logging(e)
end
process_logging(error)
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 86
# Converts +error+ into a log message, stamps it with the configured log
# routing key and publishes it with a new GorgService::Producer.
#
# @param error [#to_log_message] an error or a message; anything that can
#   build its own log message
# @return [Object] whatever Producer#publish_message returns
def process_logging(error)
  log_entry = error.to_log_message.tap { |entry| entry.routing_key = @log_routing_key }
  GorgService::Producer.new.publish_message(log_entry)
end
process_message(delivery_info, _properties, body)
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 43
# Parses one delivery, routes it, and dispatches any failure to the soft- or
# hard-failure handler.
#
# SoftfailError and HardfailError pass through the inner rescue untouched.
# Any other StandardError is wrapped in a HardfailError named
# "UnrescuedException", so it ends up in process_hardfail.
#
# @param delivery_info [Object] delivery metadata, forwarded to Message.parse
# @param _properties [Object] message properties, forwarded to Message.parse
# @param body [Object] raw payload, forwarded to Message.parse
def process_message(delivery_info, _properties, body)
  # Stays nil when parsing itself fails; the handlers receive it as-is.
  message = nil

  begin
    message = Message.parse(delivery_info, _properties, body)

    # Remember how many errors the message carried before routing so that
    # errors added during routing can be detected afterwards.
    errors_before_routing = message.errors.count
    MessageRouter.new(message) # result is discarded; called for its effect
    process_logging(message) if message.errors.count > errors_before_routing
  rescue SoftfailError, HardfailError
    raise
  rescue StandardError => e
    raise HardfailError.new("UnrescuedException", e)
  end
rescue SoftfailError => e
  process_softfail(e, message)
rescue HardfailError => e
  process_hardfail(e, message)
end
process_softfail(e, message)
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 66
# Handles a recoverable failure: logs it, bumps the message's soft-failure
# counter, then either defers the message for a later retry or, once the
# counter reaches @max_attempts, escalates to process_hardfail.
#
# @param e [Object] error responding to gorg_service_message, message,
#   error_name and (via process_logging) to_log_message
# @param message [Object] the message being processed; must respond to
#   softfail_count and softfail_count=
def process_softfail(e, message)
  e.gorg_service_message ||= message
  GorgService.logger.error "SOFTFAIL ERROR : #{e.message}"
  process_logging(e)

  # Coerce before incrementing: the comparison below already guarded the
  # counter with to_i, but a nil or String counter would previously have
  # raised on `+= 1` before that guard could help.
  message.softfail_count = message.softfail_count.to_i + 1

  if message.softfail_count.to_i >= @max_attempts
    GorgService.logger.info " DISCARD MESSAGE : too much soft errors (#{message.softfail_count})"
    process_hardfail(HardfailError.new("Too Much SoftError : This message reached the limit of softerror (max: #{@max_attempts})", gorg_service_message: message, error_name: e.error_name), message)
  else
    send_to_deferred_queue(message)
  end
end
send_to_deferred_queue(message)
click to toggle source
# File lib/gorg_service/consumer/listener.rb, line 92
# Publishes +message+ to the environment's delayed-in exchange so that it can
# be retried later.
#
# @param message [Object] message to defer; its routing_key is used to look
#   up the delayed queue
# @raise [RuntimeError] "DelayedQueueNotFound" when the environment has no
#   delayed queue for the message's routing key
def send_to_deferred_queue(message)
  raise "DelayedQueueNotFound" unless @env.delayed_queue_for(message.routing_key)

  GorgService::Producer.new.publish_message(message, exchange: @env.delayed_in_exchange)
  GorgService.logger.info "DEFER MESSAGE : message sent to #{@env.delayed_in_exchange.name} with routing key #{message.routing_key}"
end