class RabbitWQ::Server

Attributes

work_consumer[R]
work_exchange[R]

Protected Instance Methods

environment_file_path() click to toggle source
# File lib/rabbit_wq/server.rb, line 101
# Path of the environment file to load, as configured.
#
# Thin delegation to +config.environment_file_path+; whatever that
# returns (possibly nil) is returned unchanged.
def environment_file_path
  config.environment_file_path
end
error_queue() click to toggle source
# File lib/rabbit_wq/server.rb, line 57
# Declares the durable queue named by +config.error_queue+ on the current
# channel and returns whatever +channel.queue+ returns.
#
# Not memoized: every call issues a fresh +channel.queue+ declaration.
def error_queue
  amqp_channel = channel
  queue_name   = config.error_queue

  amqp_channel.queue( queue_name, durable: true )
end
handle_error( options, e ) click to toggle source
# File lib/rabbit_wq/server.rb, line 74
# Logs a failure and negatively acknowledges the message it belongs to.
#
# options - Hash; +options[:delivery_info]+ must respond to +delivery_tag+.
# e       - the exception being reported.
#
# Any StandardError raised while doing this (including a missing
# delivery_info) is caught and reported through +error+ instead of
# propagating to the caller.
def handle_error( options, e )
  # Read before logging so the statement order matches the original flow.
  info = options[:delivery_info]
  log_error( e )

  # Disabled: forwarding the failed payload to the error queue.
  #error_queue.publish( payload, headers: { exception_message: e.message,
                                           #exception_class: e.class.name,
                                           #exception_backtrace: e.backtrace } )

  nack_label = Rainbow( "NACK" ).red
  debug( nack_label + " #{e.message}" )

  channel.nack( info.delivery_tag )
rescue => failure
  report = "ERROR while handling error | #{failure.class.name} | #{failure.message} | #{failure.backtrace.inspect}"
  error( Rainbow( report ).red )
end
handler_class() click to toggle source
# File lib/rabbit_wq/server.rb, line 25
# Returns the class used to handle messages: RabbitWQ::MessageHandler.
#
# NOTE(review): presumably a hook read by the surrounding server
# framework -- confirm against the caller, which is not visible here.
def handler_class
  RabbitWQ::MessageHandler
end
host_namespace() click to toggle source
# File lib/rabbit_wq/server.rb, line 29
# Returns the RabbitWQ module itself.
#
# NOTE(review): presumably tells shared server code which namespace hosts
# this server -- confirm against the caller, which is not visible here.
def host_namespace
  RabbitWQ
end
initialize_work_logger() click to toggle source
# File lib/rabbit_wq/server.rb, line 86
# Builds a WorkLogger from the configured work log level and path and
# installs it as +RabbitWQ.work_logger+.
#
# Returns the newly created logger (the value of the assignment).
def initialize_work_logger
  level = config.work_log_level
  path  = config.work_log_path

  RabbitWQ.work_logger = WorkLogger.new( level, path )
end
load_environment() click to toggle source
# File lib/rabbit_wq/server.rb, line 91
# Loads the configured environment file into this process.
#
# Sets both RAILS_ENV and RACK_ENV to +config.env+ and then requires the
# file at +environment_file_path+.
#
# Raises RuntimeError when no path is configured or the file is missing.
def load_environment
  env_file = environment_file_path

  # File.exists? was deprecated and then removed in Ruby 3.2; File.exist?
  # works on every Ruby version.
  if !env_file || !File.exist?( env_file )
    raise "Environment file '#{env_file}' does not exist"
  end

  # Must be set before the require so the loaded file sees the right env.
  ENV['RAILS_ENV'] = ENV['RACK_ENV'] = config.env
  require env_file
end
log_error( e ) click to toggle source
# File lib/rabbit_wq/server.rb, line 69
# Reports an exception through +error+ as a red "Class: message" headline
# followed, on a new line, by the formatted backtrace.
#
# e - the exception to report.
def log_error( e )
  headline = Rainbow( [e.class.name, e.message].join( ': ' ) ).red
  trace    = format_backtrace( e.backtrace )

  # Array#join is kept (rather than interpolation) so the output is the
  # same whatever shape format_backtrace returns.
  error( [headline, trace].join( "\n" ) )
end
run() click to toggle source
# File lib/rabbit_wq/server.rb, line 33
# Subscribes to the work queue with manual acknowledgements and hands
# every delivery to the handler.
#
# The value returned by +subscribe+ is stored in @work_consumer and is
# also this method's return value. Each delivery is processed inside
# +with_supervision+, which receives the delivery info.
def run
  @work_consumer = work_subscribe_queue.subscribe( manual_ack: true ) do |info, properties, body|
    with_supervision( delivery_info: info ) do
      banner = Rainbow( "LISTENER RECEIVED " ).magenta
      debug( "#{banner} #{body}" )

      # Passed as keywords, exactly as before; channel is looked up per delivery.
      call_handler_respecting_thread_count(
        payload: body,
        delivery_info: info,
        metadata: properties,
        channel: channel
      )
    end
  end
end
warn_for_dead_actor_error() click to toggle source
# File lib/rabbit_wq/server.rb, line 65
# Emits a blue warning saying the operation is being retried because of a
# Celluloid::DeadActorError.
def warn_for_dead_actor_error
  notice = Rainbow( "RETRYING due to Celluloid::DeadActorError ..." )

  warn( notice.blue )
end
warn_for_supevision_error() click to toggle source
# File lib/rabbit_wq/server.rb, line 61
# Emits a cyan warning saying the operation is being retried while the
# supervisor restarts an actor.
#
# The spelling of the method name ("supevision") is left untouched, since
# callers outside this view may rely on it.
def warn_for_supevision_error
  notice = Rainbow( "RETRYING due to waiting on supervisor to restart actor ..." )

  warn( notice.cyan )
end
work_subscribe_queue() click to toggle source
# File lib/rabbit_wq/server.rb, line 51
# Returns the durable queue named by +config.work_subscribe_queue+, bound
# to +work_exchange+.
#
# The result of +bind+ is memoized in @work_subscribe_queue, so the queue
# is declared and bound only on the first call.
def work_subscribe_queue
  return @work_subscribe_queue if @work_subscribe_queue

  declared = channel.queue( config.work_subscribe_queue, durable: true )
  @work_subscribe_queue = declared.bind( work_exchange )
end