class Emque::Consuming::Adapters::RabbitMq::DelayedMessageWorker
Attributes
channel[RW]
delayed_message_exchange[RW]
queue[RW]
Public Class Methods
new(connection)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 18
# Sets up the channel, exchange and queue used for delayed messages.
#
# A channel is created on the given connection and, when the adapter options
# carry a :prefetch value, that value is applied to the channel. A durable
# "x-delayed-message" exchange and a queue (both named
# "emque.<app_name>.delayed_message") are then declared, and the queue is
# bound to the exchange. The queue dead-letters to "<app_name>.error".
#
# connection - object responding to create_channel.
def initialize(connection)
  self.channel = connection.create_channel

  adapter_options = config.adapter.options
  prefetch_count = adapter_options[:prefetch]
  channel.prefetch(prefetch_count) if prefetch_count

  app_name = config.app_name

  exchange_options = {
    :type => "x-delayed-message",
    :durable => true,
    :auto_delete => false,
    :arguments => {
      "x-delayed-type" => "direct",
    }
  }
  self.delayed_message_exchange = channel.exchange(
    "emque.#{app_name}.delayed_message",
    exchange_options
  )

  delayed_queue = channel.queue(
    "emque.#{app_name}.delayed_message",
    :durable => adapter_options[:durable],
    :auto_delete => adapter_options[:auto_delete],
    :arguments => {
      "x-dead-letter-exchange" => "#{app_name}.error"
    }
  )
  self.queue = delayed_queue.bind(delayed_message_exchange)
end
Public Instance Methods
actor_died(actor, reason)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 12
# Logs an error describing why an actor died, unless shutdown is set.
#
# actor  - the actor that died (not used by this method).
# reason - the cause of death; interpolated into the log message.
def actor_died(actor, reason)
  return if shutdown

  logger.error "#{log_prefix} actor_died - died: #{reason}"
end
start()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 47
# Subscribes to the delayed-message queue with manual acknowledgement,
# routing every delivery to process_message. Logs before and after
# the subscription is made.
def start
  logger.info "#{log_prefix} starting..."

  message_handler = method(:process_message)
  queue.subscribe(:manual_ack => true, &message_handler)

  logger.debug "#{log_prefix} started"
end
stop()
click to toggle source
Calls superclass method
Emque::Consuming::Actor#stop
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 53
# Stops the worker by delegating to the superclass stop, handing it a block
# that logs and then closes this worker's channel. Debug messages are logged
# before and after the superclass call.
def stop
  logger.debug "#{log_prefix} stopping..."

  # This method takes no arguments, so super() forwards exactly what a bare
  # super would.
  super() do
    logger.debug "#{log_prefix} closing channel"
    channel.close
  end

  logger.debug "#{log_prefix} stopped"
end
Private Instance Methods
log_prefix()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 84
# Returns the fixed label placed at the front of this worker's log lines.
def log_prefix
  "RabbitMQ DelayedMessageWorker:"
end
process_message(delivery_info, metadata, payload)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/delayed_message_worker.rb, line 66
# Handles a single delivery from the delayed-message queue.
#
# The payload is wrapped in an Emque::Consuming::Message and passed to a new
# Consumer; on success the delivery is acked. If a StandardError is raised,
# the delivery is handed to retry_error when the exception's class name
# matches one of retryable_errors. Otherwise the failure is logged and the
# delivery is nacked.
#
# delivery_info - delivery details; only delivery_tag is read here.
# metadata      - message metadata; logged and forwarded to retry_error.
# payload       - raw message body.
def process_message(delivery_info, metadata, payload)
  logger.info "#{log_prefix} processing message #{metadata}"
  logger.debug "#{log_prefix} payload #{payload}"
  message = Emque::Consuming::Message.new(
    :original => payload
  )
  ::Emque::Consuming::Consumer.new.consume(:process, message)
  channel.ack(delivery_info.delivery_tag)
rescue StandardError => exception
  # Each retryable_errors entry is interpolated into a regexp and matched
  # against the exception's class name.
  if retryable_errors.any? { |error| exception.class.to_s =~ /#{error}/ }
    retry_error(delivery_info, metadata, payload, exception)
  else
    # Record why the message is being rejected; otherwise the exception
    # disappears without leaving any trace in the logs.
    logger.error "#{log_prefix} message failed, not retrying - #{exception.class}: #{exception.message}"
    channel.nack(delivery_info.delivery_tag)
  end
end