class Lapine::Consumer::Runner

Attributes

argv[R]

Public Class Methods

new(argv) click to toggle source
# File lib/lapine/consumer/runner.rb, line 18
# Creates a runner for the given command-line arguments.
#
# @param argv [Object] arguments later handed to the config loader
#   (presumably an Array of Strings such as ARGV — confirm with callers)
def initialize(argv)
  @argv = argv
  # Both message counters start from zero.
  @message_count = @running_message_count = 0
end

Public Instance Methods

config() click to toggle source
# File lib/lapine/consumer/runner.rb, line 73
# Returns the consumer configuration, loading it from +argv+ on first use
# and reusing the same object on later calls.
#
# @return [Object] whatever Lapine::Consumer::Config#load returns
def config
  return @config if @config

  @config = Lapine::Consumer::Config.new.load(argv)
end
handle_signals!() click to toggle source
# File lib/lapine/consumer/runner.rb, line 96
# Resets the global stop flag and installs the process signal handlers:
# INT stops EventMachine right away, TERM only raises the stop flag that
# should_exit? reports.
def handle_signals!
  $STOP_LAPINE_CONSUMER = false

  Signal.trap('INT') do
    EventMachine.stop
  end

  Signal.trap('TERM') do
    $STOP_LAPINE_CONSUMER = true
  end
end
logger() click to toggle source
# File lib/lapine/consumer/runner.rb, line 81
# Returns the memoized annotated logger. It writes to the configured
# logfile when one is set and to STDOUT otherwise.
#
# @return [Lapine::AnnotatedLogger]
def logger
  return @logger if @logger

  destination = config.logfile ? config.logfile : STDOUT
  @logger = ::Lapine::AnnotatedLogger.new(destination)
end
queue_properties() click to toggle source
# File lib/lapine/consumer/runner.rb, line 85
# Builds the options hash used when declaring queues.
#
# Transient consumers get auto-deleting queues; all others get durable
# queues. The transient flag is read once so both outcomes come from the
# same answer.
#
# @return [Hash] a fresh hash, either { auto_delete: true } or { durable: true }
def queue_properties
  config.transient? ? { auto_delete: true } : { durable: true }
end
run() click to toggle source
# File lib/lapine/consumer/runner.rb, line 24
# Entry point of the consumer: installs signal handlers, loads the
# environment, then runs the EventMachine reactor until it is stopped.
#
# Inside the reactor it:
# * declares, binds and subscribes to every binding the topology yields;
# * subscribes to and unbinds every queue the topology marks for deletion;
# * optionally logs throughput every 10 seconds when config.debug? is set;
# * polls should_exit? every 5 seconds to stop the loop;
# * kicks off the delayed queue-deletion check.
#
# A warning is logged once EventMachine.run returns.
def run
  handle_signals!
  Environmenter::Loader.new(config).load!
  logger.info 'starting Lapine::Consumer'

  # Computed once and reused for every queue declared below.
  @queue_properties = queue_properties
  EventMachine.run do
    topology.each_binding do |q, conn, routing_key, handlers|
      # Declare the queue on the connection's channel and bind it to the
      # connection's exchange under this routing key.
      queue = conn.channel.queue(q, @queue_properties).bind(conn.exchange, routing_key: routing_key)
      queue.subscribe(ack: true) do |metadata, payload|
        process(metadata, payload, handlers)
        # Check the stop flag after each message so shutdown does not have
        # to wait for the periodic timer below.
        EventMachine.stop_event_loop if should_exit?
      end
      queues << queue
    end

    topology.each_queue_to_delete do |q, conn, routing_key, handlers|
      # if queue does not exist in RabbitMQ, skip processing
      # else
      # NOTE(review): the existence check described above is not implemented
      # here; the queue is always declared.
      queue = conn.channel.queue(q, @queue_properties)
      queues_to_delete << queue

      # Keep handling messages from this queue; unlike the bindings above,
      # this subscription does not check should_exit? per message.
      queue.subscribe(ack: true) do |metadata, payload|
        process(metadata, payload, handlers)
      end

      # Unbind from the exchange after half a second.
      EventMachine.add_timer(0.5) do
        logger.info "Lapine::Consumer unbinding #{queue.name} from exchange: #{conn.exchange.name}, routing_key: #{routing_key}"
        queue.unbind(conn.exchange, routing_key: routing_key)
      end
    end

    if config.debug?
      # Every 10 seconds log both counters, then reset only the per-interval
      # one; @running_message_count is left untouched.
      EventMachine.add_periodic_timer(10) do
        logger.info "Lapine::Consumer messages processed=#{@message_count} running_count=#{@running_message_count}"
        @message_count = 0
      end
    end

    # Poll the stop flag so the loop can exit even when no messages arrive.
    EventMachine.add_periodic_timer(5) do
      EventMachine.stop_event_loop if should_exit?
    end

    schedule_queue_deletion
  end

  # Reached only after the EventMachine.run block has returned.
  logger.warn 'exiting Lapine::Consumer'
