class OmfCommon::Comm::XMPP::Communicator

Constants

HOST_PREFIX
PING_INTERVAL
PUBSUB_CONFIGURE
RETRY_INTERVAL

Attributes

normal_shutdown_mode[RW]
published_messages[RW]
retry_counter[RW]

Public Class Methods

new(opts = {}) click to toggle source
Calls superclass method OmfCommon::Comm::new
# File lib/omf_common/comm/xmpp/communicator.rb, line 300
# Prepare the callback registry and the list of published messages, then
# hand +opts+ on to the superclass initializer.
#
# @param [Hash] opts forwarded unchanged to the superclass
def initialize(opts = {})
  # Interrupt callbacks are stored under :interpreted, the same key the
  # signal handlers read back
  @cbks = { connected: [], interpreted: [] }
  self.published_messages = []
  super
end

Public Instance Methods

_create(topic, pubsub_host = default_host, &block) click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 224
# Create +topic+ on +pubsub_host+ using the PUBSUB_CONFIGURE settings.
#
# @param [String] topic Pubsub topic name
# @param pubsub_host pubsub service to address (defaults to #default_host)
# @yield [stanza] the reply, after it has gone through #callback_logging
def _create(topic, pubsub_host = default_host, &block)
  service = pubsub
  on_reply = callback_logging(__method__, topic, &block)
  service.create(topic, pubsub_host, PUBSUB_CONFIGURE, &on_reply)
end
_subscribe(topic, pubsub_host = default_host, &block) click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 220
# Subscribe to +topic+ on +pubsub_host+.
#
# @param [String] topic Pubsub topic name
# @param pubsub_host pubsub service to address (defaults to #default_host)
# @yield [stanza] the reply, after it has gone through #callback_logging
def _subscribe(topic, pubsub_host = default_host, &block)
  service = pubsub
  on_reply = callback_logging(__method__, topic, &block)
  # The second argument is deliberately left nil, as before
  service.subscribe(topic, nil, pubsub_host, &on_reply)
end
_unsubscribe_one(topic_id, pubsub_host = default_host) click to toggle source

Un-subscribe one single topic by topic address

# File lib/omf_common/comm/xmpp/communicator.rb, line 241
# Cancel every subscription whose node equals +topic_id+ (compared as a
# String) on +pubsub_host+.
#
# @param topic_id node to leave; converted with #to_s before comparing
# @param pubsub_host pubsub service to address (defaults to #default_host)
def _unsubscribe_one(topic_id, pubsub_host = default_host)
  pubsub.subscriptions(pubsub_host) do |listing|
    active = listing[:subscribed]
    active && active.each do |entry|
      node = entry[:node]
      next unless node == topic_id.to_s

      subid = entry[:subid]
      on_reply = callback_logging(__method__, node, subid)
      pubsub.unsubscribe(node, nil, subid, pubsub_host, &on_reply)
    end
  end
end
affiliations(pubsub_host = default_host, &block) click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 251
# Request the affiliations held on +pubsub_host+.
#
# @param pubsub_host pubsub service to address (defaults to #default_host)
# @yield [stanza] the reply, after it has gone through #callback_logging
def affiliations(pubsub_host = default_host, &block)
  on_reply = callback_logging(__method__, &block)
  pubsub.affiliations(pubsub_host, &on_reply)
end
conn_info() click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 69
# Describe the current connection.
#
# @return [Hash] :proto (always :xmpp), :user (the JID node) and :domain
#   (the JID domain), in that order
def conn_info
  identity = jid
  details = { proto: :xmpp }
  details[:user] = identity.node
  details[:domain] = identity.domain
  details
end
connect(username, password, server) click to toggle source

Run the XMPP client to connect to the server; if the connection attempt fails, try again after RETRY_INTERVAL

# File lib/omf_common/comm/xmpp/communicator.rb, line 172
# Run the XMPP client. When the attempt fails with one of the connection
# errors listed below, log a warning and schedule another attempt after
# RETRY_INTERVAL; any other error propagates to the caller.
#
# @param username passed on unchanged to the retry
# @param password passed on unchanged to the retry
# @param server   only used for the log line and the retry
def connect(username, password, server)
  info "Connecting to '#{server}' ..."
  begin
    client.run
  rescue ::EventMachine::ConnectionError,
         Blather::Stream::ConnectionTimeout,
         Blather::Stream::NoConnection,
         Blather::Stream::ConnectionFailed => failure
    warn "[#{failure.class}] #{failure}, try again..."
    OmfCommon.el.after(RETRY_INTERVAL) { connect(username, password, server) }
  end
