class SimpleRPC::Client

The SimpleRPC client connects to a server, either persistently or on-demand, and makes calls to its proxy object.

Once created, you should be able to interact with the client as if it were the remote object, i.e.:

require 'simplerpc/client'

# Connect
c = SimpleRPC::Client.new(:hostname => '127.0.0.1', :port => 27045)

# Make some calls directly
c.length        # 2
c.call(:dup)    # ["thing", "thing2"]
c.call(:class)  # Array

# Get a proxy object
p = c.get_proxy
c.persist     # always-on mode with 1 connection
p.dup         # ["thing", "thing2"]
p.length      # 2
p.class       # Array
p.each{|x| puts x} # outputs "thing\nthing2\n"

# Disconnect from always-on mode
c.disconnect

Making Requests

Requests can be made on the client object as if it were local, and these will be proxied to the server. For methods that are clobbered locally (for example ‘.class’, which will return ‘SimpleRPC::Client’), you may use call to send the request without local interaction:

c.class         # SimpleRPC::Client
c.call(:class)  # Array

Proxy Objects

Calling get_proxy will return a dynamically-constructed object that lacks any methods other than remote ones—this means it will be almost indistinguishable from a local object:

p = c.get_proxy
p.class        # Array
p.dup          # ['thing', 'thing2']

This is an exceptionally seamless way of interacting, but you must retain the original client connection in order to call Client#disconnect or use always-on mode.

Blocks

Blocks are supported and run on the client side. A server object may yield any number of times. Note that if the client is single-threaded, it is not possible to make further calls from inside the block (if :threaded is on, this is perfectly acceptable).
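The yield round trip can be pictured with a small in-memory simulation. This is only a sketch: `run_call`, the `YIELD`/`SUCCESS` tags and the two arrays standing in for the socket are hypothetical names chosen for illustration, not the library's wire protocol.

```ruby
# Hypothetical message tags; the library uses its own SocketProtocol constants.
YIELD   = :yield
SUCCESS = :success

# `incoming` plays the server's replies in order; `outgoing` collects what the
# client would send back after running the block locally.
def run_call(incoming, outgoing)
  status, payload = incoming.shift
  while status == YIELD
    outgoing << yield(*payload)        # the block runs on the client
    status, payload = incoming.shift   # wait for the next server message
  end
  raise payload unless status == SUCCESS
  payload
end

replies = [[YIELD, ['thing']], [YIELD, ['thing2']], [SUCCESS, 2]]
sent    = []
run_call(replies, sent) { |x| x.upcase }  # 2, and sent is ["THING", "THING2"]
```

Each yield costs one extra exchange with the server, which is why a single-threaded client cannot start another call from inside the block: its only connection is still in use.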

Exceptions

Remote exceptions fired by the server during a call are returned as RemoteExceptions, and have the message and backtrace set as if you are on the remote server.

Network errors are exposed directly. The server will not close a pipe during an operation, so if using connect-on-demand you should only observe Errno::ECONNREFUSED exceptions. If using a persistent connection pool, you will encounter either Errno::ECONNREFUSED, Errno::ECONNRESET or EOFError as the serialiser attempts to read from the closed socket.
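Since these failures arrive as ordinary Ruby exceptions, a caller may wish to retry transient ones. The following is a sketch only: `with_retries` and `NETWORK_ERRORS` are hypothetical names, not part of SimpleRPC, and the rescued classes are simply those named in the paragraph above.

```ruby
# Hypothetical helper (not part of SimpleRPC): retry a block when one of the
# network errors named above is raised, up to `attempts` tries in total.
NETWORK_ERRORS = [Errno::ECONNREFUSED, Errno::ECONNRESET, EOFError].freeze

def with_retries(attempts: 3, delay: 0.0)
  tries = 0
  begin
    tries += 1
    yield
  rescue *NETWORK_ERRORS
    raise if tries >= attempts   # give up and re-raise the last error
    sleep(delay)
    retry
  end
end

# Usage, assuming `c` is a connected client:
#   with_retries(attempts: 5, delay: 0.1) { c.call(:length) }
```

The method_missing source listed further down wraps these errors in ConnectionError, so the list may need adjusting to match the installed version.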

Thread Safety

Clients are thread-safe and will block when controlling the always-on connection with persist and disconnect.

If :threaded is true, clients will support multiple connections to the server. If used in always-on mode, this means it will maintain one re-usable connection, and only spawn new ones if requested.

Modes

It is possible to use the client in two modes: always-on and connect-on-demand, controlled by calling persist and disconnect.

Always-on mode maintains a pool of connections to the server, and requests are preferentially sent over these (note that if you have threading off, it makes no sense to allocate more than one entry in the pool).

Connect-on-demand creates a connection when necessary. This mode is used whenever the client is not connected. There is a small performance hit to reconnecting each time, especially if you are using authentication.

Serialisation Formats

By default both client and server use Marshal. This has proven fast and general, and is capable of sending data directly over sockets.

The serialiser also supports MessagePack (the msgpack gem), and this yields a small performance increase at the expense of generality (restrictions on data type).
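The serialiser contract is small: anything that responds to dump(object, io) and load(io) should fit. As a minimal sketch of that contract, the hypothetical `roundtrip` helper below pushes an object through Marshal over an in-process pipe, which stands in for the socket.

```ruby
# Round-trip an object through any serialiser offering dump(object, io) and
# load(io). IO.pipe stands in for the network connection.
def roundtrip(object, serialiser = Marshal)
  reader, writer = IO.pipe
  serialiser.dump(object, writer)   # write straight to the IO
  writer.close
  serialiser.load(reader)           # read straight from the IO
ensure
  reader.close if reader && !reader.closed?
  writer.close if writer && !writer.closed?
end

roundtrip(['thing', 'thing2'])  # ["thing", "thing2"]
```

A custom serialiser passed as :serialiser would need the same two methods with the same argument order.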

Note that JSON and YAML, though they support reading and writing to sockets, do not properly terminate their reads and cause the system to hang. These methods are both slow and limited by comparison anyway, and algorithms needed to support their use require relatively large memory usage. They may be supported in later versions.

Authentication

Setting the :password and :secret options will cause the client to attempt auth on connection. If this process succeeds, the client will then proceed as before; otherwise the server will forcibly close the socket. If :fast_auth is on, the failure surfaces as an unpredictable data-loading exception from the serialiser. If :fast_auth is off (the default), the client raises a SimpleRPC::AuthenticationError exception.

Clients and servers do not tell one another to use auth (such a system would impact speed) so the results of using mismatched configurations are undefined.

The auth process is simple and not particularly secure, but is designed to deter casual connections and attacks. It uses a password that is sent encrypted against a salt sent by the server to prevent replay attacks. If you want more reliable security, use an SSH tunnel.
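The salted exchange can be sketched as follows. The internals of the library's Encryption.encrypt are not shown in this document, so HMAC-SHA256 is used here purely as a stand-in, and `sketch_challenge` is a hypothetical name.

```ruby
require 'openssl'
require 'securerandom'

# Stand-in for Encryption.encrypt(password, secret, salt). HMAC-SHA256 is an
# assumption for illustration, not the library's actual scheme.
def sketch_challenge(password, secret, salt)
  OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('SHA256'), secret, salt + password)
end

# The server issues a fresh salt per connection; the client answers with a
# value derived from the password, the shared secret and that salt.
salt     = SecureRandom.hex(16)
reply    = sketch_challenge('pw', 'long-random-secret', salt)  # client side
expected = sketch_challenge('pw', 'long-random-secret', salt)  # server side
reply == expected  # true
```

Because the salt changes on every connection, a reply captured from an earlier session does not match the value the server expects next time, which is the replay protection described above.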

The performance impact of auth is small, and takes about the same time as a simple request. This can be mitigated by using always-on mode.

Attributes

fast_auth[RW]
hostname[R]
password[W]
port[R]
secret[W]
serialiser[RW]
threaded[R]
timeout[R]

Public Class Methods

new(opts = {})

Create a new client for the network. Takes an options hash, in which :port is required:

:hostname

The hostname to connect to. Defaults to 127.0.0.1.

:port

The port to connect on. Required.

:serialiser

A class supporting dump(object, io) and load(io); defaults to Marshal. I recommend using MessagePack if this is not fast enough.

:timeout

Socket timeout in seconds.

:password

The password clients need to connect

:secret

The encryption key used during password authentication. Should be some long random string that matches the server’s. This should be ASCII-8bit encoded (it will be converted if not)

:fast_auth

Use a slightly faster auth system that is incapable of knowing if it has failed or not. By default this is off.

:threaded

Support multiple connections to the server (default is on). If off, threaded requests will queue in the client.

# File lib/simplerpc/client.rb, line 162
def initialize(opts = {})

  # Connection details
  @hostname     = opts[:hostname]   || '127.0.0.1'
  @port         = opts[:port]
  raise 'Port required' unless @port
  self.timeout  = opts[:timeout]    # go through timeout= so @socket_timeout is set


  # Support multiple connections at once?
  @threaded     = !(opts[:threaded] == false)

  # Serialiser.
  @serialiser   = opts[:serialiser] || Marshal

  # Auth system
  if opts[:password] && opts[:secret]
    require 'simplerpc/encryption'
    @password   = opts[:password]
    @secret     = opts[:secret]

    # Check for return from auth?
    @fast_auth  = (opts[:fast_auth] == true)
  end

  # Threading uses @pool, single thread uses @s and @mutex
  if @threaded
    @pool_mutex           = Mutex.new # Controls edits to the pool
    @pool                 = {}        # List of available sockets with
                                      # accompanying mutices
  else
    @mutex                = Mutex.new
    @s                    = nil
  end
end
new_proxy(opts = {})

Connect to the remote server and return two things:

  • A proxy object for communicating with the server

  • The client itself, for controlling the connection

All options are the same as new

# File lib/simplerpc/client.rb, line 219
def self.new_proxy(opts = {})
  client = self.new(opts)
  proxy = client.get_proxy

  return proxy, client
end

Public Instance Methods

call(m, *args, &block)

Call a method that is otherwise clobbered by the client object, e.g.:

client.call(:dup) # return a copy of the server object
# File lib/simplerpc/client.rb, line 324
def call(m, *args, &block)
  method_missing(m, *args, &block)
end
connected?()

Is this client maintaining any persistent connections?

Returns true/false if the client is single-threaded, or the number of active connections if the client is multi-threaded
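One consequence of the two return shapes is worth spelling out: in Ruby 0 is truthy, so a bare `if client.connected?` would pass for a multi-threaded client with an empty pool. A hedged sketch of a normalising helper follows; `persistent?` is a hypothetical name, not part of the library.

```ruby
# Hypothetical helper: collapse both return shapes of connected? into a
# boolean. Integers are compared against zero; anything else is treated as a
# plain truthy/falsy value.
def persistent?(status)
  status.is_a?(Integer) ? status.positive? : !!status
end

persistent?(0)      # false (multi-threaded, empty pool)
persistent?(2)      # true  (multi-threaded, two pooled connections)
persistent?(true)   # true  (single-threaded, socket open)
persistent?(nil)    # false (single-threaded, no socket)

# Usage, assuming `c` is a client:
#   c.persist unless persistent?(c.connected?)
```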

# File lib/simplerpc/client.rb, line 306
def connected?

  # If not threaded, simply check socket
  @mutex.synchronize { return _connected?(@s) } unless @threaded

  # if threaded, return pool length
  @pool_mutex.synchronize { return (@pool.length) }
end
disconnect()

Close all persistent connections to the server.

# File lib/simplerpc/client.rb, line 298
def disconnect
  persist(0)
end
get_proxy()

Returns a proxy object that is all but indistinguishable from the remote object.

This allows you to pass the object around whilst retaining control over the RPC client (i.e. calling persist/disconnect).

The class returned extends BasicObject and is thus able to pass all calls through to the server.
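The pattern can be tried without a server. The sketch below is not the library's implementation: `FakeClient` and `SketchProxy` are hypothetical names, and the fake client forwards calls to a local array in place of the network.

```ruby
# Hypothetical stand-in for SimpleRPC::Client: forwards calls to a local
# object instead of a remote one.
class FakeClient
  def initialize(target)
    @target = target
  end

  def call(m, *args, &block)
    @target.public_send(m, *args, &block)
  end
end

# BasicObject defines almost no methods, so nearly every call (including
# #class and #dup) falls through to method_missing and reaches the client.
class SketchProxy < BasicObject
  def initialize(client)
    @client = client
  end

  def method_missing(m, *args, &block)
    @client.call(m, *args, &block)
  end
end

proxy = SketchProxy.new(FakeClient.new(['thing', 'thing2']))
proxy.length  # 2
proxy.class   # Array
```

A handful of methods that BasicObject itself defines, such as `==` and `__id__`, are still answered locally rather than forwarded.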

# File lib/simplerpc/client.rb, line 376
def get_proxy

  # Construct a new class as a subclass of RemoteObject
  cls = Class.new(RemoteObject) do

    # Accept the originating client
    def initialize(client)
      @client = client
    end

    # And handle method_missing by calling the client
    def method_missing(m, *args, &block)
      @client.call(m, *args, &block)
    end
  end

  # Return a new class linked to us
  return cls.new(self)
end
method_missing(m, *args) { |*result| ... }

Calls RPC on the remote object.

You should not need to call this directly (though you are welcome to).

# File lib/simplerpc/client.rb, line 332
def method_missing(m, *args, &block)

  # Records the server's return values.
  result      = nil
  success     = true

  # Get a socket preferentially from the pool,
  # and do the actual work
  _get_socket() do |s, persist|

    # send method name and arity
    SocketProtocol::Stream.send(s, [m, args, block_given?, persist], @serialiser)

    # Call with args
    success, result = SocketProtocol::Stream.recv(s, @serialiser)
    
    # Check if we should yield
    while success == SocketProtocol::REQUEST_YIELD do
      block_result = yield(*result)
      SocketProtocol::Stream.send(s, block_result, @serialiser)
      success, result = SocketProtocol::Stream.recv(s, @serialiser)
    end

  end

  # If it didn't succeed, treat the payload as an exception
  raise result unless success == SocketProtocol::REQUEST_SUCCESS
  return result
rescue EOFError, Errno::ECONNRESET, Errno::ETIMEDOUT, 
       Errno::ECONNREFUSED, Errno::ECONNABORTED, Errno::EPIPE => e
  raise ConnectionError.new(e)
rescue StandardError => e
  raise FormatError.new(e)
end
persist(pool_size = 1)

Tell the client how many connections to persist.

If the client is single-threaded, this can either be 1 or 0. If the client is multi-threaded, it can be any positive integer value (or 0).

persist(0) is equivalent to disconnect.

# File lib/simplerpc/client.rb, line 237
def persist(pool_size = 1)

  # Check the pool size is positive
  raise 'Invalid pool size requested' if pool_size < 0

  # If not threaded, check pool size is valid and connect/disconnect
  # single socket
  unless @threaded
    raise 'Threading is disabled: pool size must be 1' if pool_size > 1

    # Set socket up
    @mutex.synchronize do
      if pool_size == 0
        _disconnect(@s)
        @s = nil
      else
        @s  = _connect
      end
    end

    return
  end

  # If threaded, create a pool of sockets instead
  @pool_mutex.synchronize do

    # Resize the pool
    if pool_size > @pool.length

      # Allocate more pool space by simply
      # connecting more sockets
      (pool_size - @pool.length).times { @pool[_connect] = Mutex.new }

    else

      # remove from the pool by trying to remove available
      # sockets over and over until they are gone.
      #
      # This has the effect of waiting for clients to be done
      # with the socket, without hanging on any one mutex.
      while @pool.length > pool_size do

        # Go through and remove from the pool if unused.
        @pool.each do |s, m|
          if @pool.length > pool_size && m.try_lock
            _disconnect(s)
            @pool.delete(s)
          end
        end

        # Since we're spinning, delay for a while
        sleep(0.05)
      end
    end
  end
rescue EOFError, Errno::ECONNRESET, Errno::ETIMEDOUT, 
       Errno::ECONNREFUSED, Errno::ECONNABORTED, Errno::EPIPE => e
  raise ConnectionError.new(e)
end
timeout=(timeout)

Set the timeout on all socket operations, including connection

# File lib/simplerpc/client.rb, line 201
def timeout=(timeout)
  @timeout      = timeout
  @socket_timeout = nil

  if @timeout.to_f > 0
    secs            = @timeout.floor
    usecs           = ((@timeout - secs) * 1_000_000).floor
    @socket_timeout = [secs, usecs].pack("l_2")
  end
end
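
The split into seconds and microseconds can be checked in isolation. This is a sketch under the assumption that the platform's struct timeval is laid out as two native longs, which is what the "l_2" pack format produces; `pack_timeval` is a hypothetical name.

```ruby
# Split a timeout in (possibly fractional) seconds into whole seconds and
# microseconds, then pack both as native longs for SO_RCVTIMEO / SO_SNDTIMEO.
def pack_timeval(timeout)
  secs  = timeout.floor
  usecs = ((timeout - secs) * 1_000_000).floor  # scale first, then truncate
  [secs, usecs].pack('l_2')
end

pack_timeval(1.5).unpack('l_2')   # [1, 500000]
pack_timeval(2).unpack('l_2')     # [2, 0]
```

Scaling the fractional part before truncating matters: truncating first always gives zero microseconds, so sub-second timeouts would be lost.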

Private Instance Methods

_connect()

Connect to the server and return a socket

# File lib/simplerpc/client.rb, line 405
def _connect
  # Connect to the host
  # s = Socket.tcp(@hostname, @port, nil, nil, connect_timeout: @timeout)
  s = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 )

  # Disable Nagle's algorithm
  s.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)

  # Set timeout directly on socket
  if @socket_timeout
    s.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVTIMEO, @socket_timeout)
    s.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDTIMEO, @socket_timeout)
  end

  
  s.connect( Socket.pack_sockaddr_in( @port, @hostname.to_s ) )


  # if auth is required
  if @password && @secret
    salt      = SocketProtocol::Simple.recv(s)
    challenge = Encryption.encrypt(@password, @secret, salt)

    SocketProtocol::Simple.send(s, challenge)

    # Check return if not @fast_auth
    unless @fast_auth
      unless SocketProtocol::Simple.recv(s) == SocketProtocol::AUTH_SUCCESS
        s.close
        raise AuthenticationError, 'Authentication failed'
      end
    end
  end

  # Check and raise
  return s
end
_connected?(s)

Thread-unsafe check for connectedness

# File lib/simplerpc/client.rb, line 510
def _connected?(s)
  s && !s.closed?
end
_disconnect(s)

Disconnect a socket from the server

# File lib/simplerpc/client.rb, line 502
def _disconnect(s)
  return unless _connected?(s)

  # Then underlying socket
  s.close if s && !s.closed?
end
_get_socket() { |s, true| ... }

Get a socket from the reusable pool if possible, else spawn a new one.

Blocks if threading is off and the persistent socket is in use.
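The multi-threaded lookup rests on Mutex#try_lock, which returns immediately instead of waiting. A minimal sketch of that selection logic follows, with symbols standing in for sockets; `with_pooled` and the `fallback` lambda are hypothetical names.

```ruby
# Take the first pool entry whose mutex can be locked without waiting;
# otherwise fall back to a freshly built resource.
# `pool` maps resource => Mutex; `fallback` builds a temporary resource.
def with_pooled(pool, fallback)
  pool.each do |resource, mutex|
    next unless mutex.try_lock        # busy entries are skipped, not waited on
    begin
      return yield(resource, true)    # true: this is a persistent entry
    ensure
      mutex.unlock
    end
  end
  yield(fallback.call, false)         # false: temporary, caller should discard
end

busy = Mutex.new
busy.lock
pool = { :a => busy, :b => Mutex.new }
with_pooled(pool, -> { :temp }) { |r, persistent| [r, persistent] }  # [:b, true]
```

Skipping busy entries keeps a slow call on one connection from stalling other threads, at the cost of opening a temporary connection whenever the pool is fully occupied.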

# File lib/simplerpc/client.rb, line 448
def _get_socket

  # If not threaded, try using @s and block on @mutex
  unless @threaded
    # Try to load from pool
    if @s
      # Persistent connection
      @mutex.synchronize do
        
        # Keepalive for pool sockets
        unless _connected?(@s)
          raise Errno::ECONNREFUSED, 'Failed to connect' unless (@s = _connect)
        end

        yield(@s, true) 
      end
    else
      # On-demand connection
      @mutex.synchronize { yield(_connect, false) }
    end
    return
  end

  # If threaded, try using the pool and use try_lock instead,
  # then fall back to using a new connection

  # Look through the pool to find a suitable socket
  @pool.each do |s, m|

    # try_lock never blocks: sockets in use by another thread are skipped.
    if s && m && m.try_lock
      begin

        # Keepalive for pool sockets
        unless _connected?(s)
          raise Errno::ECONNREFUSED, 'Failed to connect' unless (s = _connect)
        end

        # Yield the pooled socket, flagged as persistent
        yield(s, true)
      ensure
        m.unlock
      end
      return
    end
  end

  # Else use a temporary one...
  s = _connect
  begin
    yield(s, false)
  ensure
    # Close the temporary socket even if the block raises
    _disconnect(s)
  end
end