class Oxblood::RSocket
Thin socket wrapper built for resilience. On any error (including timeout errors) the socket is closed and automatically recreated, so the connection is never left in an inconsistent state.
Constants
- LINGER_OPTION
JRuby versions before 9.1.6.0 do not properly support the SO_LINGER setting.
@see github.com/jruby/jruby/issues/4040
- TimeoutError
Attributes
@!attribute [rw] timeout
@return [Numeric] timeout in seconds
Public Class Methods
Maintain socket
@param [Hash] opts Connection options
@option opts [Float] :timeout (1.0) socket read/write timeout
@option opts [String] :host ('localhost') Hostname or IP address to connect to
@option opts [Integer] :port (6379) Port Redis server listens on
@option opts [Float] :connect_timeout (1.0) socket connect timeout
@option opts [String] :path UNIX socket path
# File lib/oxblood/rsocket.rb, line 36
def initialize(opts = {})
  @opts = opts
  @timeout = opts.fetch(:timeout, 1.0)
  @socket = create_socket(opts)
  @buffer = String.new.encode!('ASCII-8BIT')
end
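To make the option handling concrete, here is a minimal sketch of how the defaults above resolve. `resolve_connection_opts` is a hypothetical helper written for illustration only and is not part of Oxblood; it simply mirrors the `fetch` calls in `initialize` and `create_socket`, where a `:path` key selects a UNIX socket and the TCP options are then ignored.

```ruby
# Hypothetical helper (not part of Oxblood): resolves the documented
# defaults the same way the fetch calls in the source do.
def resolve_connection_opts(opts = {})
  timeout = opts.fetch(:timeout, 1.0)

  if opts.key?(:path)
    # A UNIX socket path wins; host, port and connect_timeout are unused.
    { path: opts.fetch(:path), timeout: timeout }
  else
    {
      host: opts.fetch(:host, 'localhost'),
      port: opts.fetch(:port, 6379),
      connect_timeout: opts.fetch(:connect_timeout, 1.0),
      timeout: timeout
    }
  end
end

tcp_opts  = resolve_connection_opts(port: 6380)
unix_opts = resolve_connection_opts(path: '/tmp/redis.sock', host: 'ignored')
```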
Public Instance Methods
Close connection to server
@return [nil] always returns nil
# File lib/oxblood/rsocket.rb, line 88
def close
  @buffer.clear
  @socket && @socket.close
rescue IOError
  ;
ensure
  @socket = nil
end
True if socket exists
@return [Boolean] socket exists or not
# File lib/oxblood/rsocket.rb, line 99
def connected?
  !!@socket
end
Read until separator
@param [String] separator separator
@param [Numeric] timeout timeout in seconds (defaults to the timeout attribute)
@return [String] read result, including the separator
# File lib/oxblood/rsocket.rb, line 59
def gets(separator, timeout = @timeout)
  while (crlf = @buffer.index(separator)).nil?
    @buffer << readpartial(1024, timeout)
  end

  @buffer.slice!(0, crlf + separator.bytesize)
end
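The buffering in `gets` can be sketched without a real socket. `LineBuffer` below is a hypothetical stand-in, not Oxblood code: its `chunks` array plays the role of successive `readpartial` results, which may split a separator across two reads.

```ruby
# Hypothetical stand-in for the buffering in RSocket#gets.
class LineBuffer
  def initialize(chunks)
    @chunks = chunks.dup
    @buffer = String.new.force_encoding(Encoding::BINARY)
  end

  # Keep appending chunks until the separator shows up in the buffer,
  # then cut off everything up to and including the separator.
  def gets(separator)
    while (idx = @buffer.index(separator)).nil?
      chunk = @chunks.shift or raise EOFError, 'no more data'
      @buffer << chunk
    end

    @buffer.slice!(0, idx + separator.bytesize)
  end
end

# The first separator is split across two chunks.
reader = LineBuffer.new(["+OK\r", "\n:1", "000\r\n"])
first  = reader.gets("\r\n")
second = reader.gets("\r\n")
```

Bytes that arrive after the separator stay in the buffer and are returned by the next call, so no data is lost between reads.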
Read number of bytes
@param [Integer] nbytes number of bytes to read
@param [Numeric] timeout timeout in seconds (defaults to the timeout attribute)
@return [String] read result
# File lib/oxblood/rsocket.rb, line 46
def read(nbytes, timeout = @timeout)
  result = @buffer.slice!(0, nbytes)

  while result.bytesize < nbytes
    result << readpartial(nbytes - result.bytesize, timeout)
  end

  result
end
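As a rough illustration of how `read` drains the internal buffer before touching the socket, the sketch below uses a hypothetical free function `read_exactly` and a `StringIO` in place of the socket. Neither name comes from Oxblood, and timeout handling is left out.

```ruby
require 'stringio'

# Hypothetical stand-in for RSocket#read: take what the buffer already
# holds, then ask the source only for the bytes still missing.
def read_exactly(buffer, source, nbytes)
  result = buffer.slice!(0, nbytes)

  while result.bytesize < nbytes
    result << source.readpartial(nbytes - result.bytesize)
  end

  result
end

buffer = 'he'.b                       # bytes left over from an earlier read
source = StringIO.new('llo world')    # stands in for the socket
word   = read_exactly(buffer, source, 5)
```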
Write data to socket
@param [String] data data to write
@param [Numeric] timeout timeout in seconds (defaults to the timeout attribute)
@return [Integer] the number of bytes written
# File lib/oxblood/rsocket.rb, line 70
def write(data, timeout = @timeout)
  full_size = data.bytesize

  while data.bytesize > 0
    written = socket.write_nonblock(data, exception: false)

    if written == :wait_writable
      socket.wait_writable(timeout) or fail_with_timeout!
    else
      data = data.byteslice(written..-1)
    end
  end

  full_size
end
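The partial-write loop can be tried against any IO object. The sketch below is an assumption-laden stand-in: `write_all` is a hypothetical name, an `IO.pipe` replaces the socket, and a plain `IOError` replaces `fail_with_timeout!`.

```ruby
require 'io/wait' # provides wait_writable on older Rubies

# Hypothetical free-standing version of the loop in RSocket#write.
def write_all(io, data, timeout)
  full_size = data.bytesize

  while data.bytesize > 0
    written = io.write_nonblock(data, exception: false)

    if written == :wait_writable
      # Kernel buffer is full: wait until writable or give up.
      io.wait_writable(timeout) or raise IOError, 'write timed out'
    else
      # Drop the bytes that were accepted and retry with the rest.
      data = data.byteslice(written..-1)
    end
  end

  full_size
end

reader, writer = IO.pipe
count = write_all(writer, "PING\r\n", 1.0)
writer.close
echoed = reader.read
reader.close
```

Returning the original size rather than the last `write_nonblock` result means the caller always sees the full byte count, however many partial writes were needed.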
Private Instance Methods
# File lib/oxblood/rsocket.rb, line 109
def create_socket(opts)
  if opts.key?(:path)
    UNIXSocket.new(opts.fetch(:path))
  else
    host = opts.fetch(:host, 'localhost')
    port = opts.fetch(:port, 6379)
    connect_timeout = opts.fetch(:connect_timeout, 1.0)

    Socket.tcp(host, port, connect_timeout: connect_timeout).tap do |sock|
      sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
    end
  end
end
# File lib/oxblood/rsocket.rb, line 136
def fail_with_timeout!
  # In case of failure close socket ASAP
  socket.setsockopt(*LINGER_OPTION)
  close
  raise TimeoutError
end
# File lib/oxblood/rsocket.rb, line 123
def readpartial(nbytes, timeout)
  case data = socket.read_nonblock(nbytes, exception: false)
  when String
    return data
  when :wait_readable
    socket.wait_readable(timeout) or fail_with_timeout!
  when nil
    close
    raise Errno::ECONNRESET
  end while true
end
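The three outcomes of `read_nonblock` handled above (data, not ready yet, end of stream) can be observed with a pipe. This is only a sketch: `read_some` and `ReadTimeout` are hypothetical names, and `EOFError` is raised where the original closes the socket and raises `Errno::ECONNRESET`.

```ruby
require 'io/wait' # provides wait_readable on older Rubies

class ReadTimeout < StandardError; end # hypothetical error class

# Hypothetical stand-in for RSocket#readpartial.
def read_some(io, nbytes, timeout)
  loop do
    case data = io.read_nonblock(nbytes, exception: false)
    when String
      return data
    when :wait_readable
      # Nothing available yet: wait, then retry the read.
      io.wait_readable(timeout) or raise ReadTimeout
    when nil
      raise EOFError, 'peer closed the connection'
    end
  end
end

reader, writer = IO.pipe
writer.write('+PONG')
got = read_some(reader, 1024, 0.1)

# Pipe is now empty but still open, so the wait runs into the timeout.
timed_out = begin
  read_some(reader, 1024, 0.05)
  false
rescue ReadTimeout
  true
end

# After the writer closes, read_nonblock reports end of stream as nil.
writer.close
saw_eof = begin
  read_some(reader, 1024, 0.05)
  false
rescue EOFError
  true
end
reader.close
```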
# File lib/oxblood/rsocket.rb, line 105
def socket
  @socket ||= create_socket(@opts)
end
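The `||=` accessor combined with `close` resetting `@socket` to nil is what makes recreation automatic: the next call that needs the socket builds a fresh one. The sketch below shows only that pattern. `LazyHandle` and its `factory` block are hypothetical and stand in for `RSocket` and `create_socket`; unlike `RSocket`, the sketch also defers the very first creation.

```ruby
# Hypothetical stand-in, not Oxblood code: shows the close-then-recreate
# behaviour that results from memoizing with ||= and resetting to nil.
class LazyHandle
  attr_reader :creations

  def initialize(&factory)
    @factory = factory
    @creations = 0
    @handle = nil
  end

  # Build the handle on demand and reuse it until it is closed.
  def handle
    @handle ||= begin
      @creations += 1
      @factory.call
    end
  end

  def close
    @handle = nil
  end

  def connected?
    !!@handle
  end
end

lazy   = LazyHandle.new { Object.new }
first  = lazy.handle
same   = lazy.handle   # memoized, no new object
lazy.close
second = lazy.handle   # recreated after close
```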