class Digital::Transport::Adapters::Tcp

Constants

DEFAULTS

Public Class Methods

new(ip, port, opts = {}) click to toggle source
# File lib/digital/transport/adapters/tcp.rb, line 15
def initialize(ip, port, opts = {})
  @ip = FinalVar.new(ip)
  @opts = DEFAULTS.merge(opts.dup).freeze
  @port = FinalVar.new(port)
  @io = nil
end

Public Instance Methods

close() click to toggle source

@return [Boolean] representing operation state

# File lib/digital/transport/adapters/tcp.rb, line 41
def close
  return !open? unless open?
  flush
  @io && @io.close
  @io = nil
  !open?
end
connect() click to toggle source

@return [Either] monad representing a value of one of two possible types

  1. Exception covariant

  2. Adapter interface invariant

@see Adapters#new_tcp_adapter

# File lib/digital/transport/adapters/tcp.rb, line 28
def connect
  return open? if open?
  Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0).tap do |socket|
    socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, @opts[:tcp_nodelay].to_i)
    return connect_nonblock(
        socket,
        Socket.pack_sockaddr_in(@port.value, @ip.value),
        @opts[:timeout].to_i.nonzero? || DEFAULTS[:timeout]
    )
  end
end
flush() click to toggle source
# File lib/digital/transport/adapters/tcp.rb, line 49
def flush
  @io && @io.flush
end
open?() click to toggle source

@return [Boolean] connection state predicate

# File lib/digital/transport/adapters/tcp.rb, line 87
def open?
  @io && !@io.closed?
end
read(count, should_block = false) click to toggle source

@param [Fixnum] count how many bytes to read @param [Boolean] should_block which will simulate blocking function

# File lib/digital/transport/adapters/tcp.rb, line 73
def read(count, should_block = false)
  raise NotConnected unless open?
  Either.right @io.read_nonblock(count)
rescue IO::WaitReadable
  if should_block
    IO.select [@io]
    retry
  end
  Either.right(nil)
rescue => ex
  Either.left(ex)
end
write(string) click to toggle source

@param [String] string

# File lib/digital/transport/adapters/tcp.rb, line 54
def write(string)
  raise NotConnected unless open?
  written = 0
  while 0 < string.bytesize
    begin
      written = @io.write_nonblock(string)
    rescue IO::WaitWritable
      IO.select(nil, [@io])
      retry
    end
    string = string.byteslice(written..-1)
  end
  Either.right(written)
rescue => ex
  Either.left(ex)
end

Private Instance Methods

connect_nonblock(io_like, endpoint, timeout) click to toggle source

@api private

# File lib/digital/transport/adapters/tcp.rb, line 94
def connect_nonblock(io_like, endpoint, timeout)
  io_like.connect_nonblock(endpoint)
  raise Errno::EISCONN
rescue Errno::EINPROGRESS
  if IO.select(nil, [io_like], nil, timeout)
    retry
  else
    io_like.close
    Either.left(Timeout::Error.new('Connection timeout'))
  end
rescue Errno::EISCONN
  init_io io_like
  Either.right(self)
rescue => ex
  io_like.close
  Either.left(ex)
end
init_io(io) click to toggle source
# File lib/digital/transport/adapters/tcp.rb, line 112
def init_io(io)
  @io = io
end