class OmfCommon::Comm::AMQP::Topic
Public Class Methods
new(id, opts = {})
click to toggle source
Calls superclass method
OmfCommon::Comm::Topic::new
# File lib/omf_common/comm/amqp/amqp_topic.rb, line 48
#
# Sets up the topic's local state and then starts the AMQP wiring via
# +_init_amqp+.
#
# @param id   forwarded unchanged to the superclass initializer
# @param opts [Hash] options. :communicator is required and is removed from
#   this hash (the caller's hash is modified). :address and :routing_key are
#   read when present.
# @raise [RuntimeError] when +opts+ carries no truthy :communicator
def initialize(id, opts = {})
  @communicator = opts.delete(:communicator)
  raise "Missing :communicator option" unless @communicator

  # Bare `super` forwards id and opts (already stripped of :communicator).
  super

  @lock = Monitor.new
  @subscribed = false
  @on_subscribed_handlers = []
  @address = opts[:address]
  # Monitor o.op & o.info by default
  @routing_key = opts[:routing_key] || "o.*"

  _init_amqp
end
Public Instance Methods
address()
click to toggle source
# File lib/omf_common/comm/amqp/amqp_topic.rb, line 17
#
# Reader for the topic address.
#
# @return the value stored in @address (nil if it was never set)
def address
  @address
end
on_subscribed(&block)
click to toggle source
Calls `block` once the topic has been subscribed to the underlying messaging infrastructure.
# File lib/omf_common/comm/amqp/amqp_topic.rb, line 24
#
# Registers +block+ to run once the topic is subscribed.
#
# If @subscribed is already set, the block is handed to +after(0, ...)+
# right away; otherwise it is appended to @on_subscribed_handlers. The
# check and the append happen under @lock; the +after+ call is made
# outside of it.
#
# @return nil when no block is given or the block was queued, otherwise
#   whatever +after+ returns
def on_subscribed(&block)
  return if block.nil?

  already_subscribed = @lock.synchronize do
    @on_subscribed_handlers << block unless @subscribed
    @subscribed
  end

  after(0, &block) if already_subscribed
end
to_s()
click to toggle source
# File lib/omf_common/comm/amqp/amqp_topic.rb, line 13
#
# String form of the topic: simply the stored address.
#
# @return the value stored in @address (nil if it was never set)
def to_s
  @address
end
unsubscribe(key)
click to toggle source
Calls superclass method
OmfCommon::Comm::Topic#unsubscribe
# File lib/omf_common/comm/amqp/amqp_topic.rb, line 40
#
# Delegates entirely to the superclass implementation, forwarding +key+.
#
# @param key forwarded unchanged to the superclass
# @return whatever the superclass method returns
def unsubscribe(key)
  # Deleting the exchange is currently disabled:
  #@exchange.delete
  super
end
Private Instance Methods
_init_amqp()
click to toggle source
# File lib/omf_common/comm/amqp/amqp_topic.rb, line 63
#
# Creates the topic exchange and an exclusive, auto-deleted queue bound to it
# with @routing_key, then subscribes to that queue.
#
# Inside the queue callback: every received payload is parsed with
# +Message.parse+ and each resulting message is passed to
# +on_incoming_message+. Once the subscription has been requested,
# @subscribed is set and all handlers collected by +on_subscribed+ are
# scheduled through +after(0, ...)+ while holding @lock.
#
# @return whatever +channel.queue+ returns
def _init_amqp
  amqp_channel = @communicator.channel
  @exchange = amqp_channel.topic(id, auto_delete: true)

  amqp_channel.queue("", exclusive: true, auto_delete: true) do |bound_queue|
    bound_queue.bind(@exchange, routing_key: @routing_key)

    bound_queue.subscribe do |headers, payload|
      debug "Received message on #{@address} | #{@routing_key}"
      if OmfCommon::Measure.enabled?
        # The regex captures the 36 characters following `mid":"` in the raw payload.
        MPReceived.inject(Time.now.to_f, @address, payload.to_s[/mid\":\"(.{36})/, 1])
      end
      Message.parse(payload, headers.content_type) { |msg| on_incoming_message(msg) }
    end

    debug "Subscribed to '#@id'"

    # Flush the handlers accumulated by on_subscribed, then drop the list.
    @lock.synchronize do
      @subscribed = true
      @on_subscribed_handlers.each { |handler| after(0, &handler) }
      @on_subscribed_handlers = nil
    end
  end
end
_send_message(msg, opts = {}, block = nil)
click to toggle source
Calls superclass method
OmfCommon::Comm::Topic#_send_message
# File lib/omf_common/comm/amqp/amqp_topic.rb, line 88
#
# Runs the superclass hook, marshals +msg+ and publishes it on @exchange.
#
# When @exchange is not set the message is not published; a warning is
# emitted instead.
#
# @param msg   message object; must respond to +marshall+ and +mid+
# @param opts  [Hash] only :routing_key is read here
# @param block forwarded to the superclass by the bare +super+; not used here
def _send_message(msg, opts = {}, block = nil)
  super

  content_type, content = msg.marshall(self)
  routing_key = opts[:routing_key]
  debug "(#{id}) Send message (#{content_type}) #{msg.inspect} TO #{routing_key}"

  return warn("Unavailable AMQP channel. Dropping message '#{msg}'") unless @exchange

  @exchange.publish(content,
                    content_type: content_type,
                    message_id: msg.mid,
                    routing_key: routing_key)
  MPPublished.inject(Time.now.to_f, @address, msg.mid) if OmfCommon::Measure.enabled?
end