class ActiveMessage::Worker
Public Instance Methods
initializer()
click to toggle source
# File lib/active_message/worker.rb, line 3 def initializer ActiveMessage::Logger.debug "Initialized ActiveMessage Worker ...." end
run()
click to toggle source
# File lib/active_message/worker.rb, line 7 def run ActiveMessage::Logger.debug "ActiveMessage Worker Started ...." setup_bindings threads.each(&:join) ActiveMessage::Logger.debug "ActiveMessage Worker Stopped ...." end
threads()
click to toggle source
# File lib/active_message/worker.rb, line 14 def threads @threads ||= [] end
Private Instance Methods
handle_message(consumer, channel, delivery_info, properties, payload)
click to toggle source
# File lib/active_message/worker.rb, line 20 def handle_message(consumer, channel, delivery_info, properties, payload) begin ActiveMessage::Logger.debug ".... Received New Message ...." ActiveMessage::Logger.debug "#{consumer.name}" ActiveMessage::Logger.debug "#{properties.inspect}" ActiveMessage::Logger.debug "#{payload.inspect}" channel.ack(delivery_info.delivery_tag, false) consumer.new.process(delivery_info, properties, payload) rescue StandardError => ex #handle_error(properties.message_id, consumer, ex) channel.nack(delivery_info.delivery_tag, false, false) end end
setup_bindings()
click to toggle source
# File lib/active_message/worker.rb, line 34 def setup_bindings if ActiveMessage.consumers.size <= 0 ActiveMessage::Logger.fatal "No ActiveRecord Consumers Loaded ...." exit end ActiveMessage.consumers.each do |consumer| thread = Thread.new do channel = ActiveMessage.broker.connection.create_channel Thread.current.thread_variable_set(:consumer, consumer) Thread.current.thread_variable_set(:channel, channel) channel = Thread.current.thread_variable_get(:channel) consumer = Thread.current.thread_variable_get(:consumer) channel.queue(consumer.get_queue_name, durable: true).bind(ActiveMessage.broker.exchange, routing_key: consumer.routing_keys.first).subscribe(ack: true) do |delivery_info, properties, payload| handle_message(consumer, channel, delivery_info, properties, payload) end loop do sleep 5.0 end end threads.push(thread) end end