class RabbitWQ::Server
Attributes
work_consumer[R]
work_exchange[R]
Protected Instance Methods
environment_file_path()
click to toggle source
# File lib/rabbit_wq/server.rb, line 101
# Returns the environment file path as reported by the configuration object.
def environment_file_path
  config.environment_file_path
end
error_queue()
click to toggle source
# File lib/rabbit_wq/server.rb, line 57
# Asks the channel for the queue named by config.error_queue, requesting
# it as durable, and returns whatever the channel hands back.
def error_queue
  queue_options = { durable: true }
  channel.queue( config.error_queue, **queue_options )
end
handle_error( options, e )
click to toggle source
# File lib/rabbit_wq/server.rb, line 74
# Reports a failure for a delivered message: logs +e+, emits a "NACK" debug
# line and negatively acknowledges the delivery on the channel.
#
# options - Hash expected to carry the delivery under :delivery_info.
# e       - the exception that was raised for that delivery.
#
# Any StandardError raised while doing this is itself reported through
# +error+ and is not re-raised.
def handle_error( options, e )
  begin
    delivery_info = options[:delivery_info]
    log_error( e )

    # Publishing the failed message to the error queue is disabled:
    #error_queue.publish( payload, headers: { exception_message: e.message,
    #                                         exception_class: e.class.name,
    #                                         exception_backtrace: e.backtrace } )

    nack_label = Rainbow( "NACK" ).red
    debug( nack_label + " #{e.message}" )
    channel.nack( delivery_info.delivery_tag )
  rescue => ex
    failure = "ERROR while handling error | #{ex.class.name} | #{ex.message} | #{ex.backtrace.inspect}"
    error( Rainbow( failure ).red )
  end
end
handler_class()
click to toggle source
# File lib/rabbit_wq/server.rb, line 25
# Returns the class used to handle messages: RabbitWQ::MessageHandler.
def handler_class
  RabbitWQ::MessageHandler
end
host_namespace()
click to toggle source
# File lib/rabbit_wq/server.rb, line 29
# Returns the RabbitWQ module as this server's host namespace.
def host_namespace
  RabbitWQ
end
initialize_work_logger()
click to toggle source
# File lib/rabbit_wq/server.rb, line 86
# Builds a WorkLogger from the configured work log level and path and
# installs it as RabbitWQ.work_logger.
def initialize_work_logger
  log_level = config.work_log_level
  log_path  = config.work_log_path

  RabbitWQ.work_logger = WorkLogger.new( log_level, log_path )
end
load_environment()
click to toggle source
# File lib/rabbit_wq/server.rb, line 91
# Loads the application environment file.
#
# Sets both RAILS_ENV and RACK_ENV to config.env, then requires the file
# returned by +environment_file_path+.
#
# Raises RuntimeError when no path is configured or the file is missing.
def load_environment
  path = environment_file_path

  # File.exists? was removed in Ruby 3.2; File.exist? works on every Ruby version.
  unless path && File.exist?( path )
    raise "Environment file '#{path}' does not exist"
  end

  ENV['RAILS_ENV'] = ENV['RACK_ENV'] = config.env
  require path
end
log_error( e )
click to toggle source
# File lib/rabbit_wq/server.rb, line 69
# Writes an error log entry for +e+: a red "Class: message" headline
# followed, on a new line, by the formatted backtrace.
def log_error( e )
  summary  = [e.class.name, e.message].join( ': ' )
  headline = Rainbow( summary ).red
  trace    = format_backtrace( e.backtrace )

  error( [headline, trace].join( "\n" ) )
end
run()
click to toggle source
# File lib/rabbit_wq/server.rb, line 33
# Subscribes to the work queue with manual acknowledgement and stores the
# resulting consumer in @work_consumer.
#
# For each delivery, inside with_supervision, the payload is logged at
# debug level and then passed to call_handler_respecting_thread_count
# together with its delivery info, metadata and the channel.
def run
  subscription_options = { manual_ack: true }

  @work_consumer = work_subscribe_queue.subscribe( **subscription_options ) do |delivery_info, metadata, payload|
    with_supervision( delivery_info: delivery_info ) do
      received_label = Rainbow( "LISTENER RECEIVED " ).magenta
      debug( "#{received_label} #{payload}" )

      call_handler_respecting_thread_count( payload: payload,
                                            delivery_info: delivery_info,
                                            metadata: metadata,
                                            channel: channel )
    end
  end
end
warn_for_dead_actor_error()
click to toggle source
# File lib/rabbit_wq/server.rb, line 65
# Emits a blue warning that a retry is happening because of a
# Celluloid::DeadActorError.
def warn_for_dead_actor_error
  notice = Rainbow( "RETRYING due to Celluloid::DeadActorError ..." ).blue
  warn( notice )
end
warn_for_supevision_error()
click to toggle source
# File lib/rabbit_wq/server.rb, line 61
# Emits a cyan warning that a retry is happening while waiting on the
# supervisor to restart an actor.
# NOTE(review): the method name is misspelled ("supevision"); it is kept
# as-is because callers outside this view may depend on it.
def warn_for_supevision_error
  notice = Rainbow( "RETRYING due to waiting on supervisor to restart actor ..." ).cyan
  warn( notice )
end
work_subscribe_queue()
click to toggle source
# File lib/rabbit_wq/server.rb, line 51
# Returns the memoized work subscribe queue. On first use, the durable
# queue named by config.work_subscribe_queue is requested from the channel
# and bound to work_exchange; the result of that bind call is what gets
# cached in @work_subscribe_queue.
def work_subscribe_queue
  @work_subscribe_queue ||= begin
    durable_queue = channel.queue( config.work_subscribe_queue, durable: true )
    durable_queue.bind( work_exchange )
  end
end