class RJR::Node
Base RJR
Node
interface. Nodes
are the central transport mechanism of RJR
, this class provides the core methods common among all transport types and mechanisms to start and run the subsystems which drive all requests.
A subclass of RJR::Node
should be defined for each transport that is supported. Each subclass should define
* RJR_NODE_TYPE - unique id of the transport
* listen method - begin listening for new requests and return
* send_message(msg, connection) - send message using the specified connection (transport dependent)
* invoke - establish connection, send message, and wait for / return result
* notify - establish connection, send message, and immediately return
Not all methods necessarily have to be implemented, depending on the context / use of the node, and the base node class provides many utility methods to assist in message processing (see below).
See nodes residing in lib/rjr/nodes/ for specific examples.
Attributes
Handlers for various connection events
Dispatcher
to use to satisfy requests
Additional header fields to set on all requests and responses received and sent by node
Unique string identifier of the node
Public Class Methods
# File lib/rjr/node.rb, line 80
# Class-level reader for the @@em class variable.
#
# @return [Object, nil] the current value of @@em, or nil when it has not
#   been assigned yet
def self.em
  return nil unless defined?(@@em)

  @@em
end
Bool indicating if this node is indirect
# File lib/rjr/node.rb, line 58
# Class-level check for the INDIRECT_NODE constant.
#
# @return [Object] false when INDIRECT_NODE is not defined on the receiver,
#   otherwise the value of that constant
def indirect?
  return false unless self.const_defined?(:INDIRECT_NODE)

  self.const_get(:INDIRECT_NODE)
end
RJR::Node
initializer
@param [Hash] args options to set on request @option args [String] :node_id unique id of the node @option args [Hash<String,String>] :headers optional headers to set
on all json-rpc messages
@option args [Dispatcher] :dispatcher dispatcher to assign to the node
# File lib/rjr/node.rb, line 103
# RJR::Node initializer.
#
# Resets the connection event handlers, sets up the response bookkeeping
# (lock, condition variable, pending table, response list), and makes sure
# the class-wide thread pool and EM adapter exist and are started.
#
# @param [Hash] args options to set on the node
# @option args [String] :node_id unique id of the node
# @option args [Hash<String,String>] :headers optional headers to set on all
#   json-rpc messages; the hash is copied, and nil is treated as "no headers"
# @option args [Dispatcher] :dispatcher dispatcher to assign to the node
#   (a new RJR::Dispatcher is created when omitted)
# @option args :timeout stored as @timeout; when nil, pending requests are
#   never pruned while waiting for a result
# @option args :wait_interval stored as @wait_interval (defaults to 0.01);
#   used as the condition-variable wait timeout while waiting for a result
def initialize(args = {})
  clear_event_handlers

  @response_lock = Mutex.new
  @response_cv   = ConditionVariable.new
  @pending       = {}
  @responses     = []

  @node_id       = args[:node_id]
  @timeout       = args[:timeout]
  @wait_interval = args[:wait_interval] || 0.01
  @dispatcher    = args[:dispatcher] || RJR::Dispatcher.new

  # Copy the caller's hash so later changes to it do not affect the node.
  # An explicit :headers => nil is tolerated rather than raising TypeError
  # from Hash#merge.
  @message_headers = {}.merge(args[:headers] || {})

  @@tp ||= ThreadPool.new
  @@em ||= EMAdapter.new

  # will do nothing if already started
  tp.start
  em.start
end
Bool indicating if this node is persistent
# File lib/rjr/node.rb, line 52
# Class-level check for the PERSISTENT_NODE constant.
#
# @return [Object] false when PERSISTENT_NODE is not defined on the receiver,
#   otherwise the value of that constant
def persistent?
  return false unless self.const_defined?(:PERSISTENT_NODE)

  self.const_get(:PERSISTENT_NODE)
end
# File lib/rjr/node.rb, line 88
# Class-level reader for the @@tp class variable.
#
# @return [Object, nil] the current value of @@tp, or nil when it has not
#   been assigned yet
def self.tp
  return nil unless defined?(@@tp)

  @@tp
end
Public Instance Methods
Reset connection event handlers
# File lib/rjr/node.rb, line 148
# Reset connection event handlers.
#
# Replaces @connection_event_handlers with a fresh table holding an empty
# handler list for each of the :opened, :closed and :error events.
#
# @return [Hash] the new handler table
def clear_event_handlers
  fresh_table = {}
  [:opened, :closed, :error].each { |event| fresh_table[event] = [] }
  @connection_event_handlers = fresh_table
end
# File lib/rjr/node.rb, line 84
# Instance-level shortcut that delegates to the class-level `em` reader.
#
# @return [Object, nil] whatever the node's class returns from `em`
def em
  node_class = self.class
  node_class.em
