class XRBP::WebSocket::Plugins::MessageDispatcher

Dispatch messages & wait for responses (with an optional timeout). This module allows the client to track messages sent to the server, waiting for responses up to a maximum time. An overridable callback method is provided to match responses to messages. Most often the end-user will not use this plugin directly but rather through CommandDispatcher, which inherits from and extends it to issue and track structured commands.

@see CommandDispatcher

Constants

DEFAULT_TIMEOUT

Attributes

message_timeout[RW]
messages[R]

Public Class Methods

new(connection) click to toggle source
Calls superclass method XRBP::PluginBase::new
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 26
# Build a dispatcher plugin for the given connection.
#
# The plugin starts out tracking no messages and uses DEFAULT_TIMEOUT
# as its message timeout.
#
# @param connection the connection passed on to the superclass initializer
def initialize(connection)
  super(connection)
  @messages = []
  @message_timeout = DEFAULT_TIMEOUT
end

Public Instance Methods

added() click to toggle source
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 32
# Install the dispatcher helpers on the connection (presumably invoked
# when the plugin is attached to it -- the call site is not visible here):
#
# * message_timeout= : updates this plugin's timeout and, on a
#   MultiConnection, the timeout of each child connection's dispatcher.
# * msg              : sends a message and either returns nil right away
#   (block given) or waits and returns the message result.
#
# For anything other than a MultiConnection a :close handler is also
# registered which cancels every tracked message.
def added
  dispatcher = self

  # NOTE: inside the two blocks below, self is expected to be the
  # connection the method is defined on (see `msg.connection = self`).
  connection.define_instance_method(:message_timeout=) do |timeout|
    dispatcher.message_timeout = timeout

    if kind_of?(MultiConnection)
      connections.each do |child|
        child.plugin(MessageDispatcher).message_timeout = timeout
      end
    end
  end

  connection.define_instance_method(:msg) do |msg, &bl|
    # A MultiConnection hands the message to its next connection.
    return next_connection.msg(msg, &bl) if kind_of?(MultiConnection)

    msg = Message.new(msg) unless msg.kind_of?(Message)
    msg.connection = self
    msg.time = Time.now
    msg.bl = bl if bl

    unless open?
      # Connection is not open: try another connection, otherwise
      # report failure (nil) to the caller.
      unless dispatcher.try_next(msg)
        msg.bl.call nil if bl
        return nil
      end

      return nil if bl

      msg.wait
      return msg.result
    end

    # Track the message before sending it.
    dispatcher.messages << msg
    send_data msg.to_s

    # With a callback block the call is asynchronous.
    return nil if bl

    msg.wait
    msg.result
  end

  unless connection.kind_of?(MultiConnection)
    connection.on(:close) { dispatcher.cancel_all_messages }
  end
end
cancel_all_messages() click to toggle source
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 136
# Cancel every message currently being tracked.
#
# cancel_message removes entries from the tracked list, so the
# iteration runs over a snapshot rather than over the list itself.
def cancel_all_messages
  snapshot = messages.dup
  snapshot.each do |pending|
    cancel_message(pending)
  end
end
cancel_message(msg) click to toggle source

FIXME: I believe there is an issue causing a deadlock at process
termination when subsequent pages in paginated cmds
are timing out. When retrieving messages
synchronously, the first message's block is used
to wait for the results, but on timeout cancel_message
is called with the _latest_ message, so the wait
block never gets unlocked.
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 129
# Stop tracking a single message and signal it.
#
# Both steps run while holding the connection's state mutex.
#
# @param msg the message to drop from the tracked list and signal
def cancel_message(msg)
  state_lock = connection.state_mutex
  state_lock.synchronize do
    messages.delete(msg)
    msg.signal
  end
end
closed() click to toggle source
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 171
# Callback named after the connection "closed" event
# (NOTE(review): the place it is invoked from is not visible here).
# Simply delegates to terminate!.
def closed
  terminate!
end
match_message(msg) click to toggle source

