class RubySkynet::Connection

Attributes

socket[R]

Returns the underlying socket being used by a Connection instance

Public Class Methods

new(servers, params = {}) click to toggle source

Returns a new RubySkynet connection to the server

Parameters:

:read_timeout [Float]
  Time in seconds to timeout on read
  Can be overridden by supplying a timeout in the read call
  Default: 60

:connect_timeout [Float]
  Time in seconds to timeout when trying to connect to the server
  Default: Half of the :read_timeout ( 30 seconds )

:connect_retry_count [Fixnum]
  Number of times to retry connecting when a connection fails
  Default: 10

:connect_retry_interval [Float]
  Number of seconds between connection retry attempts after the first failed attempt
  Default: 0.5
# File lib/ruby_skynet/connection.rb, line 38
def initialize(servers, params = {})
  params = params.dup

  # User configurable options
  params[:read_timeout]           ||= 60
  params[:connect_timeout]        ||= 30
  params[:connect_retry_interval] ||= 0.1
  params[:connect_retry_count]    ||= 5

  # Disable send buffering since it is a RPC call
  params[:buffered] = false

  # For each new connection perform the Skynet handshake
  params[:on_connect] = Proc.new do |socket|
    # Reset user_data on each connection
    socket.user_data = {
      :seq    => 0,
      :logger => logger
    }

    # Receive Service Handshake
    # Registered bool
    #   Registered indicates the state of this service. If it is false, the connection will
    #   close immediately and the client should look elsewhere for this service.
    #
    # ClientID string
    #   ClientID is a UUID that is used by the client to identify itself in RPC requests.
    logger.debug "Waiting for Service Handshake"
    service_handshake = Common.read_bson_document(socket)
    logger.trace 'Service Handshake', service_handshake

    # #TODO When a reconnect returns registered == false need to throw an exception
    # so that this host connection is not used
    registered = service_handshake['registered']
    client_id = service_handshake['clientid']
    socket.user_data[:client_id] = client_id

    # Send blank ClientHandshake
    client_handshake = { 'clientid' => client_id }
    logger.debug "Sending Client Handshake"
    logger.trace 'Client Handshake', client_handshake
    socket.write(client_handshake.to_bson)
  end

  # To prevent strange issues if user incorrectly supplies server names
  params.delete(:server)
  params[:servers] = servers

  @socket = ResilientSocket::TCPClient.new(params)
end
with_connection(server, params={}, &block) click to toggle source

Execute the supplied block with a connection from the pool

# File lib/ruby_skynet/connection.rb, line 196
def self.with_connection(server, params={}, &block)
  conn = nil
  begin
    conn = new(server, params)
    block.call(conn)
  ensure
    conn.close if conn
  end
end

Public Instance Methods

close() click to toggle source
# File lib/ruby_skynet/connection.rb, line 206
def close
  @socket.close if @socket
end
rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false) click to toggle source

Performs a synchronous call to a Skynet server

Parameters:

skynet_name [String|Symbol]:
  Name of the method to pass in the request
method_name [String|Symbol]:
  Name of the method to pass in the request
parameters [Hash]:
  Parameters to pass in the request
idempotent [True|False]:
  If the request can be applied again to the server without changing its state
  Set to true to retry the entire request after the send is successful

Returns the Hash result returned from the Skynet Service

Raises RubySkynet::ProtocolError Raises RubySkynet::SkynetException

# File lib/ruby_skynet/connection.rb, line 106
def rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false)
  logger.benchmark_info "Called #{skynet_name}.#{method_name}" do
    retry_count = 0
    header = nil
    response = nil
    socket.retry_on_connection_failure do |socket|
      header = {
        'servicemethod' => "#{skynet_name}.Forward",
        'seq'           => socket.user_data[:seq]
      }

      logger.debug "Sending Header"
      logger.trace 'Header', header
      socket.write(header.to_bson)

      # The parameters are placed in the request object in BSON serialized form
      request = {
        'clientid'    => socket.user_data[:client_id],
        'in'          => BSON::Binary.new(parameters.to_bson),
        'method'      => method_name.to_s,
        'requestinfo' => {
          'requestid'     => request_id,
          # Increment retry count to indicate that the request may have been tried previously
          'retrycount' => retry_count,
          # TODO: this should be forwarded along in case of services also
          # being a client and calling additional services. If empty it will
          # be stuffed with connecting address
          'originaddress' => ''
        }
      }

      logger.debug "Sending Request"
      logger.trace 'Request', request
      logger.trace 'Parameters:', parameters
      socket.write(request.to_bson)

      # Since Send does not affect state on the server we can also retry reads
      if idempotent
        logger.debug "Reading header from server"
        header = Common.read_bson_document(socket)
        logger.debug 'Response Header', header

        # Read the BSON response document
        logger.debug "Reading response from server"
        response = Common.read_bson_document(socket)
        logger.trace 'Response', response
      end
    end

    # Perform the read outside the retry block since a successful write
    # means that the servers state may have been changed
    unless idempotent
      # Read header first as a separate BSON document
      logger.debug "Reading header from server"
      header = Common.read_bson_document(socket)
      logger.debug 'Response Header', header

      # Read the BSON response document
      logger.debug "Reading response from server"
      response = Common.read_bson_document(socket)
      logger.trace 'Response', response
    end

    # Ensure the sequence number in the response header matches the
    # sequence number sent in the request
    seq_no = header['seq']
    if seq_no != socket.user_data[:seq]
      raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}")
    end

    # Increment Sequence number only on successful response
    socket.user_data[:seq] += 1

    # If an error is returned from Skynet raise a Skynet exception
    error = header['error']
    raise SkynetException.new(error) if error.to_s.length > 0

    # If an error is returned from the service raise a Service exception
    error = response['error']
    raise ServiceException.new(error) if error.to_s.length > 0

    # Return Value
    # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized
    result = Hash.from_bson(StringIO.new(response['out'].data))
    logger.trace 'Return Value', result
    result
  end
end