class Socketry::TCP::Socket

Transmission Control Protocol sockets: Provide stream-like semantics

Attributes

addr_fmaily[R]
local_addr[R]
local_port[R]
read_timeout[R]
remote_addr[R]
remote_port[R]
resolver[R]
socket_class[R]
write_timeout[R]

Public Class Methods

connect(remote_addr, remote_port, **args) click to toggle source

Create a Socketry::TCP::Socket with the default options, then connect to the given host.

@param remote_addr [String] DNS name or IP address of the host to connect to @param remote_port [Fixnum] TCP port to connect to

@return [Socketry::TCP::Socket]

# File lib/socketry/tcp/socket.rb, line 20
def self.connect(remote_addr, remote_port, **args)
  new.connect(remote_addr, remote_port, **args)
end
new( read_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:read], write_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:write], timer: Socketry::Timeout::DEFAULT_TIMER.new, resolver: Socketry::Resolver::DEFAULT_RESOLVER, socket_class: ::Socket ) click to toggle source

Create an unconnected Socketry::TCP::Socket

@param read_timeout [Numeric] Seconds to wait before an uncompleted read errors @param write_timeout [Numeric] Seconds to wait before an uncompleted write errors @param timer [Object] A timekeeping object to use for measuring timeouts @param resolver [Object] A resolver object to use for resolving DNS names @param socket_class [Object] Underlying socket class which implements I/O ops

@return [Socketry::TCP::Socket]

# File lib/socketry/tcp/socket.rb, line 33
def initialize(
  read_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:read],
  write_timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:write],
  timer: Socketry::Timeout::DEFAULT_TIMER.new,
  resolver: Socketry::Resolver::DEFAULT_RESOLVER,
  socket_class: ::Socket
)
  @read_timeout = read_timeout
  @write_timeout = write_timeout

  @socket_class = socket_class
  @resolver = resolver

  @addr_family = nil
  @socket = nil

  @remote_addr = nil
  @remote_port = nil
  @local_addr  = nil
  @local_port  = nil

  start_timer(timer)
end

Public Instance Methods

close() click to toggle source

Close the socket

@return [true, false] true if the socket was open, false if closed

# File lib/socketry/tcp/socket.rb, line 311
def close
  return false if closed?
  @socket.close
  true
ensure
  @socket = nil
end
closed?() click to toggle source

Is the socket closed?

This method returns the local connection state. However, it's possible the remote side has closed the connection, so it's not actually possible to actually know if the socket is actually still open without reading from or writing to it. It's sort of like the Heisenberg uncertainty principle of sockets.

@return [true, false] do we locally think the socket is closed?

# File lib/socketry/tcp/socket.rb, line 328
def closed?
  @socket.nil?
end
connect( remote_addr, remote_port, local_addr: nil, local_port: nil, timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect] ) click to toggle source

Connect to a remote host

@param remote_addr [String] DNS name or IP address of the host to connect to @param remote_port [Fixnum] TCP port to connect to @param local_addr [String] DNS name or IP address to bind to locally @param local_port [Fixnum] Local TCP port to bind to @param timeout [Numeric] Number of seconds to wait before aborting connect

@raise [Socketry::AddressError] an invalid address was given @raise [Socketry::TimeoutError] connect operation timed out

@return [self]

