class Ocular::Inputs::RabbitMQ::Input::DSLProxy
Public Class Methods
new(proxy, handler, logger)
click to toggle source
# File lib/ocular/inputs/rabbitmq_input.rb, line 56
#
# Builds the DSL proxy by remembering the three collaborators it is given.
#
# proxy::   stored in @proxy
# handler:: stored in @handler
# logger::  stored in @logger
def initialize(proxy, handler, logger)
  @proxy, @handler, @logger = proxy, handler, logger
end
Public Instance Methods
subscribe(queue, *settings, &block)
click to toggle source
# File lib/ocular/inputs/rabbitmq_input.rb, line 62
#
# Subscribes +block+ to messages delivered on +queue+ and registers the
# resulting event under a generated id.
#
# queue::    queue name; passed to the channel's +queue+ call and used as
#            the prefix of the returned id
# settings:: extra positional arguments forwarded unchanged to the
#            channel's +queue+ call
# block::    handler wrapped in an Ocular::DSL::EventBase and executed once
#            per delivered message
#
# Returns the id ("<queue>-<block>") under which the event base was stored
# in <tt>@proxy.events</tt>.
#
# For every delivery a fresh RabbitMQRunContext is created and populated
# with the delivery info, metadata and payload. If the handler finishes
# without raising, the delivery is acknowledged; if it raises a
# StandardError, a warning is printed, the consumer pauses for one second
# and the delivery is rejected with the second argument set to true
# (presumably Bunny's "requeue" flag -- confirm against the client in use).
def subscribe(queue, *settings, &block)
  # Build the id before touching the broker: interpolation accepts any
  # queue object (String#+ would raise for e.g. a Symbol), so a bad name can
  # no longer leave a live consumer that was never registered in
  # @proxy.events.
  id = "#{queue}-#{block}"

  eventbase = Ocular::DSL::EventBase.new(@proxy, &block)
  ::Ocular.logger.debug "rabbitmq.subscribe to# #{queue} for block #{block}"

  ch = @handler.conn.create_channel
  q = ch.queue(queue, *settings)

  # Manual acknowledgements: each delivery is explicitly acked or rejected
  # below depending on whether the handler raised.
  q.subscribe(:manual_ack => true) do |delivery_info, metadata, payload|
    context = RabbitMQRunContext.new(@logger)
    context.after_fork
    context.log_cause(
      "rabbitmq.subscribe(#{queue})",
      {:delivery_info => delivery_info, :metadata => metadata, :payload => payload}
    )
    context.delivery_info = delivery_info
    context.metadata = metadata
    context.payload = payload

    begin
      eventbase.exec(context)
      ch.acknowledge(delivery_info.delivery_tag, false)
    rescue StandardError => e
      # Report first so the diagnostic is not held back by the back-off,
      # then pause before handing the message back to the broker.
      warn "Error on RabbitMQ event processing on context #{context}. Error: #{e}"
      sleep 1
      ch.reject(delivery_info.delivery_tag, true)
    end
  end

  @proxy.events[id] = eventbase
  id
end