class ActiveMessage::Worker

Public Instance Methods

initializer() click to toggle source
# File lib/active_message/worker.rb, line 3
# Emits a debug log line announcing that a worker has been initialized.
#
# NOTE(review): this method is spelled `initializer`, not `initialize`, so
# Ruby does not invoke it from `new`; it only runs when called explicitly.
# Confirm whether that is intended.
def initializer
  message = "Initialized ActiveMessage Worker ...."
  ActiveMessage::Logger.debug(message)
end
run() click to toggle source
# File lib/active_message/worker.rb, line 7
# Runs the worker: logs the start, sets up the consumer bindings, waits for
# every thread returned by +threads+ to finish, then logs the stop.
def run
  logger = ActiveMessage::Logger
  logger.debug("ActiveMessage Worker Started ....")
  setup_bindings
  # Joining keeps the caller blocked for as long as any worker thread lives.
  threads.each { |worker_thread| worker_thread.join }
  logger.debug("ActiveMessage Worker Stopped ....")
end
threads() click to toggle source
# File lib/active_message/worker.rb, line 14
# Returns the array that holds this worker's threads, creating an empty one
# the first time it is requested. The same array object is returned on every
# call, so callers may push onto it.
def threads
  @threads = [] unless @threads
  @threads
end

Private Instance Methods

handle_message(consumer, channel, delivery_info, properties, payload) click to toggle source
# File lib/active_message/worker.rb, line 20
# Processes one delivered message with a fresh instance of +consumer+ and
# settles the delivery exactly once.
#
# The delivery is acknowledged only after +process+ has returned. If logging
# or processing raises a StandardError, the delivery is rejected with +nack+
# instead and the error is logged. Acknowledging up front (as this method used
# to do) meant a failing message was first acked and then nacked with the same
# delivery tag, i.e. settled twice.
#
# consumer      - object responding to +name+ and +new+; the instance must
#                 respond to +process(delivery_info, properties, payload)+.
# channel       - object responding to +ack+ and +nack+.
# delivery_info - object responding to +delivery_tag+.
# properties    - message properties; logged via +inspect+ and forwarded.
# payload       - message body; logged via +inspect+ and forwarded.
#
# Returns whatever +channel.ack+ or +channel.nack+ returns.
def handle_message(consumer, channel, delivery_info, properties, payload)
  ActiveMessage::Logger.debug ".... Received New Message ...."
  ActiveMessage::Logger.debug consumer.name.to_s
  ActiveMessage::Logger.debug properties.inspect
  ActiveMessage::Logger.debug payload.inspect
  consumer.new.process(delivery_info, properties, payload)
rescue StandardError => ex
  # Logged at debug level because that is the only non-fatal level this file
  # is known to use on ActiveMessage::Logger.
  ActiveMessage::Logger.debug "Message processing failed: #{ex.class}: #{ex.message}"
  #handle_error(properties.message_id, consumer, ex)
  # Trailing arguments kept as before; presumably (multiple, requeue) - confirm
  # against the broker client in use.
  channel.nack(delivery_info.delivery_tag, false, false)
else
  # Runs only when nothing was raised above; an error raised by +ack+ itself
  # is not rescued here, so it can never trigger a second settlement.
  channel.ack(delivery_info.delivery_tag, false)
end
setup_bindings() click to toggle source
# File lib/active_message/worker.rb, line 34
# Starts one thread per entry in +ActiveMessage.consumers+ and registers each
# thread in +threads+.
#
# Every thread creates its own channel on the broker connection, declares the
# consumer's durable queue, binds it to the broker exchange, and subscribes
# with acknowledgements enabled, handing each delivery to +handle_message+.
# The consumer and channel are also stored as thread variables (:consumer and
# :channel) on the thread.
#
# When no consumers are registered, a fatal message is logged and the process
# exits with status 1 so that supervisors see the failure.
def setup_bindings
  if ActiveMessage.consumers.empty?
    ActiveMessage::Logger.fatal "No ActiveMessage Consumers Loaded ...."
    exit 1
  end

  ActiveMessage.consumers.each do |consumer|
    thread = Thread.new do
      channel = ActiveMessage.broker.connection.create_channel
      Thread.current.thread_variable_set(:consumer, consumer)
      Thread.current.thread_variable_set(:channel, channel)

      queue = channel.queue(consumer.get_queue_name, durable: true)
      # NOTE(review): only the first routing key is bound even though
      # +routing_keys+ is plural - confirm whether the rest should be bound too.
      bound_queue = queue.bind(ActiveMessage.broker.exchange, routing_key: consumer.routing_keys.first)
      bound_queue.subscribe(ack: true) do |delivery_info, properties, payload|
        handle_message(consumer, channel, delivery_info, properties, payload)
      end

      # Keep the thread alive after subscribing so +run+ stays blocked on join.
      loop do
        sleep 5.0
      end
    end
    threads.push(thread)
  end
end