end
create_topic(topic, opts = {}) click to toggle source

Create a new pubsub topic with additional configuration

@param [String] topic Pubsub topic name

# File lib/omf_common/comm/xmpp/communicator.rb, line 198
# Create a topic object for +topic+.
#
# @param [String] topic Pubsub topic name
# @param [Hash] opts accepted for interface compatibility; not read here
# @return whatever OmfCommon::Comm::XMPP::Topic.create returns
def create_topic(topic, opts = {})
  topic_class = OmfCommon::Comm::XMPP::Topic
  topic_class.create(topic)
end
delete_topic(topic, pubsub_host = default_host, &block) click to toggle source

Delete a pubsub topic

@param [String] topic Pubsub topic name

# File lib/omf_common/comm/xmpp/communicator.rb, line 205
# Delete +topic+ from +pubsub_host+.
#
# @param [String] topic Pubsub topic name
# @param pubsub_host pubsub service to address (defaults to #default_host)
# @yield [stanza] the reply, after it has gone through #callback_logging
def delete_topic(topic, pubsub_host = default_host, &block)
  service = pubsub
  on_reply = callback_logging(__method__, topic, &block)
  service.delete(topic, pubsub_host, &on_reply)
end
disconnect(opts = {}) click to toggle source

Shut down XMPP connection

# File lib/omf_common/comm/xmpp/communicator.rb, line 185
# Shut the connection down on purpose.
#
# @param [Hash] opts accepted for interface compatibility; not read here
# @return the result of the measurement injection, or nil when measurement
#   is disabled
def disconnect(opts = {})
  # NOTE Do not clean up
  # Raise the flag before shutting down; the disconnected handler reads it
  # to tell a requested shutdown from a lost connection.
  @lock.synchronize { @normal_shutdown_mode = true }
  info "Disconnecting..."
  shutdown
  return unless OmfCommon::Measure.enabled?

  OmfCommon::DSL::Xmpp::MPConnection.inject(Time.now.to_f, jid, 'disconnect')
end
init(opts = {}) click to toggle source

Set up XMPP options and start the Eventmachine, connect to XMPP server

Calls superclass method OmfCommon::Comm::init
# File lib/omf_common/comm/xmpp/communicator.rb, line 88
# Configure the XMPP client from +opts+, start the first connection attempt
# and register the ready, disconnected and signal handlers.
#
# @param [Hash] opts
# @option opts [String] :url when present, its user, password and host parts
#   supply the credentials and the server
# @option opts [String] :username used when :url is absent
# @option opts [String] :password used when :url is absent
# @option opts [String] :server used when :url is absent
# @option opts [String] :pubsub_domain stored in @pubsub_host
# @raise [ArgumentError] if no server is given
def init(opts = {})
  @lock = Monitor.new

  @pubsub_host = opts[:pubsub_domain]
  if opts[:url]
    url = URI(opts[:url])
    username, password, server = url.user, url.password, url.host
  else
    username, password, server = opts[:username], opts[:password], opts[:server]
  end

  # Missing credentials fall back to a name built from host name and pid.
  # When both are missing they share this one String, so the in-place
  # downcase further down lower-cases the generated password as well.
  random_name = "#{Socket.gethostname}-#{Process.pid}"
  # A caller-supplied username is copied first, so downcase! below neither
  # alters the caller's String nor fails when that String is frozen.
  username = username ? username.dup : random_name
  password ||= random_name

  # username and password are always set by now; only the server can still
  # be missing.
  raise ArgumentError, "Server cannot be nil when connect to XMPP" if server.nil?

  @retry_counter = 0
  @normal_shutdown_mode = false

  username.downcase!
  # Plain String form of the JID; this is also what the 'connected'
  # measurement below receives.
  jid_string = "#{username}@#{server}"
  client.setup(jid_string, password)
  connect(username, password, server)

  when_ready do
    if @not_initial_connection
      info "Reconnected"
    else
      info "Connected"
      OmfCommon::DSL::Xmpp::MPConnection.inject(Time.now.to_f, jid_string, 'connected') if OmfCommon::Measure.enabled?
      @cbks[:connected].each { |cbk| cbk.call(self) }
      # Every later ready event is a reconnection
      @lock.synchronize { @not_initial_connection = true }
    end

    @lock.synchronize do
      @pong = true
      @ping_alive_timer = OmfCommon.el.every(PING_INTERVAL) do
        if @pong
          # Clear the flag; ping_alive sets it again when the reply arrives
          @lock.synchronize { @pong = false }
          ping_alive
        else
          warn "No PONG. No connection..."
          @lock.synchronize { @ping_alive_timer.cancel }
          connect(username, password, server)
        end
      end
    end
  end

  disconnected do
    @lock.synchronize do
      @pong = false
      @ping_alive_timer && @ping_alive_timer.cancel
    end

    if normal_shutdown_mode
      shutdown
    else
      warn "Disconnected... Last known state: #{client.state}"
      # Reconnect immediately if the client never got past initializing
      retry_interval = client.state == :initializing ? 0 : RETRY_INTERVAL
      OmfCommon.el.after(retry_interval) do
        connect(username, password, server)
      end
    end
  end

  # INT and TERM behave the same: run the registered interrupt callbacks,
  # or disconnect when none were registered.
  # NOTE(review): these handlers run in signal-trap context and #disconnect
  # takes @lock; confirm the target Ruby allows that inside a trap.
  [:INT, :TERM].each do |signal|
    trap(signal) do
      @cbks[:interpreted].empty? ? disconnect : @cbks[:interpreted].each { |cbk| cbk.call(self) }
    end
  end

  super
