module Vx::Common::AMQP::Consumer::Subscribe

Public Instance Methods

pop(q) click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 39
def pop(q)
  unpacked = nil
  delivery_info, properties, payload = q.pop(ack: ack)

  if payload
    unpacked = deserialize_message properties, payload
  end

  [unpacked, delivery_info, properties]
end
start() { |x, q| ... } click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 14
def start
  rs = nil
  instrumentation = {
    consumer_id: consumer_id,
    consumer:    consumer_name
  }
  session.open instrumentation
  session.with_channel do
    x = declare_exchange
    q = declare_queue

    instrumentation.merge!(
      exchange:    x.name,
      queue:       q.name,
    )
    instrument("start.consumer.amqp", instrumentation)

    q.bind(x, bind_options)
    rs = yield(x, q) if block_given?

    instrument("shutdown.consumer.amqp", instrumentation)
  end
  rs
end
subscribe() click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 8
def subscribe
  start do |x, q|
    subscription_loop q
  end
end

Private Instance Methods

deserialize_message(properties, payload) click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 96
def deserialize_message(properties, payload)
  Common::AMQP::Formatter.unpack(
    properties[:content_type],
    model,
    payload
  )
end
run_instance(delivery_info, properties, payload, uuid) click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 84
def run_instance(delivery_info, properties, payload, uuid)
  new.tap do |inst|
    inst.properties    = properties
    inst.delivery_info = delivery_info
    inst.request_id    = uuid
  end.perform payload
end
subscription_loop(q) click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 52
def subscription_loop(q)
  loop do
    break if shutdown?

    delivery_info, properties, payload = q.pop(ack: ack)

    if payload
      payload = deserialize_message properties, payload
      result  = nil
      uuid    = SecureRandom.uuid

      instrumentation = {
        consumer_id:   consumer_id,
        consumer:      consumer_name,
        payload:       payload,
        properties:    properties,
      }

      with_sub_middlewares instrumentation do
        instrument("start_processing.consumer.amqp", instrumentation)
        instrument("process.consumer.amqp", instrumentation) do
          result  = run_instance delivery_info, properties, payload, uuid
        end
      end

      break if result == :shutdown
    else
      sleep config.pool_timeout
    end
  end
end
with_sub_middlewares(env, &block) click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 92
def with_sub_middlewares(env, &block)
  Common::AMQP.config.builders[:sub].to_app(block).call(env)
end