class OmfCommon::Comm::Topic

Attributes

id[R]
routing_key[R]

Public Class Methods

[](name) click to toggle source
# File lib/omf_common/comm/topic.rb, line 33
def self.[](name)
  @@name2inst[name]
end
create(name, opts = {}, &block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 17
def self.create(name, opts = {}, &block)
  # Force string conversion as 'name' can be an ExperimentProperty
  name = name.to_s.to_sym
  @@lock.synchronize do
    unless @@name2inst[name]
      debug "New topic: #{name} | #{opts[:routing_key]}"
      #opts[:address] ||= address_for(name)
      @@name2inst[name] = self.new(name, opts, &block)
    else
      debug "Existing topic: #{name} | #{@@name2inst[name].routing_key}"
      block.call(@@name2inst[name]) if block
    end
    @@name2inst[name]
  end
end
new(id, opts = {}) click to toggle source
# File lib/omf_common/comm/topic.rb, line 150
def initialize(id, opts = {})
  @id = id
  #@address = opts[:address]
  @handlers = {}
  @lock = Monitor.new
  @context2cbk = {}
end

Public Instance Methods

address() click to toggle source
# File lib/omf_common/comm/topic.rb, line 137
def address
  raise NotImplementedError
end
after(delay_sec, &block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 141
def after(delay_sec, &block)
  return unless block
  OmfCommon.eventloop.after(delay_sec) do
    block.arity == 1 ? block.call(self) : block.call
  end
end
configure(props = {}, core_props = {}, &block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 48
def configure(props = {}, core_props = {}, &block)
  create_message_and_publish(:configure, props, core_props, block)
  self
end
create(res_type, config_props = {}, core_props = {}, &block) click to toggle source

Request the creation of a new resource. Returns itself

# File lib/omf_common/comm/topic.rb, line 41
def create(res_type, config_props = {}, core_props = {}, &block)
  config_props[:type] ||= res_type
  debug "Create resource of type '#{res_type}'"
  create_message_and_publish(:create, config_props, core_props, block)
  self
end
create_message_and_publish(type, props = {}, core_props = {}, block = nil) click to toggle source

Only used for create, configure and request

# File lib/omf_common/comm/topic.rb, line 77
def create_message_and_publish(type, props = {}, core_props = {}, block = nil)
  debug "(#{id}) create_message_and_publish '#{type}': #{props.inspect}: #{core_props.inspect}"
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(type, props, core_props)
  publish(msg, { routing_key: "o.op" }, &block)
end
error?() click to toggle source

For detecting message publishing error, means if callback indeed yield a Topic object, there is no publishing error, thus always false

# File lib/omf_common/comm/topic.rb, line 133
def error?
  false
end
inform(type, props = {}, core_props = {}, &block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 59
def inform(type, props = {}, core_props = {}, &block)
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:inform, props, core_props.merge(itype: type))
  publish(msg, { routing_key: "o.info" }, &block)
  self
end
on_inform(key = nil, &message_block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 114
def on_inform(key = nil, &message_block)
  add_message_handler(:inform, key, &message_block)
end
on_message(key = nil, &message_block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 110
def on_message(key = nil, &message_block)
  add_message_handler(:message, key, &message_block)
end
on_subscribed(&block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 128
def on_subscribed(&block)
  raise NotImplementedError
end
publish(msg, opts = {}, &block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 84
def publish(msg, opts = {}, &block)
  error "!!!" if opts[:routing_key].nil?

  raise "Expected message but got '#{msg.class}" unless msg.is_a?(OmfCommon::Message)
  _send_message(msg, opts, block)
end
release(resource, core_props = {}, &block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 66
def release(resource, core_props = {}, &block)
  unless resource.is_a? self.class
    raise ArgumentError, "Expected '#{self.class}', but got '#{resource.class}'"
  end
  core_props[:src] ||= OmfCommon.comm.local_address
  msg = OmfCommon::Message.create(:release, {}, core_props.merge(res_id: resource.id))
  publish(msg, { routing_key: "o.op" }, &block)
  self
end
request(select = [], core_props = {}, &block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 53
def request(select = [], core_props = {}, &block)
  # TODO: What are the parameters to the request method really?
  create_message_and_publish(:request, select, core_props, block)
  self
end
unsubscribe(key) click to toggle source

Remove all registered callbacks for ‘key’. Will also unsubscribe from the underlying comms layer if no callbacks remain.

# File lib/omf_common/comm/topic.rb, line 121
def unsubscribe(key)
  @lock.synchronize do
    @handlers.clear
    @@name2inst.delete_if { |k, v| k == id.to_sym || k == address.to_sym}
  end
end

Private Instance Methods

_send_message(msg, opts = {}, block = nil) click to toggle source

_send_message will also register callbacks for reply messages by default

# File lib/omf_common/comm/topic.rb, line 160
def _send_message(msg, opts = {}, block = nil)
  if (block)
    # register callback for responses to 'mid'
    debug "(#{id}) register callback for responses to 'mid: #{msg.mid}'"
    @lock.synchronize do
      @context2cbk[msg.mid.to_s] = { block: block, created_at: Time.now }
    end
  end
end
add_message_handler(handler_name, key, &message_block) click to toggle source
# File lib/omf_common/comm/topic.rb, line 205
def add_message_handler(handler_name, key, &message_block)
  raise ArgumentError, 'Missing message callback' if message_block.nil?
  debug "(#{id}) register handler for '#{handler_name}'"
  @lock.synchronize do
    key ||= OpenSSL::Digest::SHA1.new(message_block.source_location.to_s).to_s
    (@handlers[handler_name] ||= {})[key] = message_block
  end
  self
end
on_incoming_message(msg) click to toggle source

Process a message received from this topic.

@param [OmfCommon::Message] msg Message received

# File lib/omf_common/comm/topic.rb, line 174
def on_incoming_message(msg)
  type = msg.operation
  debug "(#{id}) Deliver message '#{type}': #{msg.inspect}"
  htypes = [type, :message]
  if type == :inform
    # TODO keep converting itype is painful, need to solve this.
    if (it = msg.itype(:ruby)) # format itype as lower case string
      case it
      when "creation_ok"
        htypes << :create_succeeded
      when 'status'
        htypes << :inform_status
      end

      htypes << it.to_sym
    end
  end

  debug "(#{id}) Message type '#{htypes.inspect}' (#{msg.class}:#{msg.cid})"
  hs = htypes.map { |ht| (@handlers[ht] || {}).values }.compact.flatten
  debug "(#{id}) Distributing message to '#{hs.inspect}'"
  hs.each do |block|
    block.call msg
  end
  if cbk = @context2cbk[msg.cid.to_s]
    debug "(#{id}) Distributing message to '#{cbk.inspect}'"
    cbk[:last_used] = Time.now
    cbk[:block].call(msg)
  end
end