class RubySkynet::Connection
Attributes
socket[R]
Returns the underlying socket being used by a Connection
instance
Public Class Methods
new(servers, params = {})
click to toggle source
Returns a new RubySkynet
connection to the server
Parameters:
:read_timeout [Float] Time in seconds to timeout on read Can be overridden by supplying a timeout in the read call Default: 60 :connect_timeout [Float] Time in seconds to timeout when trying to connect to the server Default: Half of the :read_timeout ( 30 seconds ) :connect_retry_count [Fixnum] Number of times to retry connecting when a connection fails Default: 10 :connect_retry_interval [Float] Number of seconds between connection retry attempts after the first failed attempt Default: 0.5
# File lib/ruby_skynet/connection.rb, line 38 def initialize(servers, params = {}) params = params.dup # User configurable options params[:read_timeout] ||= 60 params[:connect_timeout] ||= 30 params[:connect_retry_interval] ||= 0.1 params[:connect_retry_count] ||= 5 # Disable send buffering since it is a RPC call params[:buffered] = false # For each new connection perform the Skynet handshake params[:on_connect] = Proc.new do |socket| # Reset user_data on each connection socket.user_data = { :seq => 0, :logger => logger } # Receive Service Handshake # Registered bool # Registered indicates the state of this service. If it is false, the connection will # close immediately and the client should look elsewhere for this service. # # ClientID string # ClientID is a UUID that is used by the client to identify itself in RPC requests. logger.debug "Waiting for Service Handshake" service_handshake = Common.read_bson_document(socket) logger.trace 'Service Handshake', service_handshake # #TODO When a reconnect returns registered == false need to throw an exception # so that this host connection is not used registered = service_handshake['registered'] client_id = service_handshake['clientid'] socket.user_data[:client_id] = client_id # Send blank ClientHandshake client_handshake = { 'clientid' => client_id } logger.debug "Sending Client Handshake" logger.trace 'Client Handshake', client_handshake socket.write(client_handshake.to_bson) end # To prevent strange issues if user incorrectly supplies server names params.delete(:server) params[:servers] = servers @socket = ResilientSocket::TCPClient.new(params) end
with_connection(server, params={}, &block)
click to toggle source
Execute the supplied block with a connection from the pool
# File lib/ruby_skynet/connection.rb, line 196 def self.with_connection(server, params={}, &block) conn = nil begin conn = new(server, params) block.call(conn) ensure conn.close if conn end end
Public Instance Methods
close()
click to toggle source
# File lib/ruby_skynet/connection.rb, line 206 def close @socket.close if @socket end
rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false)
click to toggle source
Performs a synchronous call to a Skynet server
Parameters:
skynet_name [String|Symbol]: Name of the method to pass in the request method_name [String|Symbol]: Name of the method to pass in the request parameters [Hash]: Parameters to pass in the request idempotent [True|False]: If the request can be applied again to the server without changing its state Set to true to retry the entire request after the send is successful
Returns the Hash result returned from the Skynet Service
Raises RubySkynet::ProtocolError
Raises RubySkynet::SkynetException
# File lib/ruby_skynet/connection.rb, line 106 def rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false) logger.benchmark_info "Called #{skynet_name}.#{method_name}" do retry_count = 0 header = nil response = nil socket.retry_on_connection_failure do |socket| header = { 'servicemethod' => "#{skynet_name}.Forward", 'seq' => socket.user_data[:seq] } logger.debug "Sending Header" logger.trace 'Header', header socket.write(header.to_bson) # The parameters are placed in the request object in BSON serialized form request = { 'clientid' => socket.user_data[:client_id], 'in' => BSON::Binary.new(parameters.to_bson), 'method' => method_name.to_s, 'requestinfo' => { 'requestid' => request_id, # Increment retry count to indicate that the request may have been tried previously 'retrycount' => retry_count, # TODO: this should be forwarded along in case of services also # being a client and calling additional services. If empty it will # be stuffed with connecting address 'originaddress' => '' } } logger.debug "Sending Request" logger.trace 'Request', request logger.trace 'Parameters:', parameters socket.write(request.to_bson) # Since Send does not affect state on the server we can also retry reads if idempotent logger.debug "Reading header from server" header = Common.read_bson_document(socket) logger.debug 'Response Header', header # Read the BSON response document logger.debug "Reading response from server" response = Common.read_bson_document(socket) logger.trace 'Response', response end end # Perform the read outside the retry block since a successful write # means that the servers state may have been changed unless idempotent # Read header first as a separate BSON document logger.debug "Reading header from server" header = Common.read_bson_document(socket) logger.debug 'Response Header', header # Read the BSON response document logger.debug "Reading response from server" response = Common.read_bson_document(socket) logger.trace 'Response', response end # Ensure the sequence number in the response header matches the # sequence number sent in the request seq_no = header['seq'] if seq_no != socket.user_data[:seq] raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}") end # Increment Sequence number only on successful response socket.user_data[:seq] += 1 # If an error is returned from Skynet raise a Skynet exception error = header['error'] raise SkynetException.new(error) if error.to_s.length > 0 # If an error is returned from the service raise a Service exception error = response['error'] raise ServiceException.new(error) if error.to_s.length > 0 # Return Value # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized result = Hash.from_bson(StringIO.new(response['out'].data)) logger.trace 'Return Value', result result end end