class RJR::Nodes::Unix
Unix
node definition, listen for and invoke json-rpc requests via Unix
Sockets
Clients should specify the socketname when listening for requests and when invoking them.
TODO client / server examples
Constants
- INDIRECT_NODE
- PERSISTENT_NODE
- RJR_NODE_TYPE
Attributes
Public Class Methods
Unix
initializer @param [Hash] args the options to create the unix node with @option args [String] :socketname the name of the socket which to listen on
RJR::Node::new
# File lib/rjr/nodes/unix.rb, line 97 def initialize(args = {}) super(args) @socketname = args[:socketname] @connections = [] @connections_lock = Mutex.new end
Public Instance Methods
Instructs node to send rpc request, and wait for / return response.
Implementation of RJR::Node#invoke
Do not invoke directly from em event loop or callback as will block the message subscription used to receive responses
@param [String] socketname name of socket which destination node is
listening on
@param [String] rpc_method json-rpc method to invoke on destination @param [Array] args array of arguments to convert to json and invoke remote method wtih
# File lib/rjr/nodes/unix.rb, line 137 def invoke(socketname, rpc_method, *args) message = Messages::Request.new :method => rpc_method, :args => args, :headers => @message_headers connection = nil @@em.schedule { init_client(:socketname => socketname, :rjr_node => self) { |c| connection = c c.send_msg message.to_s } } # TODO optional timeout for response ? result = wait_for_result(message) if result.size > 2 fail result[2] end return result[1] end
Instruct Node
to start listening for and dispatching rpc requests
Implementation of RJR::Node#listen
# File lib/rjr/nodes/unix.rb, line 119 def listen @@em.schedule { @@em.start_unix_domain_server @socketname, nil, UnixConnection, { :rjr_node => self } } self end
Instructs node to send rpc notification (immadiately returns / no response is generated)
Implementation of RJR::Node#notify
@param [String] socketname name of socket which destination node is listening on @param [String] rpc_method json-rpc method to invoke on destination @param [Array] args array of arguments to convert to json and invoke remote method wtih
# File lib/rjr/nodes/unix.rb, line 167 def notify(socketname, rpc_method, *args) # will block until message is published published_l = Mutex.new published_c = ConditionVariable.new invoked = false conn = nil message = Messages::Notification.new :method => rpc_method, :args => args, :headers => @message_headers @@em.schedule { init_client(:socketname => socketname, :rjr_node => self) { |c| conn = c c.send_msg message.to_s # XXX, this should be invoked only when we are sure event # machine sent message. Shouldn't pose a problem unless event # machine is killed immediately after published_l.synchronize { invoked = true ; published_c.signal } } } published_l.synchronize { published_c.wait published_l unless invoked } #sleep 0.01 until conn.get_outbound_data_size == 0 nil end
Send data using specified connection
Implementation of RJR::Node#send_msg
# File lib/rjr/nodes/unix.rb, line 112 def send_msg(data, connection) connection.send_msg(data) end
# File lib/rjr/nodes/unix.rb, line 105 def to_s "RJR::Nodes::Unix<#{@node_id},#{@socketname}>" end
Private Instance Methods
Internal helper, initialize new client
# File lib/rjr/nodes/unix.rb, line 75 def init_client(args={}, &on_init) socketname = args[:socketname] connection = nil @connections_lock.synchronize { connection = @connections.find { |c| socketname == c.socketname } if connection.nil? connection = EventMachine::connect_unix_domain socketname, nil, UnixConnection, args @connections << connection end } on_init.call(connection) # TODO move to unixnode event ? end