class OmfCommon::Comm::Topic
Attributes
id[R]
routing_key[R]
Public Class Methods
[](name)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 33 def self.[](name) @@name2inst[name] end
create(name, opts = {}, &block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 17 def self.create(name, opts = {}, &block) # Force string conversion as 'name' can be an ExperimentProperty name = name.to_s.to_sym @@lock.synchronize do unless @@name2inst[name] debug "New topic: #{name} | #{opts[:routing_key]}" #opts[:address] ||= address_for(name) @@name2inst[name] = self.new(name, opts, &block) else debug "Existing topic: #{name} | #{@@name2inst[name].routing_key}" block.call(@@name2inst[name]) if block end @@name2inst[name] end end
new(id, opts = {})
click to toggle source
# File lib/omf_common/comm/topic.rb, line 150 def initialize(id, opts = {}) @id = id #@address = opts[:address] @handlers = {} @lock = Monitor.new @context2cbk = {} end
Public Instance Methods
address()
click to toggle source
# File lib/omf_common/comm/topic.rb, line 137 def address raise NotImplementedError end
after(delay_sec, &block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 141 def after(delay_sec, &block) return unless block OmfCommon.eventloop.after(delay_sec) do block.arity == 1 ? block.call(self) : block.call end end
configure(props = {}, core_props = {}, &block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 48 def configure(props = {}, core_props = {}, &block) create_message_and_publish(:configure, props, core_props, block) self end
create(res_type, config_props = {}, core_props = {}, &block)
click to toggle source
Request the creation of a new resource. Returns itself
# File lib/omf_common/comm/topic.rb, line 41 def create(res_type, config_props = {}, core_props = {}, &block) config_props[:type] ||= res_type debug "Create resource of type '#{res_type}'" create_message_and_publish(:create, config_props, core_props, block) self end
create_message_and_publish(type, props = {}, core_props = {}, block = nil)
click to toggle source
Only used for create, configure and request
# File lib/omf_common/comm/topic.rb, line 77 def create_message_and_publish(type, props = {}, core_props = {}, block = nil) debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}: #{core_props.inspect}" core_props[:src] ||= OmfCommon.comm.local_address msg = OmfCommon::Message.create(type, props, core_props) publish(msg, { routing_key: "o.op" }, &block) end
error?()
click to toggle source
For detecting message publishing error, means if callback indeed yield a Topic
object, there is no publishing error, thus always false
# File lib/omf_common/comm/topic.rb, line 133 def error? false end
inform(type, props = {}, core_props = {}, &block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 59 def inform(type, props = {}, core_props = {}, &block) core_props[:src] ||= OmfCommon.comm.local_address msg = OmfCommon::Message.create(:inform, props, core_props.merge(itype: type)) publish(msg, { routing_key: "o.info" }, &block) self end
on_inform(key = nil, &message_block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 114 def on_inform(key = nil, &message_block) add_message_handler(:inform, key, &message_block) end
on_message(key = nil, &message_block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 110 def on_message(key = nil, &message_block) add_message_handler(:message, key, &message_block) end
on_subscribed(&block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 128 def on_subscribed(&block) raise NotImplementedError end
publish(msg, opts = {}, &block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 84 def publish(msg, opts = {}, &block) error "!!!" if opts[:routing_key].nil? raise "Expected message but got '#{msg.class}" unless msg.is_a?(OmfCommon::Message) _send_message(msg, opts, block) end
release(resource, core_props = {}, &block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 66 def release(resource, core_props = {}, &block) unless resource.is_a? self.class raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'" end core_props[:src] ||= OmfCommon.comm.local_address msg = OmfCommon::Message.create(:release, {}, core_props.merge(res_id: resource.id)) publish(msg, { routing_key: "o.op" }, &block) self end
request(select = [], core_props = {}, &block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 53 def request(select = [], core_props = {}, &block) # TODO: What are the parameters to the request method really? create_message_and_publish(:request, select, core_props, block) self end
unsubscribe(key)
click to toggle source
Remove all registered callbacks for ‘key’. Will also unsubscribe from the underlying comms layer if no callbacks remain.
# File lib/omf_common/comm/topic.rb, line 121 def unsubscribe(key) @lock.synchronize do @handlers.clear @@name2inst.delete_if { |k, v| k == id.to_sym || k == address.to_sym} end end
Private Instance Methods
_send_message(msg, opts = {}, block = nil)
click to toggle source
_send_message
will also register callbacks for reply messages by default
# File lib/omf_common/comm/topic.rb, line 160 def _send_message(msg, opts = {}, block = nil) if (block) # register callback for responses to 'mid' debug "(#{id}) register callback for responses to 'mid: #{msg.mid}'" @lock.synchronize do @context2cbk[msg.mid.to_s] = { block: block, created_at: Time.now } end end end
add_message_handler(handler_name, key, &message_block)
click to toggle source
# File lib/omf_common/comm/topic.rb, line 205 def add_message_handler(handler_name, key, &message_block) raise ArgumentError, 'Missing message callback' if message_block.nil? debug "(#{id}) register handler for '#{handler_name}'" @lock.synchronize do key ||= OpenSSL::Digest::SHA1.new(message_block.source_location.to_s).to_s (@handlers[handler_name] ||= {})[key] = message_block end self end
on_incoming_message(msg)
click to toggle source
Process a message received from this topic.
@param [OmfCommon::Message] msg Message
received
# File lib/omf_common/comm/topic.rb, line 174 def on_incoming_message(msg) type = msg.operation debug "(#{id}) Deliver message '#{type}': #{msg.inspect}" htypes = [type, :message] if type == :inform # TODO keep converting itype is painful, need to solve this. if (it = msg.itype(:ruby)) # format itype as lower case string case it when "creation_ok" htypes << :create_succeeded when 'status' htypes << :inform_status end htypes << it.to_sym end end debug "(#{id}) Message type '#{htypes.inspect}' (#{msg.class}:#{msg.cid})" hs = htypes.map { |ht| (@handlers[ht] || {}).values }.compact.flatten debug "(#{id}) Distributing message to '#{hs.inspect}'" hs.each do |block| block.call msg end if cbk = @context2cbk[msg.cid.to_s] debug "(#{id}) Distributing message to '#{cbk.inspect}'" cbk[:last_used] = Time.now cbk[:block].call(msg) end end