class Ocular::Inputs::RabbitMQ::Input::DSLProxy

Public Class Methods

new(proxy, handler, logger) click to toggle source
# File lib/ocular/inputs/rabbitmq_input.rb, line 56
def initialize(proxy, handler, logger)
    @proxy = proxy
    @handler = handler
    @logger = logger
end

Public Instance Methods

subscribe(queue, *settings, &block) click to toggle source
# File lib/ocular/inputs/rabbitmq_input.rb, line 62
def subscribe(queue, *settings, &block)
    eventbase = Ocular::DSL::EventBase.new(@proxy, &block)
    ::Ocular.logger.debug "rabbitmq.subscribe to# #{queue} for block #{block}"

    ch = @handler.conn.create_channel
    q  = ch.queue(queue, *settings)

    q.subscribe(:manual_ack => true) do |delivery_info, metadata, payload|
        context = RabbitMQRunContext.new(@logger)
        context.after_fork()
        context.log_cause("rabbitmq.subscribe(#{queue})", {:delivery_info => delivery_info, :metadata => metadata, :payload => payload})
        context.delivery_info = delivery_info
        context.metadata = metadata
        context.payload = payload
        begin
            eventbase.exec(context)
            ch.acknowledge(delivery_info.delivery_tag, false)
        rescue StandardError => e
            sleep 1
            warn "Error on RabbitMQ event processing on context #{context}. Error: #{e}"
            ch.reject(delivery_info.delivery_tag, true)
        end
    end

    id = queue + "-" + block.to_s

    @proxy.events[id] = eventbase

    return id
end