class OmfCommon::Comm::AMQP::Communicator

Attributes

channel[R]

def initialize(opts = {}) # ignore arguments end

Public Class Methods

new(opts = {}) click to toggle source
Calls superclass method OmfCommon::Comm::new
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 131
def initialize(opts = {})
  @on_connected_procs = []
  @on_reconnect = {}
  super
end

Public Instance Methods

broadcast_file(file_path, topic_name = nil, opts = {}, &block) click to toggle source
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 114
def broadcast_file(file_path, topic_name = nil, opts = {}, &block)
  topic_name ||= SecureRandom.uuid
  require 'omf_common/comm/amqp/amqp_file_transfer'
  OmfCommon::Comm::AMQP::FileBroadcaster.new(file_path, @channel, topic_name, opts, &block)
  "bdcst:#{@address_prefix + topic_name}"
end
conn_info() click to toggle source
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 51
def conn_info
  { proto: :amqp, user: ::AMQP.settings[:user], domain: ::AMQP.settings[:host] }
end
create_topic(topic, opts = {}) click to toggle source

Create a new pubsub topic with additional configuration

@param [String] topic Pubsub topic name

# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 84
def create_topic(topic, opts = {})
  raise "Topic can't be nil or empty" if topic.nil? || topic.to_s.empty?
  opts = opts.dup
  opts[:communicator] = self
  topic = topic.to_s
  if topic.start_with? 'amqp:'
    # absolute address
    unless topic.start_with? @address_prefix
      raise "Cannot subscribe to a topic from different domain (#{topic}) - #{@address_prefix}"
    end
    opts[:address] = topic
    topic = topic.split(@address_prefix).last
  else
    opts[:address] = @address_prefix + topic
  end
  OmfCommon::Comm::AMQP::Topic.create(topic, opts)
end
delete_topic(topic, &block) click to toggle source

Delete a pubsub topic

@param [String] topic Pubsub topic name

# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 105
def delete_topic(topic, &block)
  # FIXME CommProvider?
  if t = OmfCommon::CommProvider::AMQP::Topic.find(topic)
    t.release
  else
    warn "Attempt to delete unknown topic '#{topic}"
  end
end
disconnect(opts = {}) click to toggle source

Shut down comms layer

# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 60
def disconnect(opts = {})
  info "Disconnecting..."
end
init(opts = {}) click to toggle source

Initialize comms layer

Calls superclass method OmfCommon::Comm::init
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 34
def init(opts = {})
  @opts = {
    #:ssl (Hash) TLS (SSL) parameters to use.
    heartbeat: 20, # (Fixnum) - default: 0 Connection heartbeat, in seconds. 0 means no heartbeat. Can also be configured server-side starting with RabbitMQ 3.0.
    #:on_tcp_connection_failure (#call) - A callable object that will be run if connection to server fails
    #:on_possible_authentication_failure (#call) - A callable object that will be run if authentication fails (see Authentication failure section)
    reconnect_delay: 20 # (Fixnum) - Delay in seconds before attempting reconnect on detected failure
  }.merge(opts)

  unless (@url = @opts.delete(:url))
    raise "Missing 'url' option for AQMP layer"
  end
  @address_prefix = @url + '/frcp.'
  _connect()
  super
end
on_connected(&block) click to toggle source

TODO: Should be thread safe and check if already connected

# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 65
def on_connected(&block)
  @on_connected_procs << block
end
on_reconnect(key, &block) click to toggle source

register callbacks to be called when the underlying AMQP layer needs to reconnect to the AMQP server. This may require some additional repairs. If ‘block’ is nil, the callback is removed

# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 73
def on_reconnect(key, &block)
  if block.nil?
    @on_reconnect.delete(key)
  else
    @on_reconnect[key] = block
  end
end
receive_file(topic_url, file_path = nil, opts = {}, &block) click to toggle source
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 121
def receive_file(topic_url, file_path = nil, opts = {}, &block)
  if topic_url.start_with? @address_prefix
    topic_url = topic_url[@address_prefix.length .. -1]
  end
  require 'omf_common/comm/amqp/amqp_file_transfer'
  file_path ||= File.join(Dir.tmpdir, Dir::Tmpname.make_tmpname('bdcast', '.xxx'))
  FileReceiver.new(file_path, @channel, topic_url, opts, &block)
end
string_to_topic_address(a_string) click to toggle source
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 55
def string_to_topic_address(a_string)
  @address_prefix+a_string
end

Private Instance Methods

_connect() click to toggle source
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 137
def _connect()
  begin
    last_reported_timestamp = nil
    @session = ::AMQP.connect(@url, @opts) do |connection|
      connection.on_tcp_connection_loss do |conn, settings|
        now = Time.now
        if last_reported_timestamp == nil || (now - last_reported_timestamp) > 60
          warn "Lost connectivity. Trying to reconnect..."
          last_reported_timestamp = now
        end
        _reconnect(conn)
      end
      @channel = ::AMQP::Channel.new(connection, auto_recovery: true, prefetch: 10)

      @on_connected_procs.each do |proc|
        proc.arity == 1 ? proc.call(self) : proc.call
      end

      OmfCommon.eventloop.on_stop do
        connection.close
      end
    end

    rec_delay = @opts[:reconnect_delay]
    @session.on_tcp_connection_failure do
      warn "Cannot connect to AMQP server '#{@url}'. Attempt to retry in #{rec_delay} sec"
      @session = nil
      OmfCommon.eventloop.after(rec_delay) do
        info 'Retrying'
        _connect
      end
    end
    # @session.on_tcp_connection_loss do
      # _reconnect "Appear to have lost tcp connection. Attempt to reconnect in #{rec_delay} sec"
    # end
    @session.on_skipped_heartbeats do
      info '... on_skipped_heartbeats!'
      #_reconnect "Appear to have lost heartbeat. Attempt to reconnect in #{rec_delay} sec"
    end
    @session.on_recovery do
      info 'Recovered!'
      last_reported_timestamp = nil
      @on_reconnect.values.each do |block|
        block.call()
      end
    end
    true
  rescue Exception => ex
    delay = @opts[:reconnect_delay]
    warn "Connecting AMQP failed, will retry in #{delay} (#{ex})"
    OmfCommon.eventloop.after(delay) do
      if _connect
        info 'Reconnection suceeded'
      end
    end
    false
  end
end
_reconnect(conn) click to toggle source
# File lib/omf_common/comm/amqp/amqp_communicator.rb, line 196
def _reconnect(conn)
  begin
    conn.reconnect(false, 2)
  rescue Exception => ex
    delay = @opts[:reconnect_delay]
    warn "Reconnect AMQP failed, will retry in #{delay} (#{ex})"
    OmfCommon.eventloop.after(delay) do
      info 'Reconnecting'
      _reconnect(conn)
    end
  end
end