end
on_connected(&block) click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 82
# Register +block+ to be called once the first connection is established.
#
# @yield [communicator] invoked with this communicator
# @return [Array] the list of connected callbacks, including the new one
def on_connected(&block)
  @cbks[:connected].push(block)
end
on_interrupted(&block) click to toggle source

Register a callback to run when the process receives an :INT or :TERM signal

# File lib/omf_common/comm/xmpp/communicator.rb, line 78
# Register +block+ to be called when an interrupt signal is handled.
#
# @yield [communicator] invoked with this communicator
# @return [Array] the list of interrupt callbacks, including the new one
def on_interrupted(&block)
  # Stored under :interpreted, the key the signal handlers read
  @cbks[:interpreted].push(block)
end
publish(topic, message, pubsub_host = default_host, &block) click to toggle source

Publish to a pubsub topic

@param [String] topic Pubsub topic name
@param [OmfCommon::Message] message Any XML fragment to be sent as payload

# File lib/omf_common/comm/xmpp/communicator.rb, line 259
# Publish +message+ to +topic+ on +pubsub_host+.
#
# @param [String] topic Pubsub topic name
# @param [OmfCommon::Message, String] message a message object, which is
#   validated and marshalled here, or an already serialised String, which
#   is sent as given
# @param pubsub_host pubsub service to address (defaults to #default_host)
# @yield [stanza] the reply, after it has gone through #callback_logging
# @return nil when the marshalled payload is empty; otherwise the result of
#   the measurement injection, or nil when measurement is disabled
# @raise [StandardError] if a message object reports itself as invalid
def publish(topic, message, pubsub_host = default_host, &block)
  # Only message objects are validated; a String payload is accepted as is,
  # matching the String case handled on the next line.
  raise StandardError, "Invalid message" unless message.kind_of?(String) || message.valid?

  message = message.marshall[1] unless message.kind_of? String
  if message.nil?
    debug "Cannot publish empty message, using authentication and not providing a proper cert?"
    return nil
  end

  # Record a digest of the payload when the reply arrives, then hand the
  # reply to the caller's block.
  new_block = proc do |stanza|
    published_messages << OpenSSL::Digest::SHA1.new(message.to_s)
    block.call(stanza) if block
  end

  pubsub.publish(topic, message, pubsub_host, &callback_logging(__method__, topic, &new_block))
  OmfCommon::DSL::Xmpp::MPPublished.inject(Time.now.to_f, jid, topic, message.to_s[/mid="(.{36})/, 1]) if OmfCommon::Measure.enabled?
end
string_to_topic_address(a_string) click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 73
# Build the xmpp:// address of +a_string+ on this connection's JID domain.
#
# @param a_string topic name placed before the "@"
# @return [String] "xmpp://<a_string>@<domain>"
def string_to_topic_address(a_string)
  local_part = a_string
  domain = jid.domain
  "xmpp://#{local_part}@#{domain}"
end
subscribe(topic, opts = {}, &block) click to toggle source

Subscribe to a pubsub topic

@param [String] topic Pubsub topic name
@param [Hash] opts
@option opts [Boolean] :create_if_non_existent create the topic if non-existent, use this option with caution

# File lib/omf_common/comm/xmpp/communicator.rb, line 214
# Subscribe to +topic+ by creating its topic object; +block+ is handed to
# the topic's constructor.
#
# @param [String, Array] topic Pubsub topic name; for an Array only the
#   first element is used
# @param [Hash] opts accepted for interface compatibility; not read here
# @return the result of the measurement injection, or nil when measurement
#   is disabled
def subscribe(topic, opts = {}, &block)
  name = topic.is_a?(Array) ? topic.first : topic
  OmfCommon::Comm::XMPP::Topic.create(name, &block)
  return unless OmfCommon::Measure.enabled?

  OmfCommon::DSL::Xmpp::MPSubscription.inject(Time.now.to_f, jid, 'join', name)
end
topic_event(additional_guard = nil, &block) click to toggle source

Event callback for pubsub topic event(item published)

# File lib/omf_common/comm/xmpp/communicator.rb, line 279
# Register +block+ as the handler for published-item events.
#
# An event is accepted only when it is not delayed, carries items, and its
# first item has a payload. When +additional_guard+ is given, it must also
# return a truthy value for the event.
#
# @param [#call, nil] additional_guard extra filter, called with the event
# @yield [stanza] each accepted event
def topic_event(additional_guard = nil, &block)
  # Filtering out items this communicator published itself (a lookup in
  # published_messages) is currently switched off.
  accepts = proc do |event|
    deliverable = !event.delayed? && event.items? && !event.items.first.payload.nil?
    additional_guard ? (deliverable && additional_guard.call(event)) : deliverable
  end

  on_item = proc do |stanza|
    if OmfCommon::Measure.enabled?
      OmfCommon::DSL::Xmpp::MPReceived.inject(Time.now.to_f, jid, stanza.node, stanza.to_s[/mid="(.{36})/, 1])
    end
    block.call(stanza) if block
  end

  pubsub_event(accepts, &callback_logging(__method__, &on_item))
end
unsubscribe(pubsub_host = default_host) click to toggle source

Un-subscribe all existing subscriptions from all pubsub topics

# File lib/omf_common/comm/xmpp/communicator.rb, line 230
# Cancel every subscription reported by +pubsub_host+, recording a 'leave'
# measurement for each one when measurement is enabled.
#
# @param pubsub_host pubsub service to address (defaults to #default_host)
def unsubscribe(pubsub_host = default_host)
  pubsub.subscriptions(pubsub_host) do |listing|
    active = listing[:subscribed]
    active && active.each do |entry|
      node, subid = entry[:node], entry[:subid]
      pubsub.unsubscribe(node, nil, subid, pubsub_host, &callback_logging(__method__, node, subid))
      next unless OmfCommon::Measure.enabled?

      OmfCommon::DSL::Xmpp::MPSubscription.inject(Time.now.to_f, jid, 'leave', node)
    end
  end
end

Private Instance Methods

callback_logging(*args, &block) click to toggle source

Provide a new block wrap to automatically log errors

# File lib/omf_common/comm/xmpp/communicator.rb, line 307
# Wrap +block+ in a proc that logs the outcome of a reply stanza before
# passing the stanza on.
#
# @param args parts of the operation label, joined with " >> "; the label
#   is "OPERATION" when none are given
# @yield [stanza] called after logging, when a block was supplied
# @return [Proc] the logging wrapper
def callback_logging(*args, &block)
  label = args.empty? ? "OPERATION" : args.join(" >> ")

  proc do |stanza|
    if stanza.respond_to?(:error?) && stanza.error?
      failure = Blather::StanzaError.import(stanza)
      case failure.name
      when :unexpected_request
        logger.debug failure
      when :conflict
        # conflicts are not logged
      else
        logger.warn "#{failure} Original: #{failure.original}"
      end
    end

    logger.debug "#{label} SUCCEED" if stanza.respond_to?(:result?) && stanza.result?
    block.call(stanza) if block
  end
end
default_host() click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 325
# Pubsub host used when a caller does not name one.
#
# @return the configured @pubsub_host when set, otherwise HOST_PREFIX joined
#   to the JID domain with a dot
def default_host
  return @pubsub_host if @pubsub_host

  "#{HOST_PREFIX}.#{jid.domain}"
end
ping_alive() click to toggle source
# File lib/omf_common/comm/xmpp/communicator.rb, line 329
# Send a ping to the JID domain; when the reply arrives, log it and set
# @pong so the keep-alive timer knows the connection is still up.
def ping_alive
  ping = Blather::Stanza::Iq::Ping.new(:get, jid.domain)
  client.write_with_handler(ping) do |response|
    info response
    @lock.synchronize { @pong = true }
  end
end