class RJR::Node

Base RJR Node interface. Nodes are the central transport mechanism of RJR, this class provides the core methods common among all transport types and mechanisms to start and run the subsystems which drives all requests.

A subclass of RJR::Node should be defined for each transport that is supported. Each subclass should define

* RJR_NODE_TYPE - unique id of the transport
* listen method - begin listening for new requests and return
* send_message(msg, connection) - send message using the specified connection
  (transport dependent)
* invoke - establish connection, send message, and wait for / return result
* notify - establish connection, send message, and immediately return

Not all methods necessarily have to be implemented depending on the context / use of the node, and the base node class provides many utility methods which to assist in message processing (see below).

See nodes residing in lib/rjr/nodes/ for specific examples.

Attributes

connection_event_handlers[R]

Handlers for various connection events

dispatcher[RW]

Dispatcher to use to satisfy requests

message_headers[RW]

Attitional header fields to set on all requests and responses received and sent by node

node_id[R]

Unique string identifier of the node

Public Class Methods

em() click to toggle source
# File lib/rjr/node.rb, line 80
def self.em
  defined?(@@em) ? @@em : nil
end
indirect?() click to toggle source

Bool indiciting if this node is indirect

# File lib/rjr/node.rb, line 58
def indirect?
  self.const_defined?(:INDIRECT_NODE) &&
  self.const_get(:INDIRECT_NODE)
end
new(args = {}) click to toggle source

RJR::Node initializer

@param [Hash] args options to set on request @option args [String] :node_id unique id of the node @option args [Hash<String,String>] :headers optional headers to set

on all json-rpc messages

@option args [Dispatcher] :dispatcher dispatcher to assign to the node

# File lib/rjr/node.rb, line 103
def initialize(args = {})
   clear_event_handlers
   @response_lock = Mutex.new
   @response_cv   = ConditionVariable.new
   @pending       = {}
   @responses     = []

   @node_id         = args[:node_id]
   @timeout         = args[:timeout]
   @wait_interval   = args[:wait_interval] || 0.01
   @dispatcher      = args[:dispatcher] || RJR::Dispatcher.new
   @message_headers = args.has_key?(:headers) ? {}.merge(args[:headers]) : {}

   @@tp ||= ThreadPool.new
   @@em ||= EMAdapter.new

   # will do nothing if already started
   tp.start
   em.start
end
persistent?() click to toggle source

Bool indiciting if this node is persistent

# File lib/rjr/node.rb, line 52
def persistent?
  self.const_defined?(:PERSISTENT_NODE) &&
  self.const_get(:PERSISTENT_NODE)
end
tp() click to toggle source
# File lib/rjr/node.rb, line 88
def self.tp
  defined?(@@tp) ? @@tp : nil
end

Public Instance Methods

clear_event_handlers() click to toggle source

Reset connection event handlers

# File lib/rjr/node.rb, line 148
def clear_event_handlers
  @connection_event_handlers = {
    :opened => [],
    :closed => [],
    :error  => []
  }
end
em() click to toggle source
# File lib/rjr/node.rb, line 84
def em
  self.class.em
end
halt() click to toggle source

Immediately terminate the node

Warning this does what it says it does. All running threads, and reactor jobs are immediately killed

@return self

# File lib/rjr/node.rb, line 140
def halt
  em.stop_event_loop
  tp.stop
  self
end
indirect?() click to toggle source

Bool indicating if this node class is indirect

# File lib/rjr/node.rb, line 70
def indirect?
  self.class.indirect?
end
join() click to toggle source

Block until the eventmachine reactor and thread pool have both completed running.

@return self

# File lib/rjr/node.rb, line 128
def join
  tp.join
  em.join
  self
end
node_type() click to toggle source

alias of RJR_NODE_TYPE

# File lib/rjr/node.rb, line 75
def node_type
  self.class.const_defined?(:RJR_NODE_TYPE) ?
  self.class.const_get(:RJR_NODE_TYPE) : nil
end
on(event, &handler) click to toggle source

Register connection event handler @param event [:opened, :closed, :error] the event to register the handler

for

@param handler [Callable] block param to be added to array of handlers

that are called when event occurs

@yield [Node, *args] self and event-specific *args are passed to each

registered handler when event occurs
# File lib/rjr/node.rb, line 163
def on(event, &handler)
  return unless @connection_event_handlers.keys.include?(event)
  @connection_event_handlers[event] << handler
