module Vx::Common::AMQP::Consumer::Subscribe
Public Instance Methods
pop(q)
click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 39
# Pops one message from +q+, passing this consumer's +ack+ setting through
# to the queue.
#
# @param q [#pop] queue object responding to +pop(ack:)+
# @return [Array] +[unpacked, delivery_info, properties]+; +unpacked+ is
#   +nil+ when the pop produced no payload
def pop(q)
  delivery_info, properties, raw_payload = q.pop(ack: ack)

  # Only decode when something was actually fetched; otherwise the local
  # stays nil.
  message = deserialize_message(properties, raw_payload) if raw_payload

  [message, delivery_info, properties]
end
start() { |x, q| ... }
click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 14
# Opens the session, declares the exchange and queue inside a channel,
# binds the queue to the exchange and yields both to the caller's block.
#
# A "start.consumer.amqp" event is instrumented before binding and a
# "shutdown.consumer.amqp" event after the block returns normally.
#
# @yieldparam x the declared exchange
# @yieldparam q the declared queue
# @return the block's return value, or +nil+ when no block is given
def start
  block_result = nil
  instrumentation = { consumer_id: consumer_id, consumer: consumer_name }

  session.open(instrumentation)

  session.with_channel do
    exchange = declare_exchange
    queue    = declare_queue

    # Extend the same hash in place so later events carry the names.
    instrumentation[:exchange] = exchange.name
    instrumentation[:queue]    = queue.name

    instrument("start.consumer.amqp", instrumentation)

    queue.bind(exchange, bind_options)
    block_result = yield(exchange, queue) if block_given?

    instrument("shutdown.consumer.amqp", instrumentation)
  end

  block_result
end
subscribe()
click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 8
# Starts the consumer and runs the subscription loop on the declared queue.
#
# @return whatever +start+ returns for the loop block
def subscribe
  # The exchange is yielded by +start+ but not needed here.
  start { |_exchange, queue| subscription_loop(queue) }
end
Private Instance Methods
deserialize_message(properties, payload)
click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 96
# Unpacks a raw payload through the AMQP formatter, using the message's
# +:content_type+ property and this consumer's +model+.
#
# @param properties [#[]] message properties; +:content_type+ is read
# @param payload the raw message body
# @return whatever +Formatter.unpack+ returns
def deserialize_message(properties, payload)
  formatter = Common::AMQP::Formatter
  formatter.unpack(properties[:content_type], model, payload)
end
run_instance(delivery_info, properties, payload, uuid)
click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 84
# Builds a fresh consumer instance, attaches the message metadata and the
# request id to it, then performs the payload.
#
# @param delivery_info delivery information assigned to the instance
# @param properties message properties assigned to the instance
# @param payload the message passed to +perform+
# @param uuid identifier assigned to the instance as +request_id+
# @return the result of +perform+
def run_instance(delivery_info, properties, payload, uuid)
  instance = new
  instance.properties    = properties
  instance.delivery_info = delivery_info
  instance.request_id    = uuid
  instance.perform(payload)
end
subscription_loop(q)
click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 52
# Polls +q+ for messages until +shutdown?+ becomes true or a handler
# returns +:shutdown+.
#
# Each fetched payload is deserialized and processed by +run_instance+
# inside the sub middlewares, with "start_processing.consumer.amqp" and
# "process.consumer.amqp" instrumentation events. When a pop yields no
# payload the loop sleeps for +config.pool_timeout+ before retrying.
#
# @param q [#pop] queue object responding to +pop(ack:)+
def subscription_loop(q)
  loop do
    break if shutdown?

    delivery_info, properties, raw_payload = q.pop(ack: ack)

    # Nothing fetched: wait, then poll again.
    unless raw_payload
      sleep config.pool_timeout
      next
    end

    message = deserialize_message(properties, raw_payload)
    outcome = nil
    request_id = SecureRandom.uuid

    instrumentation = {
      consumer_id: consumer_id,
      consumer:    consumer_name,
      payload:     message,
      properties:  properties,
    }

    with_sub_middlewares(instrumentation) do
      instrument("start_processing.consumer.amqp", instrumentation)
      instrument("process.consumer.amqp", instrumentation) do
        outcome = run_instance(delivery_info, properties, message, request_id)
      end
    end

    # A handler can stop the loop by returning :shutdown.
    break if outcome == :shutdown
  end
end
with_sub_middlewares(env, &block)
click to toggle source
# File lib/vx/common/amqp/consumer/subscribe.rb, line 92
# Wraps +block+ in the app built by the configured +:sub+ builder and
# calls it with +env+.
#
# @param env the environment passed to the built app's +call+
# @param block the block handed to the builder's +to_app+
# @return the result of calling the built app
def with_sub_middlewares(env, &block)
  sub_builder = Common::AMQP.config.builders[:sub]
  app = sub_builder.to_app(block)
  app.call(env)
end