class OmfCommon::Comm
PubSub communication class, can be extended with different implementations
Public Class Methods
opts:
:type - pre installed comms provider :provider - custom provider (opts) :require - gem to load first (opts) :constructor - Class implementing provider
# File lib/omf_common/comm.rb, line 44 def self.init(opts) unless @@instance unless provider = opts[:provider] unless type = opts[:type] if url = opts[:url] type = url.split(':')[0].to_sym end end provider = @@providers[type] end unless provider raise ArgumentError, "Missing Comm provider declaration. Either define 'type', 'provider', or 'url'" end require provider[:require] if provider[:require] if class_name = provider[:constructor] provider_class = class_name.split('::').inject(Object) {|c,n| c.const_get(n) } inst = provider_class.new(opts) else raise ArgumentError, "Missing communicator creation info - :constructor" end @@instance = inst mopts = provider[:message_provider] mopts[:authenticate] = opts[:auth] Message.init(mopts) if aopts = opts[:auth] require 'omf_common/auth' OmfCommon::Auth.init(aopts) end inst.init(opts) end end
# File lib/omf_common/comm.rb, line 80 def self.instance @@instance end
# File lib/omf_common/comm.rb, line 173 def initialize(opts = {}) @opts = opts unless local_address = opts[:local_address] hostname = nil begin hostname = Socket.gethostbyname(Socket.gethostname)[0] rescue hostname = (`hostname` || 'unknown').strip end local_address = "#{hostname}-#{Process.pid}" end on_connected do @local_topic = create_topic(local_address.gsub('.', '-')) end end
Public Instance Methods
Returning connection information
@return [Hash] connection information hash, with type, user and domain.
# File lib/omf_common/comm.rb, line 130 def conn_info { proto: nil, user: nil, domain: nil } end
Create a new pubsub topic with additional configuration
@param [String] topic Pubsub topic name
# File lib/omf_common/comm.rb, line 116 def create_topic(topic, opts = {}) raise NotImplementedError end
Delete a pubsub topic
@param [String] topic Pubsub topic name
# File lib/omf_common/comm.rb, line 123 def delete_topic(topic, &block) raise NotImplementedError end
Shut down comms layer
# File lib/omf_common/comm.rb, line 101 def disconnect(opts = {}) raise NotImplementedError end
Initialize comms layer
# File lib/omf_common/comm.rb, line 86 def init(opts = {}) end
Return the address used for all ‘generic’ messages not specifically being sent from a resource
# File lib/omf_common/comm.rb, line 92 def local_address() @local_topic.address end
# File lib/omf_common/comm.rb, line 96 def local_topic() @local_topic end
# File lib/omf_common/comm.rb, line 105 def on_connected(&block) raise NotImplementedError end
TODO should expand this to on_signal(:INT)
# File lib/omf_common/comm.rb, line 110 def on_interrupted(*args, &block) end
Return the options used to initiate this communicator.
# File lib/omf_common/comm.rb, line 168 def options() @opts end
Take a string and use it to generate a valid topic address for this type of communicator Must be implemented by subclasses
This may be used when we construct an FRCP Configure message, which requests some resources to subscribe to a topic, which has not yet been created at the time of this message’s construction, but which will be created before this message is published. (an example of such case can be found in OMF EC Group handling code)
# File lib/omf_common/comm.rb, line 141 def string_to_topic_address(a_string) raise NotImplementedError end
Subscribe to a pubsub topic
@param [String, Array] topic_name Pubsub topic name @param [Hash] opts @option opts [Boolean] :create_if_non_existent create the topic if non-existent, use this option with caution
# File lib/omf_common/comm.rb, line 151 def subscribe(topic_name, opts = {}, &block) tna = (topic_name.is_a? Array) ? topic_name : [topic_name] ta = tna.collect do |tn| t = create_topic(tn.to_s, opts) if block t.on_subscribed do block.call(t) end end t end ta[0] end