# File lib/socketry/tcp/socket.rb, line 69
def connect(
  remote_addr,
  remote_port,
  local_addr: nil,
  local_port: nil,
  timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect]
)
  ensure_disconnected

  @remote_addr = remote_addr
  @remote_port = remote_port
  @local_addr  = local_addr
  @local_port  = local_port

  begin
    set_timeout(timeout)

    remote_addr = @resolver.resolve(remote_addr, timeout: time_remaining(timeout))
    local_addr  = @resolver.resolve(local_addr,  timeout: time_remaining(timeout)) if local_addr
    raise ArgumentError, "expected IPAddr from resolver, got #{remote_addr.class}" unless remote_addr.is_a?(IPAddr)

    @addr_family = if remote_addr.ipv4?    then ::Socket::AF_INET
                   elsif remote_addr.ipv6? then ::Socket::AF_INET6
                   else raise Socketry::AddressError, "unsupported IP address family: #{remote_addr}"
                   end

    socket = @socket_class.new(@addr_family, ::Socket::SOCK_STREAM, 0)
    socket.bind Addrinfo.tcp(local_addr.to_s, local_port) if local_addr
    remote_sockaddr = ::Socket.sockaddr_in(remote_port, remote_addr.to_s)

    # Note: `exception: false` for Socket#connect_nonblock is only supported in Ruby 2.3+
    begin
      socket.connect_nonblock(remote_sockaddr)
    rescue Errno::ECONNREFUSED => ex
      raise Socketry::ConnectionRefusedError, "connection to #{remote_addr}:#{remote_port} refused", ex.backtrace
    rescue Errno::EINPROGRESS, Errno::EALREADY
      # Earlier JRuby 9.x versions do not seem to correctly support Socket#wait_writable in this case
      # Newer versions seem to behave correctly
      retry if IO.select(nil, [socket], nil, time_remaining(timeout))

      socket.close
      raise Socketry::TimeoutError, "connection to #{remote_addr}:#{remote_port} timed out"
    rescue Errno::EISCONN
      # Sometimes raised when we've connected successfully
    end

    @socket = socket
  ensure
    clear_timeout(timeout)
  end

  self
end
from_socket(socket) click to toggle source

Wrap a Ruby/low-level socket in an Socketry::TCP::Socket

@param socket [::Socket] (or specified socket_class) low-level socket to wrap

# File lib/socketry/tcp/socket.rb, line 136
def from_socket(socket)
  ensure_disconnected
  raise TypeError, "expected #{@socket_class}, got #{socket.class}" unless socket.is_a?(@socket_class)
  @socket = socket
  self
end
nodelay() click to toggle source

Check whether Nagle's algorithm has been disabled

@return [true] Nagle's algorithm has been explicitly disabled @return [false] Nagle's algorithm is enabled (default)

# File lib/socketry/tcp/socket.rb, line 287
def nodelay
  ensure_connected
  @socket.getsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY).int.nonzero?
end
nodelay=(flag) click to toggle source

Disable or enable Nagle's algorithm

@param flag [true, false] disable or enable coalescing multiple writesusing Nagle's algorithm

# File lib/socketry/tcp/socket.rb, line 295
def nodelay=(flag)
  ensure_connected
  @socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, flag ? 1 : 0)
end
read(size, outbuf: String.new, timeout: @write_timeout) click to toggle source

Read all of the data in a given string to a socket unless timeout or EOF

@param size [Fixnum] number of bytes to attempt to read @param outbuf [String] an output buffer to read data into @param timeout [Numeric] Number of seconds to wait for read operation to complete

@raise [Socketry::Error] an I/O operation failed

@return [String, :eof] bytes read, or :eof if socket closed while reading

# File lib/socketry/tcp/socket.rb, line 198
def read(size, outbuf: String.new, timeout: @write_timeout)
  outbuf.clear
  deadline = lifetime + timeout if timeout

  begin
    until outbuf.size == size
      time_remaining = deadline - lifetime if deadline
      raise Socketry::TimeoutError, "read timed out after #{timeout} seconds" if timeout && time_remaining <= 0

      chunk = readpartial(size - outbuf.size, timeout: time_remaining)
      return :eof if chunk == :eof

      outbuf << chunk
    end
  end

  outbuf
end
read_nonblock(size, outbuf: nil) click to toggle source

Perform a non-blocking read operation

@param size [Fixnum] number of bytes to attempt to read @param outbuf [String, NilClass] an optional buffer into which data should be read

@raise [Socketry::Error] an I/O operation failed

@return [String, :wait_readable] data read, or :wait_readable if operation would block

# File lib/socketry/tcp/socket.rb, line 151
def read_nonblock(size, outbuf: nil)
  ensure_connected
  case outbuf
  when String
    @socket.read_nonblock(size, outbuf, exception: false)
  when NilClass
    @socket.read_nonblock(size, exception: false)
  else raise TypeError, "unexpected outbuf class: #{outbuf.class}"
  end
rescue IO::WaitReadable
  # Some buggy Rubies continue to raise this exception
  :wait_readable
rescue IOError => ex
  raise Socketry::Error, ex.message, ex.backtrace
end
readpartial(size, outbuf: nil, timeout: @read_timeout) click to toggle source

Read a partial amounth of data, blocking until it becomes available

