module Evrone::Common::AMQP::Consumer::Subscribe

Public Instance Methods

subscribe() click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 6
def subscribe
  session.open

  session.with_channel do
    x = declare_exchange
    q = declare_queue

    run_callbacks(:subscribe, exchange: x, queue: q, name: consumer_id) do
      debug "subscribing to #{q.name}:#{x.name} using #{bind_options.inspect}"
      q.bind(x, bind_options)
      debug "successfuly subscribed to #{q.name}:#{x.name}"

      subscription_loop q
    end

    debug "shutdown"
  end
end

Private Instance Methods

deserialize_message(properties, payload) click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 58
def deserialize_message(properties, payload)
  Common::AMQP::Formatter.unpack properties[:content_type],
                                 model,
                                 payload
end
run_instance(delivery_info, properties, payload) click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 47
def run_instance(delivery_info, properties, payload)
  payload = deserialize_message properties, payload

  run_callbacks :recieve, payload: payload, name: consumer_id do
    new.tap do |inst|
      inst.properties    = properties
      inst.delivery_info = delivery_info
    end.perform payload
  end
end
subscription_loop(q) click to toggle source
# File lib/evrone/common/amqp/consumer/subscribe.rb, line 27
def subscription_loop(q)
  loop do
    break if shutdown?

    delivery_info, properties, payload = q.pop(ack: ack)

    if payload
      result = nil

      debug "recieve ##{delivery_info.delivery_tag.to_i} #{payload.inspect}"
      result = run_instance delivery_info, properties, payload
      debug "done ##{delivery_info.delivery_tag.to_i}"

      break if result == :shutdown
    else
      sleep config.pool_timeout
    end
  end
end