class RJR::Nodes::TCP
TCP node definition: listens for and invokes json-rpc requests via TCP sockets.
Clients should specify the hostname / port when listening for requests and when invoking them.
@example Listening for json-rpc requests over tcp
# initialize node
server = RJR::Nodes::TCP.new :node_id => 'server', :host => 'localhost', :port => '7777'

# register rjr dispatchers (see RJR::Dispatcher)
server.dispatcher.handle('hello') { |name| "Hello #{name}!" }

# listen and block
server.listen
server.join
@example Invoking json-rpc requests over tcp
client = RJR::Nodes::TCP.new :node_id => 'client', :host => 'localhost', :port => '8888'
puts client.invoke('jsonrpc://localhost:7777', 'hello', 'mo')
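For orientation, the sketch below shows roughly what a json-rpc request such as the `hello` call above could look like on the wire, assuming JSON-RPC 2.0 framing. `build_request` is a hypothetical helper and not part of RJR; the actual encoding is produced by RJR's message classes, is not shown on this page, and may carry extra fields such as headers.

```ruby
require 'json'
require 'securerandom'

# Hypothetical helper (not RJR API): encode a JSON-RPC 2.0 style request.
# The id lets the caller match a later response to this request.
def build_request(rpc_method, *args)
  JSON.generate('jsonrpc' => '2.0',
                'id'      => SecureRandom.uuid,
                'method'  => rpc_method,
                'params'  => args)
end

puts build_request('hello', 'mo')
```

A notification would, under the same assumption, be the same structure without the `id` member, which is why no response is generated for it.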
Constants
- INDIRECT_NODE
- PERSISTENT_NODE
- RJR_NODE_TYPE
Attributes
Public Class Methods
TCP initializer
@param [Hash] args the options to create the tcp node with
@option args [String] :host the hostname/ip which to listen on
@option args [Integer] :port the port which to listen on
Calls superclass method RJR::Node::new
# File lib/rjr/nodes/tcp.rb, line 112
def initialize(args = {})
  super(args)
  @host = args[:host]
  @port = args[:port]

  @connections = []
  @connections_lock = Mutex.new
end
Public Instance Methods
Called by TCPConnection::initialize
# File lib/rjr/nodes/tcp.rb, line 143
def add_connection(connection)
  @connections_lock.synchronize do
    connections << connection
  end
end
Instructs node to send rpc request, and wait for / return response.
Implementation of RJR::Node#invoke
Do not invoke directly from the em event loop or from a callback, as doing so will block the message subscription used to receive responses.
@param [String] uri location of node to send request to, should be in format of jsonrpc://hostname:port or tcp://hostname:port
@param [String] rpc_method json-rpc method to invoke on destination
@param [Array] args array of arguments to convert to json and invoke remote method with
# File lib/rjr/nodes/tcp.rb, line 167
def invoke(uri, rpc_method, *args)
  uri = URI.parse(uri)
  host,port = uri.host, uri.port

  message = Messages::Request.new :method => rpc_method,
                                  :args => args,
                                  :headers => @message_headers
  connection = nil
  @@em.schedule {
    init_client(:host => host, :port => port,
                :rjr_node => self) { |c|
      connection = c
      c.send_msg message.to_s
    }
  }

  # TODO optional timeout for response ?
  result = wait_for_result(message)

  if result.size > 2
    fail result[2]
  end
  return result[1]
end
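The two stdlib-level steps in invoke, splitting the uri into host / port and unpacking the result array, can be tried in isolation. This is a minimal sketch: `host_and_port` and `unpack_result` are hypothetical helpers, and the contents of the first result slot are an assumption (a placeholder symbol is used), since wait_for_result is not shown on this page.

```ruby
require 'uri'

# Hypothetical helper: split a node uri into the host / port pair
# that invoke hands to init_client. Works for any scheme, e.g.
# jsonrpc:// or tcp://, because only host and port are read.
def host_and_port(uri)
  parsed = URI.parse(uri)
  [parsed.host, parsed.port]
end

# Hypothetical helper mirroring the tail of invoke: a result with more
# than two elements carries an error in slot 2, otherwise slot 1 holds
# the return value.
def unpack_result(result)
  raise result[2] if result.size > 2
  result[1]
end

host, port = host_and_port('jsonrpc://localhost:7777')
puts "#{host} #{port.inspect}"
puts unpack_result([:placeholder_id, 'Hello mo!'])
```

Note that `URI#port` returns an Integer, so the port derived from the uri is numeric even when a node was constructed with a string port.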
Instructs node to start listening for and dispatching rpc requests
Implementation of RJR::Node#listen
# File lib/rjr/nodes/tcp.rb, line 135
def listen
  @@em.schedule {
    @@em.start_server @host, @port, TCPConnection, { :rjr_node => self }
  }
  self
end
Instructs node to send rpc notification (returns immediately / no response is generated)
Implementation of RJR::Node#notify
@param [String] uri location of node to send notification to, should be in format of jsonrpc://hostname:port
@param [String] rpc_method json-rpc method to invoke on destination
@param [Array] args array of arguments to convert to json and invoke remote method with
# File lib/rjr/nodes/tcp.rb, line 200
def notify(uri, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  uri = URI.parse(uri)
  host,port = uri.host, uri.port

  invoked = false
  conn    = nil
  message = Messages::Notification.new :method => rpc_method,
                                       :args => args,
                                       :headers => @message_headers
  @@em.schedule {
    init_client(:host => host, :port => port,
                :rjr_node => self) { |c|
      conn = c
      c.send_msg message.to_s
      # XXX, this should be invoked only when we are sure event
      # machine sent message. Shouldn't pose a problem unless event
      # machine is killed immediately after
      published_l.synchronize { invoked = true ; published_c.signal }
    }
  }
  published_l.synchronize { published_c.wait published_l unless invoked }
  #sleep 0.01 until conn.get_outbound_data_size == 0
  nil
end
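The blocking behaviour of notify rests on a Mutex / ConditionVariable handshake guarded by a flag. The sketch below reproduces that pattern with the standard library only; `publish_and_wait` is a hypothetical name, and a plain thread stands in for the EventMachine reactor callback. It waits with `until` rather than `unless`, a small deviation from the listing that also tolerates spurious wakeups.

```ruby
# Hypothetical stand-in for the publish handshake in notify: the worker
# thread plays the role of the reactor callback that sends the message.
def publish_and_wait
  lock      = Mutex.new
  published = ConditionVariable.new
  invoked   = false
  sent      = []

  worker = Thread.new do
    sent << 'notification'      # stands in for c.send_msg
    lock.synchronize do
      invoked = true            # set the flag before signalling
      published.signal
    end
  end

  # The flag covers the case where the signal fired before we started
  # waiting; without it the caller could block forever.
  lock.synchronize { published.wait(lock) until invoked }
  worker.join
  sent
end

puts publish_and_wait.inspect
```

As the XXX comment in the listing points out, the signal only shows that the send was requested, not that the bytes have left the process.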
Called by TCPConnection::unbind
# File lib/rjr/nodes/tcp.rb, line 150
def remove_connection(connection)
  @connections_lock.synchronize do
    connections.delete(connection)
  end
end
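add_connection and remove_connection both mutate the connection list under @connections_lock. A minimal sketch of that pattern follows; `ConnectionRegistry` and `registry_size_after_concurrent_adds` are hypothetical names used only for illustration, and strings stand in for connection objects.

```ruby
# Hypothetical registry mirroring add_connection / remove_connection:
# every access to the shared array goes through one mutex.
class ConnectionRegistry
  def initialize
    @connections = []
    @lock = Mutex.new
  end

  def add(connection)
    @lock.synchronize { @connections << connection }
  end

  def remove(connection)
    @lock.synchronize { @connections.delete(connection) }
  end

  def size
    @lock.synchronize { @connections.size }
  end
end

# Register `count` placeholder connections from separate threads and
# report how many ended up in the registry.
def registry_size_after_concurrent_adds(count)
  registry = ConnectionRegistry.new
  threads = Array.new(count) { |i| Thread.new { registry.add("conn-#{i}") } }
  threads.each(&:join)
  registry.size
end

puts registry_size_after_concurrent_adds(20)
```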
Send data using specified connection
Implementation of RJR::Node#send_msg
# File lib/rjr/nodes/tcp.rb, line 128
def send_msg(data, connection)
  connection.send_msg(data)
end
# File lib/rjr/nodes/tcp.rb, line 121
def to_s
  "RJR::Nodes::TCP<#{@node_id},#{@host},#{@port}>"
end
Private Instance Methods
Internal helper, initialize new client
# File lib/rjr/nodes/tcp.rb, line 99
def init_client(args={}, &on_init)
  host,port = args[:host], args[:port]
  connection = @connections.find { |c|
                 port == c.port && host == c.host
               }
  connection ||= EventMachine::connect(host, port,
                                       TCPConnection, args)
  on_init.call(connection) # TODO move to tcpnode event ?
end
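The reuse logic in init_client (look up an existing connection by host and port, connect only when none matches) can be sketched without EventMachine. `FakeConnection` and `find_or_connect` are hypothetical names; the real TCPConnection class is not shown on this page, and the block stands in for EventMachine::connect.

```ruby
# Hypothetical connection record exposing the two fields the lookup uses.
FakeConnection = Struct.new(:host, :port)

# Return a connection matching host and port, or build a new one with
# the supplied block when no match exists.
def find_or_connect(connections, host, port)
  existing = connections.find { |c| port == c.port && host == c.host }
  existing || yield(host, port)
end

pool = [FakeConnection.new('localhost', 7777)]

reused = find_or_connect(pool, 'localhost', 7777) { |h, p| FakeConnection.new(h, p) }
fresh  = find_or_connect(pool, 'localhost', '7777') { |h, p| FakeConnection.new(h, p) }

puts reused.equal?(pool.first)
puts fresh.equal?(pool.first)
```

The second lookup misses because `'7777' == 7777` is false in Ruby, so in this sketch a port given as a string never matches one stored as an Integer. Whether TCPConnection normalizes the port is not visible here.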