class RJR::Nodes::TCP

TCP node definition, listen for and invoke json-rpc requests via TCP sockets

Clients should specify the hostname / port when listening for requests and when invoking them.

@example Listening for json-rpc requests over tcp

# initialize node
server = RJR::Nodes::TCP.new :node_id => 'server', :host => 'localhost', :port => '7777'

# register rjr dispatchers (see RJR::Dispatcher)
server.dispatcher.handle('hello') { |name|
  "Hello #{name}!"
}

# listen and block
server.listen
server.join

@example Invoking json-rpc requests over tcp

client = RJR::Nodes::TCP.new :node_id => 'client', :host => 'localhost', :port => '8888'
puts client.invoke('jsonrpc://localhost:7777', 'hello', 'mo')

Constants

INDIRECT_NODE
PERSISTENT_NODE
RJR_NODE_TYPE

Attributes

connections[RW]

Public Class Methods

new(args = {}) click to toggle source

TCP initializer @param [Hash] args the options to create the tcp node with @option args [String] :host the hostname/ip which to listen on @option args [Integer] :port the port which to listen on

Calls superclass method RJR::Node::new
# File lib/rjr/nodes/tcp.rb, line 112
# TCP node initializer.
#
# Forwards the options to the superclass initializer, remembers the
# host / port taken from the options and sets up an empty, lock-guarded
# connection list.
#
# @param [Hash] args the options to create the tcp node with
# @option args [String] :host stored in @host
# @option args [Integer] :port stored in @port
def initialize(args = {})
  super(args)

  @host, @port = args[:host], args[:port]

  # guards mutation of the connection list in add_connection / remove_connection
  @connections_lock = Mutex.new
  @connections      = []
end

Public Instance Methods

add_connection(connection) click to toggle source

Called by TCPConnection::initialize

# File lib/rjr/nodes/tcp.rb, line 143
# Appends a connection to the node's connection list while holding
# the connections lock.
#
# @param connection the connection to start tracking
# @return the connection list after the append
def add_connection(connection)
  @connections_lock.synchronize { connections.push(connection) }
end
invoke(uri, rpc_method, *args) click to toggle source

Instructs node to send rpc request, and wait for / return response.

Implementation of RJR::Node#invoke

Do not invoke directly from the em event loop or a callback, as it will block the message subscription used to receive responses

@param [String] uri location of node to send request to, should be

in format of jsonrpc://hostname:port or tcp://hostname:port

@param [String] rpc_method json-rpc method to invoke on destination @param [Array] args array of arguments to convert to json and invoke remote method with

# File lib/rjr/nodes/tcp.rb, line 167
# Sends a json-rpc request to the node at the given uri, then blocks in
# wait_for_result until a result for that request is available.
#
# @param [String] uri destination; host and port are extracted via URI.parse
# @param [String] rpc_method json-rpc method to invoke on the destination
# @param [Array] args arguments carried by the request message
# @return the second element of the array returned by wait_for_result
# @raise the third element of that array, when it has more than two elements
def invoke(uri, rpc_method, *args)
  target     = URI.parse(uri)
  host, port = target.host, target.port

  request = Messages::Request.new(:method  => rpc_method,
                                  :args    => args,
                                  :headers => @message_headers)

  # connection lookup / creation and the actual send are scheduled through @@em
  @@em.schedule do
    init_client(:host => host, :port => port, :rjr_node => self) do |client|
      client.send_msg(request.to_s)
    end
  end

  # TODO optional timeout for response ?
  response = wait_for_result(request)

  # a third element signals a failure and is raised to the caller
  raise response[2] if response.size > 2

  response[1]
end
listen() click to toggle source

Instruct Node to start listening for and dispatching rpc requests

Implementation of RJR::Node#listen

