class Emque::Consuming::Adapters::RabbitMq::Worker
Attributes
channel[RW]
name[RW]
queue[RW]
topic[RW]
Public Class Methods
new(connection, topic)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 20
#
# Sets up a worker for a single topic: opens a dedicated channel on the
# given connection, applies the configured prefetch (when one is set),
# declares the application's queue for the topic and binds that queue to
# the topic's fanout exchange.
#
# connection - object responding to #create_channel.
# topic      - topic identifier; used to build the worker name, the queue
#              name and (via #to_s) the fanout exchange name.
def initialize(connection, topic)
  self.topic = topic
  self.name = "#{self.topic} worker"
  self.shutdown = false

  # @note: channels are not thread safe, so it is better to use
  # a new channel in each worker.
  # https://github.com/jhbabon/amqp-celluloid/blob/master/lib/consumer.rb
  self.channel = connection.create_channel

  prefetch = config.adapter.options[:prefetch]
  channel.prefetch(prefetch) if prefetch

  # The queue is declared before the exchange, then bound to it.
  topic_queue = channel.queue(
    "emque.#{config.app_name}.#{topic}",
    :durable => config.adapter.options[:durable],
    :auto_delete => config.adapter.options[:auto_delete],
    :arguments => {
      # Rejected messages are routed to the application's error exchange.
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  )
  topic_exchange = channel.fanout(
    topic.to_s,
    :durable => true,
    :auto_delete => false
  )

  self.queue = topic_queue.bind(topic_exchange)
end
Public Instance Methods
actor_died(actor, reason)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 14
#
# Logs an error carrying the given reason, unless the worker's shutdown
# flag is set.
# NOTE(review): presumably invoked when a linked actor terminates —
# confirm against Emque::Consuming::Actor.
#
# actor  - not used by this method.
# reason - interpolated into the logged error message.
def actor_died(actor, reason)
  return if shutdown

  logger.error("#{log_prefix} actor_died - died: #{reason}")
end
start()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 52
#
# Subscribes to the worker's queue with manual acknowledgements enabled,
# handing every delivery to #process_message, and logs before and after
# subscribing.
def start
  logger.info("#{log_prefix} starting...")

  on_delivery = method(:process_message)
  queue.subscribe(:manual_ack => true, &on_delivery)

  logger.debug("#{log_prefix} started")
end
stop()
click to toggle source
Calls superclass method
Emque::Consuming::Actor#stop
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 58
#
# Stops the worker by delegating to the superclass #stop, passing it a
# block that closes this worker's channel. Progress is logged at debug
# level around the call.
# NOTE(review): when (and whether) the block runs is decided by the
# superclass implementation, which is not visible here.
def stop
  logger.debug("#{log_prefix} stopping...")

  super do
    logger.debug("#{log_prefix} closing channel")
    channel.close
  end

  logger.debug("#{log_prefix} stopped")
end
Private Instance Methods
delayed_message_exchange()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 104
#
# Declares the application's "x-delayed-message" exchange, declares the
# queue of the same name and binds that queue to the exchange.
#
# Returns the exchange object.
def delayed_message_exchange
  exchange = channel.exchange(
    "emque.#{config.app_name}.delayed_message",
    {
      :type => "x-delayed-message",
      :durable => true,
      :auto_delete => false,
      :arguments => {
        "x-delayed-type" => "direct"
      }
    }
  )

  delayed_queue = channel.queue(
    "emque.#{config.app_name}.delayed_message",
    :durable => config.adapter.options[:durable],
    :auto_delete => config.adapter.options[:auto_delete],
    :arguments => {
      # Rejected messages are routed to the application's error exchange.
      "x-dead-letter-exchange" => "#{config.app_name}.error"
    }
  )
  delayed_queue.bind(exchange)

  exchange
end
enable_delayed_message()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 71
#
# Returns the +enable_delayed_message+ setting from the configuration.
# #process_message uses it to choose between republishing a failed
# message through the delayed exchange and nack-ing it.
def enable_delayed_message
  config.enable_delayed_message
end
log_prefix()
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 128
#
# Returns the prefix used on this worker's log lines: a fixed label
# followed by the worker's object id and its name.
def log_prefix
  "RabbitMQ Worker: #{object_id} #{name}"
end
process_message(delivery_info, metadata, payload)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 75
#
# Handles one delivery from the queue: wraps the payload in an
# Emque::Consuming::Message, runs it through a new Consumer and acks the
# delivery on success.
#
# If anything in that path raises a StandardError, the failure is logged
# and then:
# * with delayed messages enabled, the message is republished through
#   #publish_to_delayed_message; if that also fails, the failure is logged
#   and the delivery is nack-ed;
# * otherwise the delivery is nack-ed.
#
# delivery_info - object responding to #delivery_tag.
# metadata      - message properties; logged and forwarded on republish.
# payload       - raw message body.
def process_message(delivery_info, metadata, payload)
  logger.info "#{log_prefix} processing message #{metadata}"
  logger.debug "#{log_prefix} payload #{payload}"

  message = Emque::Consuming::Message.new(
    :original => payload
  )
  ::Emque::Consuming::Consumer.new.consume(:process, message)

  channel.ack(delivery_info.delivery_tag)
rescue StandardError => exception
  # Record why the message failed; previously the error was dropped silently.
  logger.error(
    "#{log_prefix} failed to process message - " \
    "#{exception.class}: #{exception.message}"
  )

  if enable_delayed_message
    begin
      publish_to_delayed_message(delivery_info, metadata, payload)
    rescue StandardError => publish_error
      # Republishing is best-effort: fall back to nack-ing the delivery.
      logger.error(
        "#{log_prefix} failed to publish delayed message - " \
        "#{publish_error.class}: #{publish_error.message}"
      )
      channel.nack(delivery_info.delivery_tag)
    end
  else
    channel.nack(delivery_info.delivery_tag)
  end
end
publish_to_delayed_message(delivery_info, metadata, payload)
click to toggle source
# File lib/emque/consuming/adapters/rabbit_mq/worker.rb, line 97
#
# Republishes a message through the delayed-message exchange with an
# "x-delay" header of 1000, then acks the original delivery.
# NOTE(review): the delayed-message exchange plugin interprets "x-delay"
# as milliseconds — confirm against the plugin version in use.
#
# The outgoing headers are built as a new hash so the headers object held
# by +metadata+ is never mutated (and may be frozen).
#
# delivery_info - object responding to #delivery_tag.
# metadata      - object indexable with :headers (a Hash or nil).
# payload       - raw message body to republish.
def publish_to_delayed_message(delivery_info, metadata, payload)
  headers = (metadata[:headers] || {}).merge("x-delay" => 1000)
  delayed_message_exchange.publish(payload, { :headers => headers })
  channel.ack(delivery_info.delivery_tag)
end