end
Immediately terminate the node
Warning: this does exactly what it says. All running threads and reactor jobs are immediately killed.
@return self
# File lib/rjr/node.rb, line 140
# Immediately terminate the node.
#
# Stops the event loop via `em` first, then stops the thread pool via `tp`.
#
# @return self
def halt
  tap do
    em.stop_event_loop
    tp.stop
  end
end
Bool indicating if this node class is indirect
# File lib/rjr/node.rb, line 70
# Bool indicating if this node's class is indirect; delegates to the
# class-level `indirect?`.
#
# @return [Object] whatever the node's class returns from `indirect?`
def indirect?
  node_class = self.class
  node_class.indirect?
end
Block until the eventmachine reactor and thread pool have both completed running.
@return self
# File lib/rjr/node.rb, line 128
# Block until both subsystems have finished: joins the thread pool (`tp`)
# first, then the EM adapter (`em`).
#
# @return self
def join
  tap do
    tp.join
    em.join
  end
end
alias of RJR_NODE_TYPE
# File lib/rjr/node.rb, line 75
# Alias of the RJR_NODE_TYPE constant defined on the node's class.
#
# @return [Object, nil] the value of RJR_NODE_TYPE, or nil when the class
#   does not define it
def node_type
  node_class = self.class
  return nil unless node_class.const_defined?(:RJR_NODE_TYPE)

  node_class.const_get(:RJR_NODE_TYPE)
end
Register connection event handler @param event [:opened, :closed, :error] the event to register the handler
for
@param handler [Callable] block param to be added to array of handlers
that are called when event occurs
@yield [Node, *args] self and event-specific *args are passed to each
registered handler when event occurs
# File lib/rjr/node.rb, line 163
# Register a connection event handler.
#
# Unknown events are ignored. A call without a block is ignored too: it would
# otherwise store nil in the handler list, and that nil would later be
# invoked with `call` when the event fires.
#
# @param event [:opened, :closed, :error] the event to register the handler for
# @param handler [Callable] block param to be added to the array of handlers
#   that are called when the event occurs
# @yield [Node, *args] self and event-specific *args are passed to each
#   registered handler when the event occurs
# @return [Array, nil] the handler list for the event after registration, or
#   nil when nothing was registered
def on(event, &handler)
  return unless @connection_event_handlers.key?(event)
  return if handler.nil?

  @connection_event_handlers[event] << handler
end
Bool indicating if this node class is persistent
# File lib/rjr/node.rb, line 65
# Bool indicating if this node's class is persistent; delegates to the
# class-level `persistent?`.
#
# @return [Object] whatever the node's class returns from `persistent?`
def persistent?
  node_class = self.class
  node_class.persistent?
end
# File lib/rjr/node.rb, line 92
# Instance-level shortcut that delegates to the class-level `tp` reader.
#
# @return [Object, nil] whatever the node's class returns from `tp`
def tp
  node_class = self.class
  node_class.tp
end
Private Instance Methods
Internal helper, extract client info from connection
# File lib/rjr/node.rb, line 180
# Internal helper, extract client info from connection.
#
# The lookup is best effort: any ordinary error while reading or decoding
# the peer address yields [nil, nil]. Only StandardError is rescued, so
# Interrupt, SystemExit and similar are no longer swallowed here.
#
# @param connection [Object] connection object; expected to respond to
#   `get_peername` with a packed sockaddr
# @return [Array] two-element array as returned by
#   Socket.unpack_sockaddr_in (port, then address), or [nil, nil] when the
#   node is indirect, is of type :local, or the peer could not be determined
def client_for(connection)
  # skip if an indirect node type or local
  return nil, nil if indirect? || node_type == :local

  begin
    Socket.unpack_sockaddr_in(connection.get_peername)
  rescue StandardError
    [nil, nil]
  end
end
Internal helper, run connection event handlers for specified event, passing self and args to handler
# File lib/rjr/node.rb, line 172
# Internal helper, run the connection event handlers registered for the
# given event. Each handler is called with self followed by *args.
#
# @param event [Symbol] event whose handlers should run; unknown events are
#   ignored
# @param args extra arguments forwarded to every handler
# @return [Array, nil] the handler list for the event, or nil when the event
#   is unknown
def connection_event(event, *args)
  handler_table = @connection_event_handlers
  return unless handler_table.key?(event)

  handler_table[event].each do |callback|
    callback.call(self, *args)
  end
