class OmfCommon::Comm

PubSub communication class, can be extended with different implementations

Public Class Methods

init(opts) click to toggle source

opts:

:type - pre installed comms provider
:provider - custom provider (opts)
  :require - gem to load first (opts)
  :constructor - Class implementing provider
# File lib/omf_common/comm.rb, line 44
# Create and initialise the process-wide communicator instance.
#
# Does nothing (and returns nil) when an instance has already been created.
# Otherwise the provider description is taken from opts[:provider]; failing
# that it is looked up in @@providers using opts[:type] or, if that is also
# missing, the scheme part (text before the first ':') of opts[:url].
#
# @param opts [Hash] communicator options, also handed to the provider's
#   constructor and to the new instance's #init
# @option opts [Symbol] :type key of a pre-installed provider
# @option opts [Hash] :provider custom provider description; the keys read
#   here are :require, :constructor and :message_provider
# @option opts [String] :url used to derive the type when :type is absent
# @option opts :auth forwarded to Message.init as :authenticate and, when
#   truthy, to OmfCommon::Auth.init
# @raise [ArgumentError] when no provider can be determined or the provider
#   has no :constructor entry
# @return whatever the new instance's #init returns, or nil if already set up
def self.init(opts)
  return if @@instance

  provider = opts[:provider]
  unless provider
    type = opts[:type]
    if !type && (url = opts[:url])
      type = url.split(':')[0].to_sym
    end
    provider = @@providers[type]
  end
  unless provider
    raise ArgumentError, "Missing Comm provider declaration. Either define 'type', 'provider', or 'url'"
  end

  # Lazy-load the provider implementation before resolving its class name.
  require provider[:require] if provider[:require]

  class_name = provider[:constructor]
  unless class_name
    raise ArgumentError, "Missing communicator creation info - :constructor"
  end
  # Walk the '::'-separated name starting from Object to find the class.
  provider_class = class_name.split('::').inject(Object) { |c, n| c.const_get(n) }
  inst = provider_class.new(opts)
  @@instance = inst

  # Work on a copy: the provider description may be the shared @@providers
  # entry (or a caller-owned hash), so per-call auth settings must not be
  # written into it. A provider without :message_provider yields an empty
  # option set instead of failing on nil.
  mopts = (provider[:message_provider] || {}).dup
  mopts[:authenticate] = opts[:auth]
  Message.init(mopts)

  if (aopts = opts[:auth])
    require 'omf_common/auth'
    OmfCommon::Auth.init(aopts)
  end

  inst.init(opts)
end
instance() click to toggle source
# File lib/omf_common/comm.rb, line 80
# Return the communicator stored in @@instance (assigned by Comm.init).
def self.instance
  @@instance
end
new(opts = {}) click to toggle source
# File lib/omf_common/comm.rb, line 173
# Remember the options and arrange for the local topic to be created once
# the communicator is connected.
#
# When opts[:local_address] is absent, an address of the form
# "<hostname>-<pid>" is generated. The hostname comes from a resolver lookup
# of the machine's own name, then from the `hostname` command, and finally
# falls back to 'unknown'.
#
# @param opts [Hash] communicator options (returned later by #options)
# @option opts [String] :local_address name to use for the local topic
def initialize(opts = {})
  @opts = opts
  unless local_address = opts[:local_address]
    hostname = begin
      # NOTE(review): Socket.gethostbyname is marked deprecated in recent
      # Ruby versions; kept because it determines the generated name.
      Socket.gethostbyname(Socket.gethostname)[0]
    rescue StandardError
      begin
        # Backticks never return nil; they raise if the command is missing,
        # so that failure has to be rescued for the fallback to be reachable.
        `hostname`.strip
      rescue StandardError
        nil
      end
    end
    hostname = 'unknown' if hostname.nil? || hostname.empty?
    local_address = "#{hostname}-#{Process.pid}"
  end
  on_connected do
    # Dots are replaced by dashes before the name is used as a topic name.
    @local_topic = create_topic(local_address.tr('.', '-'))
  end
end

Public Instance Methods

conn_info() click to toggle source

Returning connection information

@return [Hash] connection information hash, with proto, user and domain.

# File lib/omf_common/comm.rb, line 130
# Connection information for this communicator.
#
# This implementation returns a hash whose :proto, :user and :domain values
# are all nil.
#
# @return [Hash] hash with the keys :proto, :user and :domain
def conn_info
  { proto: nil, user: nil, domain: nil }
end
create_topic(topic, opts = {}) click to toggle source

Create a new pubsub topic with additional configuration

@param [String] topic Pubsub topic name

