class Riemann::Client::TcpSocket

Socket: A specialized socket that has been configured

Attributes

connect_timeout[R]

Internal: The timeout for connecting in seconds. Defaults to 2

host[R]

Internal: The host this socket is connected to

keepalive_count[R]

Internal

Used for setting TCP_KEEPCNT: overrides tcp_keepalive_probes for a single socket.

tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html

tcp_keepalive_probes:

The number of unacknowledged probes to send before considering the
connection dead and notifying the application layer.
keepalive_idle[R]

Internal

Used for setting TCP_KEEPIDLE: overrides tcp_keepalive_time for a single socket.

tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html

tcp_keepalive_time:

The interval between the last data packet sent (simple ACKs are not
considered data) and the first keepalive probe; after the connection is
marked to need keepalive, this counter is not used any further.
keepalive_interval[R]

Internal

Used for setting TCP_KEEPINTVL: overrides tcp_keepalive_intvl for a single socket.

tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html

tcp_keepalive_intvl:

The interval between subsequent keepalive probes, regardless of what
the connection has exchanged in the meantime.
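
Taken together, the three keepalive settings bound how long a dead peer on an idle connection can go unnoticed. The following is a rough worked example, assuming the kernel sends its first probe after the idle time and gives up after `count` unanswered probes spaced `interval` seconds apart. The helper name is hypothetical and not part of this class.

```ruby
# Hypothetical helper, not part of Riemann::Client::TcpSocket.
# Approximate seconds before the kernel declares an idle connection dead:
# the idle period plus one interval per unanswered probe.
def keepalive_detection_seconds(idle:, interval:, count:)
  idle + (interval * count)
end

# Using the defaults assigned in the constructor (60, 30 and 5):
puts keepalive_detection_seconds(idle: 60, interval: 30, count: 5) # prints 210
```

With the default values, a silently dropped connection would be detected after roughly three and a half minutes.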
port[R]

Internal: The port this socket is connected to

read_timeout[RW]

Internal: The timeout for reading in seconds. Defaults to 2

write_timeout[R]

Internal: The timeout for writing in seconds. Defaults to 2

Public Class Methods

connect(options = {})

Internal: Create and connect to the given location.

options - same as for the constructor

Returns an instance of Riemann::Client::TcpSocket

# File lib/riemann/client/tcp_socket.rb, line 78
def self.connect(options = {})
  s = new(options)
  s.connect
  s
end
new(options = {})

Internal: Creates a new Riemann::Client::TcpSocket

# File lib/riemann/client/tcp_socket.rb, line 85
def initialize(options = {})
  @host = options[:host]
  @port = options[:port]

  @connect_timeout = options[:connect_timeout] || options[:timeout] || 2
  @read_timeout    = options[:read_timeout]    || options[:timeout] || 2
  @write_timeout   = options[:write_timeout]   || options[:timeout] || 2

  @keepalive_active   = options.fetch(:keepalive_active, true)
  @keepalive_idle     = options[:keepalive_idle]     || 60
  @keepalive_interval = options[:keepalive_interval] || 30
  @keepalive_count    = options[:keepalive_count]    || 5

  @socket             = nil
end
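
The fallback order used for the timeouts above can be sketched in isolation. This is a stand-alone mirror of the option handling, not the class itself; `resolve_options` and `DEFAULT_TIMEOUT` are hypothetical names introduced only for illustration.

```ruby
# Hypothetical stand-alone mirror of the option handling in the constructor.
DEFAULT_TIMEOUT = 2

def resolve_options(options = {})
  shared = options[:timeout]
  {
    connect_timeout:  options[:connect_timeout] || shared || DEFAULT_TIMEOUT,
    read_timeout:     options[:read_timeout]    || shared || DEFAULT_TIMEOUT,
    write_timeout:    options[:write_timeout]   || shared || DEFAULT_TIMEOUT,
    # fetch (rather than ||) lets an explicit false switch keepalive off
    keepalive_active: options.fetch(:keepalive_active, true)
  }
end

resolved = resolve_options(timeout: 5, read_timeout: 10, keepalive_active: false)
# connect and write fall back to the shared 5 seconds, read uses its own 10
puts resolved[:connect_timeout] # prints 5
puts resolved[:read_timeout]    # prints 10
```

A specific option such as `:read_timeout` wins over the shared `:timeout`, which in turn wins over the default of 2 seconds.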

Public Instance Methods

close()

Internal: Closes the internal ::Socket

Returns nothing

# File lib/riemann/client/tcp_socket.rb, line 150
def close
  @socket.close unless closed?
  @socket = nil
end
closed?()

Internal: Return true if the socket is closed.

# File lib/riemann/client/tcp_socket.rb, line 156
def closed?
  return true if @socket.nil?
  return true if @socket.closed?

  false
end
connect()

Internal:

Connect to the remote host in a non-blocking fashion.

Raise Error if there is a failure connecting.

Return the ::Socket on success

# File lib/riemann/client/tcp_socket.rb, line 170
def connect
  # Calculate our timeout deadline
  deadline = Time.now.to_f + connect_timeout

  # Lookup destination address, we only want TCP.
  addrs      = ::Socket.getaddrinfo(host, port, nil, ::Socket::SOCK_STREAM)
  errors     = []
  conn_error = -> { raise errors.first }
  sock       = nil

  # Sort it so we get AF_INET, IPv4
  addrs.sort.find(conn_error) do |addr|
    sock = connect_or_error(addr, deadline, errors)
  end
  sock
end
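
The address loop above relies on `Enumerable#find` accepting an "if none" callable: `find` returns the matching element rather than the block's value, which is why the connected socket is captured in an outer variable, and the callable runs only when no element matched. A minimal sketch of that pattern follows, with numbers standing in for addresses; `first_success` is a hypothetical name.

```ruby
# Hypothetical stand-in for the address loop in connect: each candidate is
# "tried" in turn and only even numbers succeed.
def first_success(candidates)
  errors   = []
  result   = nil
  on_empty = -> { raise errors.first }

  candidates.find(on_empty) do |candidate|
    begin
      raise ArgumentError, "cannot use #{candidate}" if candidate.odd?

      result = candidate * 10 # truthy, so find stops here
    rescue ArgumentError => e
      errors << e
      false                   # falsy, so find moves on to the next candidate
    end
  end
  result
end

puts first_success([1, 3, 4]) # prints 40
# first_success([1, 3]) would raise ArgumentError ("cannot use 1")
```

When every candidate fails, the first recorded error is the one re-raised.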
connect_nonblock(addr, timeout)

Internal: Connect to the given address within the timeout.

Make an attempt to connect to a single address within the given timeout.

Return the ::Socket when it is connected, or raise an Error if no connection was possible.

# File lib/riemann/client/tcp_socket.rb, line 217
def connect_nonblock(addr, timeout)
  sockaddr = ::Socket.pack_sockaddr_in(addr[1], addr[3])
  sock     = socket_factory(addr[4])
  sock.connect_nonblock(sockaddr)
  sock
rescue Errno::EINPROGRESS
  if IO.select(nil, [sock], nil, timeout).nil?
    begin
      sock.close
    rescue StandardError
      nil
    end
    raise Timeout, "Could not connect to #{host}:#{port} within #{timeout} seconds"
  end
  connect_nonblock_finalize(sock, sockaddr)
rescue StandardError => e
  begin
    sock.close
  rescue StandardError
    nil
  end
  raise Error, "Could not connect to #{host}:#{port}: #{e.class}: #{e.message}", e.backtrace
end
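
The general shape of a non-blocking connect with a timeout can be sketched using only the standard `socket` library. This is a simplified illustration rather than the library's method: `connect_with_timeout` is a hypothetical name, it handles IPv4 only, and it raises a plain RuntimeError on timeout.

```ruby
require 'socket'

# Hypothetical helper, not the library's method: connect to an IPv4 host and
# port, waiting at most `timeout` seconds for the handshake to finish.
def connect_with_timeout(host, port, timeout)
  sockaddr = Socket.pack_sockaddr_in(port, host)
  sock     = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)

  begin
    sock.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # Handshake still in progress: wait until the socket becomes writable.
    unless IO.select(nil, [sock], nil, timeout)
      raise "could not connect to #{host}:#{port} within #{timeout} seconds"
    end

    begin
      sock.connect_nonblock(sockaddr) # surfaces a failed handshake, if any
    rescue Errno::EISCONN
      # Already connected, which is the success case here.
    end
  end

  sock
rescue StandardError
  sock&.close # do not leak the descriptor on any failure
  raise
end
```

Calling `connect_with_timeout('127.0.0.1', port, 2)` against a listening local server would return a connected `Socket`; the second `connect_nonblock` call plays the role that `connect_nonblock_finalize` plays in this class.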
connect_nonblock_finalize(sock, sockaddr)

Internal: Make sure that a non-blocking connect has truly connected.

Ensure that the given socket is actually connected to the given address.

Returns the socket if it is, and raises an Error if it isn’t.

# File lib/riemann/client/tcp_socket.rb, line 246
def connect_nonblock_finalize(sock, sockaddr)
  sock.connect_nonblock(sockaddr)
  sock
rescue Errno::EISCONN
  sock
rescue StandardError => e
  begin
    sock.close
  rescue StandardError
    nil
  end
  raise Error, "Could not connect to #{host}:#{port}: #{e.class}: #{e.message}", e.backtrace
end
connect_or_error(addr, deadline, errors)

Internal: Connect to the destination or raise an error.

Connect to the address or capture the error of the connection

addr     - An address returned from Socket.getaddrinfo().
deadline - The time after which we should raise a timeout error.
errors   - A collection of errors to append to, should an error occur.

Make an attempt to connect to the given address. If it is successful, return the socket.

Should the connection fail, append the exception to the errors array and return false.

# File lib/riemann/client/tcp_socket.rb, line 201
def connect_or_error(addr, deadline, errors)
  timeout = deadline - Time.now.to_f
  raise Timeout, "Could not connect to #{host}:#{port}" if timeout <= 0

  connect_nonblock(addr, timeout)
rescue Error => e
  errors << e
  false
end
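
Because the deadline is computed once and shared, each successive address attempt gets only the time that is left. A small sketch of that budget calculation follows; `remaining_timeout` and `BudgetExhausted` are hypothetical names, and the current time is passed in so the arithmetic is easy to follow.

```ruby
# Hypothetical error class and helper, for illustration only.
class BudgetExhausted < StandardError; end

# Seconds left before `deadline`; raises once the deadline has passed.
def remaining_timeout(deadline, now = Time.now.to_f)
  remaining = deadline - now
  raise BudgetExhausted, 'deadline has passed' if remaining <= 0

  remaining
end

deadline = 100.0 + 2                     # started at t=100 with a 2 second budget
puts remaining_timeout(deadline, 100.5)  # prints 1.5
puts remaining_timeout(deadline, 101.75) # prints 0.25
```

A slow first attempt therefore shortens the timeout available to every later address.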
keepalive_active?()

Internal: Return whether or not the keepalive_active flag is set.

# File lib/riemann/client/tcp_socket.rb, line 102
def keepalive_active?
  @keepalive_active
end
read(length, outbuf = nil)

Reads length bytes from the socket

length - the number of bytes to read from the socket
outbuf - an optional buffer to store the bytes in

Returns the bytes read if no outbuf is specified

# File lib/riemann/client/tcp_socket.rb, line 287
def read(length, outbuf = nil)
  if outbuf
    outbuf.replace('')
    buf = outbuf
  else
    buf = String.new
  end

  while buf.length < length
    unless (rb = readpartial(length - buf.length))
      break
    end

    buf << rb
  end

  buf
end
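
The accumulate-until-full loop can be tried against a local socket pair. The sketch below uses the blocking `IO#readpartial` from the standard library instead of this class's own `readpartial`, and `read_exactly` is a hypothetical name.

```ruby
require 'socket'

# Hypothetical helper: keep reading until exactly `length` bytes have arrived
# or the peer closes the connection.
def read_exactly(io, length)
  buf = String.new
  while buf.bytesize < length
    begin
      buf << io.readpartial(length - buf.bytesize)
    rescue EOFError
      break # peer closed before `length` bytes were available
    end
  end
  buf
end

reader, writer = UNIXSocket.pair
writer.write('hel')
writer.write('lo!')
writer.close
puts read_exactly(reader, 5) # prints hello
```

Each partial read asks only for the bytes still missing, so the result never exceeds the requested length even when more data is waiting.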
readpartial(maxlen, outbuf = nil)

Internal: Read up to a maxlen of data from the socket and store it in outbuf

maxlen - the maximum number of bytes to read from the socket
outbuf - the buffer in which to store the bytes.

Returns the bytes read

# File lib/riemann/client/tcp_socket.rb, line 312
def readpartial(maxlen, outbuf = nil)
  socket.read_nonblock(maxlen, outbuf)
rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::ECONNRESET, IO::WaitReadable
  unless wait_readable(read_timeout)
    raise Timeout, "Could not read from #{host}:#{port} in #{read_timeout} seconds"
  end

  retry
end
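
The read-then-wait-then-retry pattern can be reduced to a few lines with the standard library. This is a simplified sketch: `read_with_timeout` and `ReadTimeout` are hypothetical names, and only `IO::WaitReadable` is handled.

```ruby
require 'socket'

# Hypothetical error class, standing in for the library's Timeout error.
class ReadTimeout < StandardError; end

# Try a non-blocking read; if no data is ready, wait up to `timeout` seconds
# for the descriptor to become readable and then try again.
def read_with_timeout(io, maxlen, timeout)
  io.read_nonblock(maxlen)
rescue IO::WaitReadable
  raise ReadTimeout, "no data within #{timeout} seconds" unless IO.select([io], nil, nil, timeout)

  retry
end

reader, writer = UNIXSocket.pair
writer.write('ping')
puts read_with_timeout(reader, 16, 1) # prints ping
```

`IO.select` returns nil when the timeout elapses, which is what turns a quiet peer into a timeout error instead of an endless wait.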
socket()

Internal: Return the connected raw Socket.

If the socket is closed or non-existent it will create and connect again.

Returns a ::Socket

# File lib/riemann/client/tcp_socket.rb, line 141
def socket
  return @socket unless closed?

  @socket ||= connect
end
socket_factory(type)

Internal: Low level socket allocation and option configuration

Using the options from the initializer, a new ::Socket is created that is:

TCP, closed automatically on exec, with Nagle's algorithm disabled, and with
TCP Keepalive options set if keepalive is supported.

Returns a new ::Socket instance.

# File lib/riemann/client/tcp_socket.rb, line 116
def socket_factory(type)
  sock = ::Socket.new(type, ::Socket::SOCK_STREAM, 0)

  # close file descriptors if we exec
  if Fcntl.constants.include?(:F_SETFD) && Fcntl.constants.include?(:FD_CLOEXEC)
    sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
  end
  # Disable Nagle's algorithm
  sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)

  if using_keepalive?
    sock.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_KEEPALIVE, true)
    sock.setsockopt(::Socket::SOL_TCP,    ::Socket::TCP_KEEPIDLE, keepalive_idle)
    sock.setsockopt(::Socket::SOL_TCP,    ::Socket::TCP_KEEPINTVL, keepalive_interval)
    sock.setsockopt(::Socket::SOL_TCP,    ::Socket::TCP_KEEPCNT, keepalive_count)
  end

  sock
end
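
Socket options set this way can be read back with `getsockopt`, which is a convenient check that they were applied. The sketch below is independent of this class; it uses `IPPROTO_TCP` as the option level where the source above uses `SOL_TCP`, and it guards the keepalive call because `TCP_KEEPCNT` is not defined on every platform.

```ruby
require 'socket'

sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)

# Disable Nagle's algorithm, then read the option back to confirm it took.
sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
nodelay = sock.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY).int != 0

# TCP_KEEPCNT is not defined on every platform, so guard before using it.
keepcnt = nil
if Socket.const_defined?(:TCP_KEEPCNT)
  sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
  sock.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, 5)
  keepcnt = sock.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT).int
end

puts "TCP_NODELAY enabled: #{nodelay}"
puts "TCP_KEEPCNT: #{keepcnt.inspect}"
sock.close
```

The options are applied to an unconnected socket here, which matches the order used by `socket_factory`: configure first, connect afterwards.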
using_keepalive?()

Internal: Say whether or not we are using TCP Keep Alive.

Returns true if the :keepalive_active initialization option is true and all the constants needed for TCP keepalive are defined.

On some operating systems these constants are not defined; in that case TCP keepalive is not attempted.

Returns true or false

# File lib/riemann/client/tcp_socket.rb, line 271
def using_keepalive?
  using = false
  if keepalive_active?
    using = %i[SOL_SOCKET SO_KEEPALIVE SOL_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT].all? do |c|
      ::Socket.const_defined? c
    end
  end
  using
end
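
The constant probe can be tried on its own to see what the current platform supports. In this sketch `all_constants_defined?` and `KEEPALIVE_CONSTANTS` are hypothetical names, and `NO_SUCH_OPT` is a made-up constant used to show the negative case.

```ruby
require 'socket'

# Hypothetical helper: true only when every named constant exists on `mod`.
def all_constants_defined?(mod, names)
  names.all? { |name| mod.const_defined?(name) }
end

KEEPALIVE_CONSTANTS = %i[SOL_SOCKET SO_KEEPALIVE SOL_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT].freeze

puts all_constants_defined?(Socket, KEEPALIVE_CONSTANTS)         # platform dependent
puts all_constants_defined?(Socket, %i[SOCK_STREAM NO_SUCH_OPT]) # prints false
```

A single missing constant is enough to make the check fail, so keepalive is either configured in full or skipped entirely.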
wait_readable(timeout = nil)
# File lib/riemann/client/tcp_socket.rb, line 355
def wait_readable(timeout = nil)
  IO.select([@socket], nil, nil, timeout || read_timeout)
end
wait_writable(timeout = nil)
# File lib/riemann/client/tcp_socket.rb, line 351
def wait_writable(timeout = nil)
  IO.select(nil, [@socket], nil, timeout || write_timeout)
end
write(buf)

Internal: Write the given data to the socket

buf - the data to write to the socket.

Raises an error if it is unable to write the data to the socket within the write_timeout.

Returns nothing

# File lib/riemann/client/tcp_socket.rb, line 330
def write(buf)
  until buf.nil? || buf.empty?
    written = socket.write_nonblock(buf)
    buf = buf[written, buf.length]
  end
rescue Errno::EWOULDBLOCK, Errno::EINTR, Errno::EAGAIN, Errno::ECONNRESET, IO::WaitWritable
  unless wait_writable(write_timeout)
    raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds"
  end

  retry
rescue IO::WaitReadable
  # Also rescued for SSL renegotiation in OpenSSL::SSL::SSLSocket according to
  # https://ruby-doc.org/core-2.7.1/IO.html#method-c-select
  unless wait_readable(read_timeout)
    raise Timeout, "Could not write to #{host}:#{port} in #{write_timeout} seconds"
  end

  retry
end
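
The write loop can be exercised against a local socket pair with a payload larger than the kernel buffer, which forces several partial writes. This is a simplified sketch, not the library's method: `write_all` is a hypothetical name, it raises a plain RuntimeError on timeout, and it slices by bytes because `write_nonblock` reports the number of bytes written.

```ruby
require 'socket'

# Hypothetical helper: write all of `data`, waiting up to `timeout` seconds
# whenever the socket's send buffer is full.
def write_all(io, data, timeout)
  remaining = data
  until remaining.empty?
    begin
      written   = io.write_nonblock(remaining)
      remaining = remaining.byteslice(written, remaining.bytesize)
    rescue IO::WaitWritable
      raise "could not write within #{timeout} seconds" unless IO.select(nil, [io], nil, timeout)

      retry
    end
  end
  data.bytesize
end

reader, writer = UNIXSocket.pair
payload  = 'x' * 1_000_000
received = String.new

# Drain the other end in a thread so the writer can make progress.
drain = Thread.new do
  received << reader.readpartial(65_536) while received.bytesize < payload.bytesize
end

puts write_all(writer, payload, 2) # prints 1000000
drain.join
```

Without the draining thread the send buffer would fill up and `IO.select` would eventually time out, which is the situation `write_timeout` guards against.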