Should be overridden in subclass: return the request message & formatted response, given the raw response.

# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 81
# Default matcher: always returns nil, i.e. no request is matched.
#
# #message destructures this method's return value into a request and
# a response and stops when the request is nil, so with this default
# implementation incoming responses are ignored.
#
# @param msg raw response to match against tracked requests
# @return [nil]
def match_message(msg)
  nil
end
message(res) click to toggle source
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 93
# Handle a raw response: match it to its request, parse it and hand the
# parsed result to the request's callback.
#
# Steps:
# 1. match_message maps the raw response to [request, response];
#    nothing happens when no request matches.
# 2. The request is removed from the tracked messages.
# 3. unlock! may veto delivery, in which case processing stops.
# 4. parse_result converts the response. If parsing raises a
#    StandardError the request is retried via try_next; when that is
#    not possible the callback receives nil.
#
# Only StandardError is rescued so that signals, SystemExit and other
# non-standard exceptions are not silently swallowed.
#
# @param res raw response
def message(res)
  req, res = match_message(res)
  return unless req

  messages.delete(req)
  return unless unlock!(req, res)

  begin
    res = parse_result(res, req)
  rescue StandardError
    # Parsing failed: retry on another connection if there is one,
    # otherwise report failure to the callback.
    return if try_next(req)

    res = nil
  end

  req.bl.call(res)
end
opened() click to toggle source
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 145
# Schedule the timeout watcher on the connection via add_work.
#
# Until terminate? is true or the connection reports closed?, each pass
# looks at a snapshot of the tracked messages. For every message older
# than @message_timeout it emits :timeout on the connection, retries
# the message through try_next (cancelling it when that fails) and then
# asks the connection to close asynchronously. connection.rsleep(0.1)
# is called between passes.
def opened
  connection.add_work do
    # XXX remove force_quit? condition check from this loop,
    #     so we're sure messages always timeout, even on force quit.
    #     Always ensure close! is called after websocket is no longer
    #     being used!
    until terminate? || connection.closed?
      checked_at = Time.now

      # Snapshot: cancel_message / try_next modify the tracked list.
      messages.dup.each do |pending|
        next unless checked_at - pending.time > @message_timeout

        connection.emit :timeout, pending

        cancel_message(pending) unless try_next(pending)

        # XXX manually close the connection as
        #     a broken pipe will not stop websocket polling
        connection.async_close!
      end

      connection.rsleep(0.1)
    end
  end
end
parsing_plugins() click to toggle source
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 19
# Returns the plugins registered on the connection (connection.plugins).
# NOTE(review): the name suggests these are the plugins consulted when
# parsing results; the consumer is not visible here -- confirm.
def parsing_plugins
  connection.plugins
end
try_next(msg) click to toggle source
# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 114
# Try to re-issue a message on another connection.
#
# Asks the connection for the one following the message's current
# connection. When there is none, nothing changes. Otherwise the message
# is dropped from this plugin's tracked list and sent through the other
# connection together with its callback.
#
# @param msg the message to hand over
# @return [Boolean] true when the message was handed over, false otherwise
def try_next(msg)
  fallback = connection.next_connection(msg.connection)
  return false unless fallback

  messages.delete(msg)
  fallback.msg(msg, &msg.bl)
  true
end
unlock!(req, res) click to toggle source

Return a bool indicating whether the message / response is ready to be unlocked / returned to the client. Allows other plugins to block message unlocking.

# File lib/xrbp/websocket/plugins/message_dispatcher.rb, line 87
# Decide whether a matched request/response pair may be released.
#
# Every other plugin on the connection that responds to unlock! is
# consulted in order; the first one returning a falsy value vetoes the
# release and ends the check.
#
# @param req the request message
# @param res the response matched to it
# @return [Boolean] true when no other plugin vetoes, false otherwise
def unlock!(req, res)
  connection.plugins.all? do |other|
    next true if other == self
    next true unless other.respond_to?(:unlock!)

    other.unlock!(req, res)
  end
end