@param size [Fixnum] number of bytes to attempt to read @param outbuf [String] an output buffer to read data into @param timeout [Numeric] Number of seconds to wait for read operation to complete @raise [Socketry::Error] an I/O operation failed @return [String, :eof] bytes read, or :eof if socket closed while reading

# File lib/socketry/tcp/socket.rb, line 174
def readpartial(size, outbuf: nil, timeout: @read_timeout)
  set_timeout(timeout)

  begin
    while (result = read_nonblock(size, outbuf: outbuf)) == :wait_readable
      next if @socket.wait_readable(time_remaining(timeout))
      raise TimeoutError, "read timed out after #{timeout} seconds"
    end
  ensure
    clear_timeout(timeout)
  end

  result || :eof
end
reconnect(timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect]) click to toggle source

Re-establish a lost TCP connection

@param timeout [Numeric] Number of seconds to wait before aborting re-connect @raise [Socketry::StateError] not in a disconnected state

# File lib/socketry/tcp/socket.rb, line 127
def reconnect(timeout: Socketry::Timeout::DEFAULT_TIMEOUTS[:connect])
  ensure_disconnected
  raise StateError, "can't reconnect: never completed initial connection" unless @remote_addr
  connect(@remote_addr, @remote_port, local_addr: @local_addr, local_port: @local_port, timeout: timeout)
end
to_io() click to toggle source

Return a raw Ruby I/O object

@return [IO] Ruby I/O object

# File lib/socketry/tcp/socket.rb, line 303
def to_io
  ensure_connected
  ::IO.try_convert(@socket)
end
write(data, timeout: @write_timeout) click to toggle source

Write all of the data in a given string to a socket unless timeout or EOF

@param data [String] data to write to the socket @param timeout [Numeric] Number of seconds to wait for write operation to complete

@raise [Socketry::Error] an I/O operation failed

@return [Fixnum] number of bytes written, or :eof if socket closed during writing

# File lib/socketry/tcp/socket.rb, line 263
def write(data, timeout: @write_timeout)
  total_written = data.size
  deadline = lifetime + timeout if timeout

  begin
    until data.empty?
      time_remaining = deadline - lifetime if deadline
      raise Socketry::TimeoutError, "write timed out after #{timeout} seconds" if timeout && time_remaining <= 0

      bytes_written = writepartial(data, timeout: time_remaining)
      return :eof if bytes_written == :eof

      break if bytes_written == data.bytesize
      data = data.byteslice(bytes_written..-1)
    end
  end

  total_written
end
write_nonblock(data) click to toggle source

Perform a non-blocking write operation

@param data [String] data to write to the socket

@raise [Socketry::Error] an I/O operation failed

@return [Fixnum, :wait_writable] number of bytes written, or :wait_writable if op would block

# File lib/socketry/tcp/socket.rb, line 224
def write_nonblock(data)
  ensure_connected
  @socket.write_nonblock(data, exception: false)
rescue IO::WaitWriteable
  # Some buggy Rubies continue to raise this exception
  :wait_writable
rescue IOError => ex
  raise Socketry::Error, ex.message, ex.backtrace
end
writepartial(data, timeout: @write_timeout) click to toggle source

Write a partial amounth of data, blocking until it's completed

@param data [String] data to write to the socket @param timeout [Numeric] Number of seconds to wait for write operation to complete @raise [Socketry::Error] an I/O operation failed @return [Fixnum, :eof] number of bytes written, or :eof if socket closed during writing

# File lib/socketry/tcp/socket.rb, line 240
def writepartial(data, timeout: @write_timeout)
  set_timeout(timeout)

  begin
    while (result = write_nonblock(data)) == :wait_writable
      next if @socket.wait_writable(time_remaining(timeout))
      raise TimeoutError, "write timed out after #{timeout} seconds"
    end
  ensure
    clear_timeout(timeout)
  end

  result || :eof
end

Private Instance Methods

ensure_connected() click to toggle source
# File lib/socketry/tcp/socket.rb, line 334
def ensure_connected
  raise StateError, "not connected" if closed?
  true
end
ensure_disconnected() click to toggle source
# File lib/socketry/tcp/socket.rb, line 339
def ensure_disconnected
  return true if closed?
  raise StateError, "already connected"
end