class Lapine::Consumer::Runner
Attributes
argv[R]
Public Class Methods
new(argv)
click to toggle source
# File lib/lapine/consumer/runner.rb, line 18
# Builds a runner around the given argument list.
#
# Stores +argv+ for later use by the configuration loader and starts both
# message counters at zero.
#
# @param argv the argument list that is later passed to Config#load
def initialize(argv)
  @argv = argv
  @message_count = @running_message_count = 0
end
Public Instance Methods
config()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 73
# Memoized consumer configuration.
#
# On first use a Lapine::Consumer::Config is created and loaded from +argv+;
# the result of +load+ is cached in @config and returned on later calls.
#
# @return whatever Lapine::Consumer::Config#load returns
def config
  @config ||= Lapine::Consumer::Config.new.load(argv)
end
handle_signals!()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 96
# Resets the global stop flag and installs the INT and TERM signal traps.
#
# INT stops EventMachine directly from the trap. TERM only sets
# $STOP_LAPINE_CONSUMER to true, leaving it to whoever reads the flag to
# end the loop.
def handle_signals!
  $STOP_LAPINE_CONSUMER = false

  Signal.trap('INT') do
    EventMachine.stop
  end

  Signal.trap('TERM') do
    $STOP_LAPINE_CONSUMER = true
  end
end
logger()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 81
# Memoized logger for the consumer.
#
# Logs to the configured logfile when one is set, otherwise to STDOUT.
#
# @return [::Lapine::AnnotatedLogger]
def logger
  @logger ||= ::Lapine::AnnotatedLogger.new(config.logfile || STDOUT)
end
queue_properties()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 85
# Options hash used when declaring queues.
#
# Transient consumers get auto-deleting queues; all others get durable ones.
#
# @return [Hash] either { auto_delete: true } or { durable: true }
def queue_properties
  if config.transient?
    { auto_delete: true }
  else
    { durable: true }
  end
end
run()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 24 def run handle_signals! Environmenter::Loader.new(config).load! logger.info 'starting Lapine::Consumer' @queue_properties = queue_properties EventMachine.run do topology.each_binding do |q, conn, routing_key, handlers| queue = conn.channel.queue(q, @queue_properties).bind(conn.exchange, routing_key: routing_key) queue.subscribe(ack: true) do |metadata, payload| process(metadata, payload, handlers) EventMachine.stop_event_loop if should_exit? end queues << queue end topology.each_queue_to_delete do |q, conn, routing_key, handlers| # if queue does not exist in RabbitMQ, skip processing # else queue = conn.channel.queue(q, @queue_properties) queues_to_delete << queue queue.subscribe(ack: true) do |metadata, payload| process(metadata, payload, handlers) end EventMachine.add_timer(0.5) do logger.info "Lapine::Consumer unbinding #{queue.name} from exchange: #{conn.exchange.name}, routing_key: #{routing_key}" queue.unbind(conn.exchange, routing_key: routing_key) end end if config.debug? EventMachine.add_periodic_timer(10) do logger.info "Lapine::Consumer messages processed=#{@message_count} running_count=#{@running_message_count}" @message_count = 0 end end EventMachine.add_periodic_timer(5) do EventMachine.stop_event_loop if should_exit? end schedule_queue_deletion end logger.warn 'exiting Lapine::Consumer' end
should_exit?()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 92
# Reports whether a stop has been requested.
#
# @return the current value of the global $STOP_LAPINE_CONSUMER flag
def should_exit?
  $STOP_LAPINE_CONSUMER
end
topology()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 77
# Memoized topology built from the current config and logger.
#
# @return [::Lapine::Consumer::Topology]
def topology
  @topology ||= ::Lapine::Consumer::Topology.new(config, logger)
end
Private Instance Methods
process(metadata, payload, handlers)
click to toggle source
# File lib/lapine/consumer/runner.rb, line 136
# Wraps a delivery in a Consumer::Message and passes it to the middleware app
# with a block that dispatches to the handlers.
#
# Inside that block each handler receives the message through its own
# Dispatcher. In debug mode both message counters are incremented afterwards.
#
# @param metadata delivery metadata passed on to Consumer::Message
# @param payload  raw message payload passed on to Consumer::Message
# @param handlers collection of handlers, each dispatched to in turn
def process(metadata, payload, handlers)
  message = Consumer::Message.new(payload, metadata, logger)

  Middleware.app.call(message) do |processed|
    handlers.each { |handler| Lapine::Consumer::Dispatcher.new(handler, processed).dispatch }

    next unless config.debug?

    @message_count += 1
    @running_message_count += 1
  end
end
queues()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 104
# Memoized list of queues, created empty on first access.
#
# @return [Array]
def queues
  @queues ||= []
end
queues_to_delete()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 108
# Memoized list of queues awaiting deletion, created empty on first access.
#
# @return [Array]
def queues_to_delete
  @queues_to_delete ||= []
end
schedule_queue_deletion()
click to toggle source
# File lib/lapine/consumer/runner.rb, line 112
# Schedules a check, 30 seconds from now, of every queue awaiting deletion.
#
# For each pending queue the status is requested. A queue reporting zero
# messages is unsubscribed, deleted (unless the consumer is transient) and
# removed from the pending list. A queue that still holds messages is left
# alone and another check is scheduled.
#
# At most one follow-up check is scheduled per pass, however many queues are
# still non-empty. Each pass re-checks every pending queue, so one follow-up
# per non-empty queue would multiply the timers on every round.
def schedule_queue_deletion
  EventMachine.add_timer(30) do
    rescheduled = false

    # Iterate over a snapshot: the status callback removes entries from
    # queues_to_delete, and mutating an array while iterating it skips
    # elements if the callback runs synchronously.
    queues_to_delete.dup.each do |queue|
      logger.info "Lapine::Consumer checking #{queue.name} for deletion"
      begin
        queue.status do |message_count, _consumer_count|
          if message_count == 0
            logger.info "Lapine::Consumer deleting #{queue.name}"
            queue.unsubscribe
            queue.delete unless config.transient?
            queues_to_delete.delete(queue)
          else
            logger.info "Lapine::Consumer skipping #{queue.name} deletion, message count: #{message_count}"
            # Re-arm only once per pass.
            unless rescheduled
              rescheduled = true
              schedule_queue_deletion
            end
          end
        end
      rescue => e
        logger.error "Unable to delete queue #{queue.name}, error: #{e.message}"
      end
    end
  end
end