# File lib/rjr/nodes/tcp.rb, line 135
# Asks @@em to schedule the start of a server on @host / @port that uses
# TCPConnection as its connection handler and receives this node as
# the :rjr_node option.
#
# @return [RJR::Nodes::TCP] self
def listen
  handler_options = { :rjr_node => self }

  @@em.schedule do
    @@em.start_server(@host, @port, TCPConnection, handler_options)
  end

  self
end
notify(uri, rpc_method, *args) click to toggle source

Instructs node to send rpc notification (immediately returns / no response is generated)

Implementation of RJR::Node#notify

@param [String] uri location of node to send notification to, should be

in format of jsonrpc://hostname:port

@param [String] rpc_method json-rpc method to invoke on destination @param [Array] args array of arguments to convert to json and invoke remote method with

# File lib/rjr/nodes/tcp.rb, line 200
# Sends a json-rpc notification to the node at the given uri. No response
# is waited for, but the caller is blocked until the scheduled send
# callback has handed the message to the connection.
#
# @param [String] uri destination; host and port are extracted via URI.parse
# @param [String] rpc_method json-rpc method to invoke on the destination
# @param [Array] args arguments carried by the notification message
# @return [nil]
def notify(uri, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  uri = URI.parse(uri)
  host, port = uri.host, uri.port

  invoked = false
  message = Messages::Notification.new :method => rpc_method,
                                       :args   => args,
                                       :headers => @message_headers
  @@em.schedule {
    init_client(:host => host, :port => port,
                :rjr_node => self) { |c|
      c.send_msg message.to_s
      # XXX, this should be invoked only when we are sure event
      # machine sent message. Shouldn't pose a problem unless event
      # machine is killed immediately after
      # (a stricter alternative would be to poll the connection's
      # outbound data size until it reaches zero)
      published_l.synchronize { invoked = true ; published_c.signal }
    }
  }

  # Re-check the flag after every wakeup: a condition variable wait may
  # return without a matching signal, so a single wait is not sufficient.
  published_l.synchronize { published_c.wait(published_l) until invoked }
  nil
end
remove_connection(connection) click to toggle source

Called by TCPConnection::unbind

# File lib/rjr/nodes/tcp.rb, line 150
# Drops a connection from the node's connection list while holding
# the connections lock.
#
# @param connection the connection to stop tracking
# @return the removed connection, or nil when it was not in the list
def remove_connection(connection)
  @connections_lock.synchronize { connections.delete(connection) }
end
send_msg(data, connection) click to toggle source

Send data using specified connection

Implementation of RJR::Node#send_msg

# File lib/rjr/nodes/tcp.rb, line 128
# Sends data using the specified connection by delegating to the
# connection's own send_msg method.
#
# @param data payload handed unchanged to connection.send_msg
# @param connection object responding to send_msg
# @return whatever connection.send_msg returns
def send_msg(data, connection)
  connection.send_msg(data)
end
to_s() click to toggle source
# File lib/rjr/nodes/tcp.rb, line 121
# Human readable representation of the node: the class name followed by
# the node id, host and port in angle brackets.
#
# @return [String]
def to_s
  format('RJR::Nodes::TCP<%s,%s,%s>', @node_id, @host, @port)
end

Private Instance Methods

init_client(args={}, &on_init) click to toggle source

Internal helper, initialize new client

# File lib/rjr/nodes/tcp.rb, line 99
# Internal helper, initialize new client.
#
# Looks for an already tracked connection whose host and port match the
# requested ones; when none is found a new one is opened through
# EventMachine::connect with TCPConnection as handler. The resulting
# connection is then passed to the callback.
#
# @param [Hash] args connection options; :host and :port select the peer,
#   the whole hash is handed to EventMachine::connect for new connections
# @yieldparam connection the reused or newly opened connection
# @return whatever the callback returns
def init_client(args={}, &on_init)
  host = args[:host]
  port = args[:port]

  existing = @connections.find do |candidate|
    port == candidate.port && host == candidate.host
  end
  connection = existing || EventMachine::connect(host, port, TCPConnection, args)

  on_init.call(connection) # TODO move to tcpnode event ?
end