end
should_exit?() click to toggle source
# File lib/lapine/consumer/runner.rb, line 92
# Reports whether shutdown has been requested.
#
# Returns the current value of the $STOP_LAPINE_CONSUMER global, which
# handle_signals! resets to false and the TERM trap sets to true.
def should_exit?
  $STOP_LAPINE_CONSUMER
end
topology() click to toggle source
# File lib/lapine/consumer/runner.rb, line 77
# Returns the memoized consumer topology, built from the current config
# and logger on first access.
#
# @return [Lapine::Consumer::Topology]
def topology
  return @topology if @topology

  @topology = ::Lapine::Consumer::Topology.new(config, logger)
end

Private Instance Methods

process(metadata, payload, handlers) click to toggle source
# File lib/lapine/consumer/runner.rb, line 136
# Wraps one delivery in a Consumer::Message, passes it through the
# middleware app, and dispatches the message yielded by the middleware to
# each handler in order. When config.debug? is set, both message counters
# are incremented after dispatch.
#
# @param metadata [Object] delivery metadata passed to the subscribe block
# @param payload [Object] message body passed to the subscribe block
# @param handlers [Enumerable] handlers, each given to a Dispatcher
# @return [Object] whatever the middleware app's call returns
def process(metadata, payload, handlers)
  incoming = Consumer::Message.new(payload, metadata, logger)

  Middleware.app.call(incoming) do |processed|
    handlers.each { |handler| Lapine::Consumer::Dispatcher.new(handler, processed).dispatch }

    next unless config.debug?

    @message_count += 1
    @running_message_count += 1
  end
end
queues() click to toggle source
# File lib/lapine/consumer/runner.rb, line 104
# Returns the array that collects subscribed queues, creating it on first
# access.
#
# @return [Array]
def queues
  @queues = [] unless @queues
  @queues
end
queues_to_delete() click to toggle source
# File lib/lapine/consumer/runner.rb, line 108
# Returns the array of queues still awaiting deletion, creating it on
# first access.
#
# @return [Array]
def queues_to_delete
  @queues_to_delete = [] unless @queues_to_delete
  @queues_to_delete
end
schedule_queue_deletion() click to toggle source
# File lib/lapine/consumer/runner.rb, line 112
# Arms a 30-second timer that checks every queue in queues_to_delete.
#
# For each queue its status is requested. When the reported message count
# is zero, the queue is unsubscribed, deleted (unless config.transient?)
# and removed from queues_to_delete. Otherwise another check is scheduled.
#
# At most one timer is pending at a time. Without this guard every
# non-empty queue would arm its own follow-up timer, and each of those
# would re-check all queues, so the number of timers would multiply on
# every round.
def schedule_queue_deletion
  return if @queue_deletion_scheduled

  @queue_deletion_scheduled = true
  EventMachine.add_timer(30) do
    # The timer has fired, so a follow-up may be armed again.
    @queue_deletion_scheduled = false

    # Iterate over a snapshot: the status callback removes entries from
    # queues_to_delete, which must not disturb this iteration.
    queues_to_delete.dup.each do |queue|
      logger.info "Lapine::Consumer checking #{queue.name} for deletion"

      begin
        queue.status do |message_count, _consumer_count|
          if message_count == 0
            logger.info "Lapine::Consumer deleting #{queue.name}"
            queue.unsubscribe
            queue.delete unless config.transient?
            queues_to_delete.delete(queue)
          else
            logger.info "Lapine::Consumer skipping #{queue.name} deletion, message count: #{message_count}"
            # No-op if another queue already armed the next check.
            schedule_queue_deletion
          end
        end
      rescue => e
        logger.error "Unable to delete queue #{queue.name}, error: #{e.message}"
      end
    end
  end
end