# File lib/omf_common/comm.rb, line 116
# Create a new pubsub topic with additional configuration.
#
# This implementation always raises; presumably concrete communicators
# override it (it is called by #subscribe and from #initialize's
# on_connected callback).
#
# @param topic [String] pubsub topic name
# @param opts [Hash] additional configuration
# @raise [NotImplementedError] always
def create_topic(topic, opts = {})
  raise NotImplementedError
end
delete_topic(topic, &block) click to toggle source

Delete a pubsub topic

@param [String] topic Pubsub topic name

# File lib/omf_common/comm.rb, line 123
# Delete a pubsub topic.
#
# This implementation always raises; presumably overridden by concrete
# communicators.
#
# @param topic [String] pubsub topic name
# @param block optional block (unused here)
# @raise [NotImplementedError] always
def delete_topic(topic, &block)
  raise NotImplementedError
end
disconnect(opts = {}) click to toggle source

Shut down comms layer

# File lib/omf_common/comm.rb, line 101
# Shut down the comms layer.
#
# This implementation always raises; presumably overridden by concrete
# communicators.
#
# @param opts [Hash] shutdown options (unused here)
# @raise [NotImplementedError] always
def disconnect(opts = {})
  raise NotImplementedError
end
init(opts = {}) click to toggle source

Initialize comms layer

# File lib/omf_common/comm.rb, line 86
# Initialize the comms layer.
#
# Empty hook: this implementation does nothing and returns nil. Comm.init
# calls it on the freshly created instance with the same options hash.
#
# @param opts [Hash] communicator options
def init(opts = {})
end
local_address() click to toggle source

Return the address used for all ‘generic’ messages not specifically being sent from a resource

# File lib/omf_common/comm.rb, line 92
# Address used for all 'generic' messages that are not sent on behalf of a
# specific resource; this is the address of the local topic.
#
# NOTE(review): @local_topic is only assigned inside the on_connected
# callback registered in #initialize, so calling this earlier would fail on
# nil — confirm callers wait for the connection.
def local_address
  @local_topic.address
end
local_topic() click to toggle source
# File lib/omf_common/comm.rb, line 96
# The topic stored in @local_topic (assigned in the on_connected callback
# registered by #initialize); nil until that has happened.
def local_topic
  @local_topic
end
on_connected(&block) click to toggle source
# File lib/omf_common/comm.rb, line 105
# Register a block to be run on connection.
#
# This implementation always raises; since #initialize calls it, concrete
# communicators presumably have to override it.
#
# @param block callback (unused here)
# @raise [NotImplementedError] always
def on_connected(&block)
  raise NotImplementedError
end
on_interrupted(*args, &block) click to toggle source

TODO should expand this to on_signal(:INT)

# File lib/omf_common/comm.rb, line 110
# Empty hook: accepts any arguments and an optional block, does nothing and
# returns nil.
#
# TODO: should expand this to on_signal(:INT)
def on_interrupted(*args, &block)
end
options() click to toggle source

Return the options used to initiate this communicator.

# File lib/omf_common/comm.rb, line 168
# The options hash this communicator was constructed with (the very object
# stored by #initialize, not a copy).
#
# @return [Hash]
def options
  @opts
end
string_to_topic_address(a_string) click to toggle source

Take a string and use it to generate a valid topic address for this type of communicator. Must be implemented by subclasses.

This may be used when we construct an FRCP Configure message, which requests some resources to subscribe to a topic, which has not yet been created at the time of this message’s construction, but which will be created before this message is published. (an example of such case can be found in OMF EC Group handling code)

# File lib/omf_common/comm.rb, line 141
# Turn a string into a valid topic address for this type of communicator.
#
# This implementation always raises; subclasses must provide the real one.
#
# @param a_string [String] text to derive the topic address from
# @raise [NotImplementedError] always
def string_to_topic_address(a_string)
  raise NotImplementedError
end
subscribe(topic_name, opts = {}, &block) click to toggle source

Subscribe to a pubsub topic

@param [String, Array] topic_name Pubsub topic name
@param [Hash] opts
@option opts [Boolean] :create_if_non_existent create the topic if non-existent, use this option with caution

# File lib/omf_common/comm.rb, line 151
# Subscribe to one or several pubsub topics.
#
# Every name is converted with #to_s and passed, together with opts, to
# #create_topic. When a block is given it is registered through the topic's
# on_subscribed and receives that topic as its argument.
#
# @param topic_name [String, Array] a topic name or an array of names
# @param opts [Hash] options handed unchanged to #create_topic
# @return the topic created for the first name (nil for an empty array)
def subscribe(topic_name, opts = {}, &block)
  names = topic_name.is_a?(Array) ? topic_name : [topic_name]
  topics = names.map do |name|
    create_topic(name.to_s, opts).tap do |topic|
      topic.on_subscribed { block.call(topic) } if block
    end
  end
  topics.first
end