class RabbitWQ::MessageHandler
Constants
- REQUEUE
Public Instance Methods
call( options )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 16 def call( options ) Time.zone = config.time_zone channel = options[:channel] delivery_info = options[:delivery_info] metadata = options[:metadata] payload = options[:payload] worker = deserialize_worker( payload ) info Rainbow( "WORKER [#{worker.object_id}] " + worker.inspect ).yellow handle_work( worker, payload ) try_on_success_callback( worker ) channel.ack delivery_info.delivery_tag rescue => e handle_error( worker, e, channel, delivery_info, payload, metadata ) end
Protected Instance Methods
deserialize_worker( payload )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 35 def deserialize_worker( payload ) YAML::load( payload ).tap do |worker| unless worker.is_a?( RabbitWQ::Worker ) raise ArgumentError, "Worker of type #{worker.class.name} is not a valid worker (not a descendent of #{RabbitWQ::Worker.name})" end end rescue => e raise RabbitWQ::InvalidWorkError, "#{e.message} -- #{e.class.name}", e.backtrace end
handle_error( worker, e, channel, delivery_info, payload, metadata )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 70 def handle_error( worker, e, channel, delivery_info, payload, metadata ) headers = metadata[:headers] if metadata error_metadata = { type: e.class.name, message: e.message, backtrace: e.backtrace } if headers && headers['retry'] && !e.is_a?( RabbitWQ::FinalError ) attempt = headers.fetch( 'attempt', 1 ).to_i if attempt < headers['retry'] retry_delay = headers.fetch( 'retry_delay', 30000 ) if retry_delay == 'auto-scale' retry_delay = retry_delays( attempt ) end Work.enqueue_payload( payload, headers.merge( delay: retry_delay, attempt: attempt + 1 ). merge( error: error_metadata )) error( e ) worker_error( worker, "ERROR WITH RETRY " + error_metadata.inspect ) try_on_error_callback( worker, e ) try_on_retryable_error_callback( worker, e ) channel.nack delivery_info.delivery_tag return end end if e.is_a?( RabbitWQ::FinalError ) && e.level != :error RabbitWQ.work_logger.send( e.level, worker, e.message ) try_on_error_callback( worker, e ) try_on_final_error_callback( worker, e ) channel.nack delivery_info.delivery_tag return end Work.enqueue_error_payload( payload, error: error_metadata ) error( e ) worker_error( worker, "FINAL ERROR " + error_metadata.inspect ) try_on_error_callback( worker, e ) try_on_final_error_callback( worker, e ) channel.nack delivery_info.delivery_tag end
handle_work( worker, payload )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 47 def handle_work( worker, payload ) if ignore_and_trash?( worker ) info "Trashed: #{worker.class.name}:#{worker.object_id}" return end if ignore_and_error?( worker ) info "Ignored and sent to error queue: #{worker.class.name}:#{worker.object_id}" Work.enqueue_error_payload( payload, error: "Worker ignored" ) return end unless worker.enabled? worker_info( worker, "Worker disabled" ) if worker.error_on_disabled? Work.enqueue_error_payload( payload, error: "Worker disabled" ) end return end worker.call end
host_namespace()
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 147 def host_namespace RabbitWQ end
ignore_and_error?( worker )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 143 def ignore_and_error?( worker ) config.ignored_workers_to_error_queue.include?( worker.class.name ) end
ignore_and_trash?( worker )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 139 def ignore_and_trash?( worker ) config.ignored_workers_trash.include?( worker.class.name ) end
requeue( channel, delivery_info, e=nil )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 134 def requeue( channel, delivery_info, e=nil ) info Rainbow( 'REQUEUE ' + e.message ).yellow channel.reject delivery_info.delivery_tag, REQUEUE end
retry_delays( retry_num )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 151 def retry_delays( retry_num ) { 1 => 1, 2 => 5, 3 => 15, 4 => 30, 5 => 60, 6 => 360, # 6 hrs 7 => 720, # 12 hrs 8 => 1440, # 24 hrs 9 => 2880, # 48 hrs }[retry_num] * 60000 end
try_on_error_callback( worker, e )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 119 def try_on_error_callback( worker, e ) return unless worker.respond_to?( :on_error, true ) worker.send( :on_error, e ) end
try_on_final_error_callback( worker, e )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 129 def try_on_final_error_callback( worker, e ) return unless worker.respond_to?( :on_final_error, true ) worker.send( :on_final_error, e ) end
try_on_retryable_error_callback( worker, e )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 124 def try_on_retryable_error_callback( worker, e ) return unless worker.respond_to?( :on_retryable_error, true ) worker.send( :on_retryable_error, e ) end
try_on_success_callback( worker )
click to toggle source
# File lib/rabbit_wq/message_handler.rb, line 114 def try_on_success_callback( worker ) return unless worker.respond_to?( :on_success, true ) worker.send( :on_success ) end