class Oxblood::RSocket

Thin socket wrapper built for resilience. The socket is closed and automatically recreated on any error (including timeout errors) in order to avoid an inconsistent state.

Constants

LINGER_OPTION

JRuby versions before 9.1.6.0 do not properly support the SO_LINGER setting. @see github.com/jruby/jruby/issues/4040

TimeoutError

Attributes

timeout[RW]

@!attribute [rw] timeout

@return [Numeric] timeout in seconds

Public Class Methods

new(opts = {}) click to toggle source

Maintain socket

@param [Hash] opts Connection options

@option opts [Float] :timeout (1.0) socket read/write timeout

@option opts [String] :host ('localhost') Hostname or IP address to connect to

@option opts [Integer] :port (6379) Port Redis server listens on

@option opts [Float] :connect_timeout (1.0) socket connect timeout

@option opts [String] :path UNIX socket path

# File lib/oxblood/rsocket.rb, line 36
# Store the connection options, open the socket and prepare an empty
# binary read buffer.
#
# @param [Hash] opts Connection options; :timeout (default 1.0) is read
#   here, the whole hash is handed to create_socket and kept in @opts
def initialize(opts = {})
  @opts = opts
  @timeout = opts.fetch(:timeout, 1.0)
  # Incoming data is buffered as raw bytes (ASCII-8BIT).
  @buffer = String.new.force_encoding(Encoding::BINARY)
  @socket = create_socket(opts)
end

Public Instance Methods

close() click to toggle source

Close connection to server. @return [nil] always returns nil

# File lib/oxblood/rsocket.rb, line 88
# Drop any buffered data and close the underlying socket, if there is one.
# An IOError raised while closing is ignored; @socket is reset to nil in
# every case.
#
# @return [nil] nil when there is no socket or an IOError was swallowed,
#   otherwise whatever the socket's #close returns
def close
  @buffer.clear
  @socket.close if @socket
rescue IOError
  nil
ensure
  # Forget the socket no matter how closing went.
  @socket = nil
end
connected?() click to toggle source

True if socket exists @return [Boolean] socket exists or not

# File lib/oxblood/rsocket.rb, line 99
# Tell whether a socket object is currently held.
#
# @return [Boolean] true when @socket is set, false otherwise
def connected?
  @socket ? true : false
end
gets(separator, timeout = @timeout) click to toggle source

Read until separator @param [String] separator separator @return [String] read result

# File lib/oxblood/rsocket.rb, line 59
# Read from the socket until the buffer contains +separator+, then remove
# and return everything up to and including it.
#
# @param [String] separator separator to look for
# @param [Numeric] timeout timeout passed to each readpartial call
# @return [String] buffered data through the end of the separator
def gets(separator, timeout = @timeout)
  # Keep pulling chunks of up to 1024 bytes until the separator shows up.
  until (boundary = @buffer.index(separator))
    @buffer << readpartial(1024, timeout)
  end

  @buffer.slice!(0, boundary + separator.bytesize)
end
read(nbytes, timeout = @timeout) click to toggle source

Read number of bytes @param [Integer] nbytes number of bytes to read @return [String] read result

# File lib/oxblood/rsocket.rb, line 46
# Return exactly +nbytes+ bytes, taking them from the buffer first and
# reading the remainder from the socket.
#
# @param [Integer] nbytes number of bytes to read
# @param [Numeric] timeout timeout passed to each readpartial call
# @return [String] read result
def read(nbytes, timeout = @timeout)
  chunk = @buffer.slice!(0, nbytes)
  # Ask only for what is still missing on every round.
  chunk << readpartial(nbytes - chunk.bytesize, timeout) until chunk.bytesize >= nbytes
  chunk
end
write(data, timeout = @timeout) click to toggle source

Write data to socket @param [String] data given @return [Integer] the number of bytes written

# File lib/oxblood/rsocket.rb, line 70
# Write all of +data+ to the socket using non-blocking writes, waiting for
# writability whenever the socket reports :wait_writable.
#
# @param [String] data data to send
# @param [Numeric] timeout seconds to wait for the socket to become writable
# @return [Integer] the number of bytes written (bytesize of +data+)
def write(data, timeout = @timeout)
  total = data.bytesize
  pending = data

  until pending.bytesize.zero?
    outcome = socket.write_nonblock(pending, exception: false)

    if outcome != :wait_writable
      # A byte count came back: drop what was sent, keep the rest.
      pending = pending.byteslice(outcome..-1)
    else
      fail_with_timeout! unless socket.wait_writable(timeout)
    end
  end

  total
end

Private Instance Methods

create_socket(opts) click to toggle source
# File lib/oxblood/rsocket.rb, line 109
# Open a new socket described by +opts+.
#
# A UNIX socket is used when opts has a :path key; otherwise a TCP
# connection is made to :host / :port (defaults 'localhost' / 6379) with
# :connect_timeout (default 1.0) and TCP_NODELAY is set on it.
#
# @param [Hash] opts connection options
# @return the newly created socket object
def create_socket(opts)
  return UNIXSocket.new(opts.fetch(:path)) if opts.key?(:path)

  tcp = Socket.tcp(
    opts.fetch(:host, 'localhost'),
    opts.fetch(:port, 6379),
    connect_timeout: opts.fetch(:connect_timeout, 1.0)
  )
  tcp.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  tcp
end
fail_with_timeout!() click to toggle source
# File lib/oxblood/rsocket.rb, line 136
# Abort the current operation because the socket did not become ready in
# time: apply LINGER_OPTION to the socket, close it and raise TimeoutError.
#
# The socket is closed even when applying LINGER_OPTION raises, so a socket
# that failed is never kept around for reuse; in that case the setsockopt
# error propagates instead of TimeoutError.
#
# @raise [TimeoutError] after the socket has been closed
def fail_with_timeout!
  begin
    # In case of failure close socket ASAP
    socket.setsockopt(*LINGER_OPTION)
  ensure
    close
  end

  raise TimeoutError
end
readpartial(nbytes, timeout) click to toggle source
# File lib/oxblood/rsocket.rb, line 123
# Perform non-blocking reads until some data arrives.
#
# When the socket reports :wait_readable, wait up to +timeout+ for it to
# become readable and retry (or fail with a timeout). A nil read result
# closes the socket and raises Errno::ECONNRESET.
#
# @param [Integer] nbytes maximum number of bytes to read
# @param [Numeric] timeout seconds to wait for the socket to become readable
# @return [String] the data that was read
# @raise [Errno::ECONNRESET] when read_nonblock returns nil
def readpartial(nbytes, timeout)
  loop do
    outcome = socket.read_nonblock(nbytes, exception: false)
    return outcome if outcome.is_a?(String)

    if outcome == :wait_readable
      fail_with_timeout! unless socket.wait_readable(timeout)
    elsif outcome.nil?
      close
      raise Errno::ECONNRESET
    end
  end
end
socket() click to toggle source
# File lib/oxblood/rsocket.rb, line 105
# Return the current socket, creating a new one from the stored options
# when none is held (for example after #close reset it).
#
# @return the socket object held in @socket
def socket
  return @socket if @socket

  @socket = create_socket(@opts)
end