class RJR::Nodes::Unix

Unix node definition: listens for and invokes json-rpc requests over Unix sockets.

Clients should specify the socketname when listening for requests and when invoking them.

TODO client / server examples
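
As a rough illustration of the exchange described above, the following sketch sends one json-rpc request over a Unix socket and reads the response. It deliberately does not use RJR: it relies only on Ruby's standard `socket` and `json` libraries, and the socket file name, the `add` method, and the newline-delimited framing are all assumptions made for this example rather than RJR's actual wire behaviour.

```ruby
require 'socket'
require 'json'
require 'tmpdir'

response = nil

Dir.mktmpdir do |dir|
  # Hypothetical socket path, chosen only for this sketch.
  socketname = File.join(dir, 'sketch.sock')

  # "Server" side: listen on the socket and answer a single request.
  server = UNIXServer.new(socketname)
  server_thread = Thread.new do
    client  = server.accept
    request = JSON.parse(client.gets)
    result  = request['params'].sum if request['method'] == 'add'
    client.puts JSON.generate('jsonrpc' => '2.0',
                              'id'      => request['id'],
                              'result'  => result)
    client.close
  end

  # "Client" side: connect to the same socketname and invoke a method.
  socket = UNIXSocket.new(socketname)
  socket.puts JSON.generate('jsonrpc' => '2.0', 'id' => 1,
                            'method'  => 'add', 'params' => [1, 2])
  response = JSON.parse(socket.gets)

  socket.close
  server_thread.join
  server.close
end

puts response['result']
```

Both sides refer to the same socketname, which is the point the class description makes: the listener binds to it and the caller passes it when invoking.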

Constants

INDIRECT_NODE
PERSISTENT_NODE
RJR_NODE_TYPE

Attributes

connections[RW]

Public Class Methods

new(args = {})

Unix initializer

@param [Hash] args the options to create the unix node with

@option args [String] :socketname the name of the socket to listen on

Calls superclass method RJR::Node::new
# File lib/rjr/nodes/unix.rb, line 97
def initialize(args = {})
  super(args)
  @socketname = args[:socketname]

  @connections = []
  @connections_lock = Mutex.new
end

Public Instance Methods

invoke(socketname, rpc_method, *args)

Instructs node to send rpc request, and wait for / return response.

Implementation of RJR::Node#invoke

Do not invoke directly from the EventMachine event loop or from one of its callbacks, as doing so will block the message subscription used to receive responses.

@param [String] socketname name of the socket which the destination node is listening on

@param [String] rpc_method json-rpc method to invoke on destination

@param [Array] args array of arguments to convert to json and invoke remote method with

# File lib/rjr/nodes/unix.rb, line 137
def invoke(socketname, rpc_method, *args)
  message = Messages::Request.new :method => rpc_method,
                                  :args   => args,
                                  :headers => @message_headers
  connection = nil
  @@em.schedule {
    init_client(:socketname => socketname,
                :rjr_node => self) { |c|
      connection = c
      c.send_msg message.to_s
    }
  }

  # TODO optional timeout for response ?
  result = wait_for_result(message)

  if result.size > 2
    fail result[2]
  end
  return result[1]
end
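
The last lines of invoke treat the value returned by wait_for_result as an array: a third element, when present, is raised as an error, and otherwise the second element is returned. A minimal sketch of that convention follows. The helper name `unpack_result` and the contents of the first array element (assumed here to be a message id) are hypothetical and not taken from RJR.

```ruby
# Hypothetical helper mirroring the tail of #invoke.
# Assumed layout: [message_id, value] on success,
#                 [message_id, nil, error] on failure.
def unpack_result(result)
  raise result[2] if result.size > 2
  result[1]
end

# Success: the second element is handed back to the caller.
value = unpack_result(['msg-1', 42])

# Failure: the third element is raised (a String becomes a RuntimeError).
error = begin
  unpack_result(['msg-1', nil, 'method not found'])
rescue RuntimeError => e
  e.message
end
```

Callers of invoke should therefore be prepared to rescue an exception when the remote side reports a failure.
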
listen()

Instruct Node to start listening for and dispatching rpc requests

Implementation of RJR::Node#listen

# File lib/rjr/nodes/unix.rb, line 119
def listen
  @@em.schedule {
    @@em.start_unix_domain_server @socketname, nil, UnixConnection, { :rjr_node => self }
  }
  self
end
notify(socketname, rpc_method, *args)

Instructs node to send rpc notification (returns once the message has been handed to the connection; no response is generated)

Implementation of RJR::Node#notify

@param [String] socketname name of the socket which the destination node is listening on

@param [String] rpc_method json-rpc method to invoke on destination

@param [Array] args array of arguments to convert to json and invoke remote method with

# File lib/rjr/nodes/unix.rb, line 167
def notify(socketname, rpc_method, *args)
  # will block until message is published
  published_l = Mutex.new
  published_c = ConditionVariable.new

  invoked = false
  conn    = nil
  message = Messages::Notification.new :method => rpc_method,
                                       :args   => args,
                                       :headers => @message_headers
  @@em.schedule {
    init_client(:socketname => socketname,
                :rjr_node => self) { |c|
      conn = c
      c.send_msg message.to_s
      # XXX, this should be invoked only when we are sure event
      # machine sent message. Shouldn't pose a problem unless event
      # machine is killed immediately after
      published_l.synchronize { invoked = true ; published_c.signal }
    }
  }
  published_l.synchronize { published_c.wait published_l unless invoked }
  #sleep 0.01 until conn.get_outbound_data_size == 0
  nil
end
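
notify blocks the calling thread with a Mutex and ConditionVariable until the scheduled block has handed the message to the connection. The sketch below isolates that handshake using only Ruby's standard library. A plain Thread stands in for the EventMachine reactor, and the `sent` array stands in for the connection; both are assumptions for illustration. It also waits in an `until` loop, a slightly more defensive variant of the `unless` guard used above.

```ruby
published_l = Mutex.new
published_c = ConditionVariable.new
invoked     = false
sent        = []   # stand-in for the connection's outbound data

# Stand-in for the block scheduled on the reactor thread.
worker = Thread.new do
  sent << 'notification'
  published_l.synchronize do
    invoked = true
    published_c.signal
  end
end

# Caller side: block until the worker reports that it has run.
# Checking the flag under the lock avoids a lost wakeup if the
# worker finished before we started waiting.
published_l.synchronize do
  published_c.wait(published_l) until invoked
end

worker.join
```

The flag is what makes the wait safe: the signal alone would be missed if it fired before the caller reached `wait`.
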
send_msg(data, connection)

Send data using specified connection

Implementation of RJR::Node#send_msg

# File lib/rjr/nodes/unix.rb, line 112
def send_msg(data, connection)
  connection.send_msg(data)
end
to_s()
# File lib/rjr/nodes/unix.rb, line 105
def to_s
  "RJR::Nodes::Unix<#{@node_id},#{@socketname}>"
end

Private Instance Methods

init_client(args={}, &on_init)

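Internal helper: reuses the existing connection to the given socketname if there is one, otherwise opens and records a new one, then passes the connection to the on_init block.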

# File lib/rjr/nodes/unix.rb, line 75
def init_client(args={}, &on_init)
  socketname = args[:socketname]
  connection = nil
  @connections_lock.synchronize {
    connection = @connections.find { |c|
                   socketname == c.socketname
                 }
    if connection.nil?
      connection =
        EventMachine::connect_unix_domain socketname,
                    nil, UnixConnection, args
      @connections << connection
    end
  }
  on_init.call(connection) # TODO move to unixnode event ?
end
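
The find-or-create logic in init_client can be tried in isolation. The following sketch reproduces it with a stand-in connection object so that it runs without EventMachine. `FakeConnection`, `ConnectionCache`, and the socket paths are hypothetical names introduced only for this example.

```ruby
# Stand-in for UnixConnection; it only remembers its socketname.
FakeConnection = Struct.new(:socketname)

class ConnectionCache
  attr_reader :connections

  def initialize
    @connections = []
    @connections_lock = Mutex.new
  end

  # Return the cached connection for socketname, creating and
  # recording one if none exists, then yield it to the caller.
  def fetch(socketname)
    connection = @connections_lock.synchronize do
      existing = @connections.find { |c| c.socketname == socketname }
      existing || FakeConnection.new(socketname).tap { |c| @connections << c }
    end
    yield connection if block_given?
    connection
  end
end

cache  = ConnectionCache.new
first  = cache.fetch('/tmp/a.sock')
second = cache.fetch('/tmp/a.sock')   # reuses the first connection
third  = cache.fetch('/tmp/b.sock')   # different socket, new connection
```

As in init_client, the callback runs outside the lock, so a slow callback does not hold up other threads that need a connection.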