end
Internal helper, handle message received
# File lib/rjr/node.rb, line 193
# Internal helper, handle message received.
#
# Parses the raw message, then routes it: requests and notifications are
# queued on the thread pool to be run through handle_request, responses are
# passed straight to handle_response. Anything else is parsed and returned
# without further processing.
#
# @param msg raw message handed to Messages::Intermediate.parse
# @param connection connection the message arrived on, forwarded to
#   handle_request
# @return the parsed intermediate message
def handle_message(msg, connection = {})
  parsed = Messages::Intermediate.parse(msg)

  # false => request, true => notification, nil => neither
  as_notification =
    if Messages::Request.is_request_message?(parsed)
      false
    elsif Messages::Notification.is_notification_message?(parsed)
      true
    end

  if !as_notification.nil?
    tp << ThreadPoolJob.new(parsed) { |queued| handle_request(queued, as_notification, connection) }
  elsif Messages::Response.is_response_message?(parsed)
    handle_response(parsed)
  end

  parsed
end
Internal helper, handle request message received
# File lib/rjr/node.rb, line 215
# Internal helper, handle request message received.
#
# Wraps the message as a Notification or Request, dispatches it through
# @dispatcher, and, for requests only, sends a Response built from the
# dispatch result back over the connection.
#
# @param message intermediate message to process
# @param notification [Boolean] true when the message is a notification
#   (no response is sent)
# @param connection connection the message arrived on
# @return [Messages::Response, nil] the response that was sent, or nil for
#   notifications
def handle_request(message, notification=false, connection={})
  # get client for the specified connection
  # TODO should grab port/ip immediately on connection and use that
  client_port, client_ip = client_for(connection)

  message_class = notification ? Messages::Notification : Messages::Request
  msg = message_class.new(:message => message,
                          :headers => @message_headers)

  callback = NodeCallback.new(:node       => self,
                              :connection => connection)

  dispatch_args = {
    :rjr_method      => msg.jr_method,
    :rjr_method_args => msg.jr_args,
    :rjr_headers     => msg.headers,
    :rjr_client_ip   => client_ip,
    :rjr_client_port => client_port,
    :rjr_node        => self,
    :rjr_node_id     => node_id,
    :rjr_node_type   => self.node_type,
    :rjr_callback    => callback
  }
  result = @dispatcher.dispatch(dispatch_args)

  # notifications never get a reply
  return nil if notification

  response = Messages::Response.new(:id      => msg.msg_id,
                                    :result  => result,
                                    :headers => msg.headers,
                                    :request => msg)
  self.send_msg(response.to_s, connection)
  response
end
Internal helper, handle response message received
# File lib/rjr/node.rb, line 252
# Internal helper, handle response message received.
#
# Runs the response result through @dispatcher.handle_response, then records
# an entry of [message id, result] (with the raised error appended as a
# third element when one occurred) in @responses and wakes every thread
# waiting on @response_cv.
#
# @param message intermediate message carrying the response
def handle_response(message)
  msg = Messages::Response.new(:message => message,
                               :headers => self.message_headers)

  # NOTE(review): Exception (not only StandardError) is rescued here; the
  # error is not swallowed but stored alongside the response entry below.
  res, err =
    begin
      [@dispatcher.handle_response(msg.result), nil]
    rescue Exception => e
      [nil, e]
    end

  @response_lock.synchronize do
    entry = err.nil? ? [msg.msg_id, res] : [msg.msg_id, res, err]
    @responses << entry
    @response_cv.broadcast
  end
end
Internal helper, block until response matching message id is received
# File lib/rjr/node.rb, line 271
# Internal helper, block until the response matching the message id is
# received.
#
# The request is registered in @pending while waiting. All bookkeeping,
# including that registration, happens under @response_lock, and the entry
# is removed again once the response has been picked up so @pending does not
# grow with every completed request.
#
# @param message message whose `msg_id` identifies the awaited response
# @return [Array] the stored response entry: [msg_id, result] or
#   [msg_id, result, error]
# @raise [RuntimeError] 'Timed out' when @timeout is set and the request has
#   been pending for longer than that
def wait_for_result(message)
  message_id = message.msg_id

  @response_lock.synchronize do
    @pending[message_id] = Time.now

    loop do
      # Prune messages that timed out
      if @timeout
        now = Time.now
        @pending.delete_if { |_, start_time| (now - start_time) > @timeout }
      end

      fail 'Timed out' unless @pending.key?(message_id)

      # Prune invalid responses (nobody is waiting for them any more)
      @responses.keep_if { |response| @pending.key?(response.first) }

      found = @responses.find { |response| message_id == response.first }
      unless found.nil?
        @responses.delete(found)
        # Request is complete; stop tracking it as pending.
        @pending.delete(message_id)
        return found
      end

      # Releases the lock while waiting; wakes on broadcast or after
      # @wait_interval so timeouts are re-checked.
      @response_cv.wait(@response_lock, @wait_interval)
    end
  end
end