end
persistent?() click to toggle source

Bool indicating if this node class is persistent

# File lib/rjr/node.rb, line 65
def persistent?
  self.class.persistent?
end
tp() click to toggle source
# File lib/rjr/node.rb, line 92
def tp
  self.class.tp
end

Private Instance Methods

client_for(connection) click to toggle source

Internal helper, extract client info from connection

# File lib/rjr/node.rb, line 180
def client_for(connection)
  # skip if an indirect node type or local
  return nil, nil if self.indirect? || self.node_type == :local

  begin
    return Socket.unpack_sockaddr_in(connection.get_peername)
  rescue Exception=>e
  end

  return nil, nil
end
connection_event(event, *args) click to toggle source

Internal helper, run connection event handlers for specified event, passing self and args to handler

# File lib/rjr/node.rb, line 172
def connection_event(event, *args)
  return unless @connection_event_handlers.keys.include?(event)
  @connection_event_handlers[event].each { |h| h.call(self, *args) }
end
handle_message(msg, connection = {}) click to toggle source

Internal helper, handle message received

# File lib/rjr/node.rb, line 193
def handle_message(msg, connection = {})
  intermediate = Messages::Intermediate.parse(msg)

  if Messages::Request.is_request_message?(intermediate)
    tp << ThreadPoolJob.new(intermediate) { |i|
            handle_request(i, false, connection)
          }

  elsif Messages::Notification.is_notification_message?(intermediate)
    tp << ThreadPoolJob.new(intermediate) { |i|
            handle_request(i, true, connection)
          }

  elsif Messages::Response.is_response_message?(intermediate)
    handle_response(intermediate)

  end

  intermediate
end
handle_request(message, notification=false, connection={}) click to toggle source

Internal helper, handle request message received

# File lib/rjr/node.rb, line 215
def handle_request(message, notification=false, connection={})
  # get client for the specified connection
  # TODO should grap port/ip immediately on connection and use that
  client_port,client_ip = client_for(connection)

  msg = notification ?
    Messages::Notification.new(:message => message,
                               :headers => @message_headers) :
         Messages::Request.new(:message => message,
                               :headers => @message_headers)

  callback = NodeCallback.new(:node       => self,
                              :connection => connection)

  result = @dispatcher.dispatch(:rjr_method      => msg.jr_method,
                                :rjr_method_args => msg.jr_args,
                                :rjr_headers     => msg.headers,
                                :rjr_client_ip   => client_ip,
                                :rjr_client_port => client_port,
                                :rjr_node        => self,
                                :rjr_node_id     => node_id,
                                :rjr_node_type   => self.node_type,
                                :rjr_callback    => callback)

  unless notification
    response = Messages::Response.new(:id      => msg.msg_id,
                                      :result  => result,
                                      :headers => msg.headers,
                                      :request => msg)
    self.send_msg(response.to_s, connection)
    return response
  end

  nil
end
handle_response(message) click to toggle source

Internal helper, handle response message received

# File lib/rjr/node.rb, line 252
def handle_response(message)
  msg    = Messages::Response.new(:message => message,
                                  :headers => self.message_headers)
  res = err = nil
  begin
    res = @dispatcher.handle_response(msg.result)
  rescue Exception => e
    err = e
  end

  @response_lock.synchronize {
    result = [msg.msg_id, res]
    result << err if !err.nil?
    @responses << result
    @response_cv.broadcast
  }
end
wait_for_result(message) click to toggle source

Internal helper, block until response matching message id is received

# File lib/rjr/node.rb, line 271
def wait_for_result(message)
  res = nil
  message_id = message.msg_id
  @pending[message_id] = Time.now
  while res.nil?
    @response_lock.synchronize{
      # Prune messages that timed out
      if @timeout
        now = Time.now
        @pending.delete_if { |_, start_time| (now - start_time) > @timeout }
      end
      pending_ids = @pending.keys
      fail 'Timed out' unless pending_ids.include? message_id

      # Prune invalid responses
      @responses.keep_if { |response| @pending.has_key? response.first }
      res = @responses.find { |response| message.msg_id == response.first }
      if !res.nil?
        @responses.delete(res)
      else
        @response_cv.wait @response_lock, @wait_interval
      end
    }
  end
  return res
end