class DatTCP::Server

Constants

DEFAULT_BACKLOG_SIZE
DEFAULT_NUM_WORKERS
DEFAULT_SHUTDOWN_TIMEOUT
SIGNAL

Public Class Methods

new(worker_class, options = nil) click to toggle source
# File lib/dat-tcp.rb, line 18
# Builds a server that hands client connections to a pool of `worker_class`
# workers.
#
# worker_class - must be a Class that includes DatTCP::Worker; anything else
#                raises ArgumentError.
# options      - optional Hash. Keys read here: :backlog_size,
#                :shutdown_timeout, :num_workers, :logger, :worker_params.
#                Missing backlog size, shutdown timeout and worker count fall
#                back to the corresponding DEFAULT_* constants.
#
# A new server has no TCP server, no thread, and starts in the :stop state.
def initialize(worker_class, options = nil)
  unless worker_class.kind_of?(Class) && worker_class.include?(DatTCP::Worker)
    raise ArgumentError, "worker class must include `#{DatTCP::Worker}`"
  end

  options ||= {}
  logger = options[:logger]

  @backlog_size     = options[:backlog_size]     || DEFAULT_BACKLOG_SIZE
  @shutdown_timeout = options[:shutdown_timeout] || DEFAULT_SHUTDOWN_TIMEOUT

  # The read end is selected on by the accept loop; the write end is used to
  # wake that loop up when the state changes.
  @signal_reader, @signal_writer = IO.pipe

  # Without a logger, fall back to the null proxy.
  @logger_proxy = logger ? LoggerProxy.new(logger) : NullLoggerProxy.new

  pool_options = {
    :num_workers   => (options[:num_workers] || DEFAULT_NUM_WORKERS),
    :logger        => logger,
    :worker_params => options[:worker_params]
  }
  @worker_pool = DatWorkerPool.new(worker_class, pool_options)

  @tcp_server = nil
  @thread     = nil
  @state      = State.new(:stop)
end

Public Instance Methods

client_file_descriptors() click to toggle source
# File lib/dat-tcp.rb, line 58
# Returns an Array with the `fileno` of every item currently held in the
# worker pool's `work_items`.
def client_file_descriptors
  @worker_pool.work_items.map { |client| client.fileno }
end
file_descriptor() click to toggle source
# File lib/dat-tcp.rb, line 54
# Returns the `fileno` of the listening TCP server, or nil when the server is
# not listening.
def file_descriptor
  return nil unless listening?

  @tcp_server.fileno
end
halt(wait = false) click to toggle source
# File lib/dat-tcp.rb, line 103
# Moves a running server into the :halt state and wakes its thread.
#
# wait - when truthy, also calls wait_for_shutdown before returning.
#
# Does nothing (returns nil) when the server is not running.
def halt(wait = false)
  return nil unless running?

  @state.set(:halt)
  wakeup_thread
  wait_for_shutdown if wait
end
inspect() click to toggle source
# File lib/dat-tcp.rb, line 110
# Returns a short description of the form
# "#<Class:0x0... @ip=... @port=...>", where the reference is the object id
# shifted left by one bit and rendered in hex, and ip/port are shown via
# their own `inspect`.
def inspect
  reference = format('0x0%x', object_id << 1)
  "#<#{self.class}:#{reference} @ip=#{ip.inspect} @port=#{port.inspect}>"
end
ip() click to toggle source
# File lib/dat-tcp.rb, line 46
# Returns element 3 of the TCP server's `addr` array, or nil when the server
# is not listening.
def ip
  return nil unless listening?

  @tcp_server.addr[3]
end
listen(*args) { |tcp_server| ... } click to toggle source
# File lib/dat-tcp.rb, line 70
# Puts the server in the :listen state, builds the TCP server from `args` via
# TCPServer.build, and starts listening with the configured backlog size.
#
# If a block is given it is yielded the built TCP server before `listen` is
# called on it.
#
# Raises ArgumentError when TCPServer.build returns nothing for `args`.
def listen(*args)
  @state.set(:listen)
  @tcp_server = TCPServer.build(*args)
  unless @tcp_server
    raise ArgumentError, "takes ip and port or file descriptor"
  end

  yield(@tcp_server) if block_given?
  @tcp_server.listen(@backlog_size)
end
listening?() click to toggle source
# File lib/dat-tcp.rb, line 62
# True when a TCP server is currently set, false otherwise.
def listening?
  @tcp_server ? true : false
end
pause(wait = false) click to toggle source
# File lib/dat-tcp.rb, line 89
# Moves a running server into the :pause state and wakes its thread.
#
# wait - when truthy, also calls wait_for_shutdown before returning.
#
# Does nothing (returns nil) when the server is not running.
def pause(wait = false)
  return nil unless running?

  @state.set(:pause)
  wakeup_thread
  wait_for_shutdown if wait
end
port() click to toggle source
# File lib/dat-tcp.rb, line 50
# Returns element 1 of the TCP server's `addr` array, or nil when the server
# is not listening.
def port
  return nil unless listening?

  @tcp_server.addr[1]
end
running?() click to toggle source
# File lib/dat-tcp.rb, line 66
# True when the server thread exists and is alive, false otherwise.
def running?
  # Read @thread once: teardown clears it from the server thread, so reading
  # it a second time could see nil and raise NoMethodError on `alive?`.
  thread = @thread
  !!(thread && thread.alive?)
end
start(passed_client_fds = nil) click to toggle source
# File lib/dat-tcp.rb, line 83
# Starts the server: sets the state to :run and spawns the thread that runs
# work_loop with `passed_client_fds`. Returns the new Thread.
#
# Raises NotListeningError when the server is not listening.
def start(passed_client_fds = nil)
  unless listening?
    raise NotListeningError.new
  end

  @state.set(:run)
  @thread = Thread.new do
    work_loop(passed_client_fds)
  end
end
stop(wait = false) click to toggle source
# File lib/dat-tcp.rb, line 96
# Moves a running server into the :stop state and wakes its thread.
#
# wait - when truthy, also calls wait_for_shutdown before returning.
#
# Does nothing (returns nil) when the server is not running.
def stop(wait = false)
  return nil unless running?

  @state.set(:stop)
  wakeup_thread
  wait_for_shutdown if wait
end
stop_listen() click to toggle source
# File lib/dat-tcp.rb, line 78
# Closes the TCP server on a best-effort basis and forgets it.
#
# Any StandardError raised by the close (including calling it when no server
# is set) is deliberately swallowed; @tcp_server is always reset to nil.
def stop_listen
  begin
    @tcp_server.close
  rescue StandardError
    false
  end
  @tcp_server = nil
end

Private Instance Methods

accept_client_connections() click to toggle source
# File lib/dat-tcp.rb, line 139
# One iteration of the accept loop: blocks until the TCP server or the signal
# pipe is readable, then
#   * accepts a connection and pushes it onto the worker pool if the TCP
#     server is ready, and
#   * consumes one SIGNAL's worth of bytes from the signal pipe if it is
#     ready.
def accept_client_connections
  readable = IO.select([@tcp_server, @signal_reader]).first

  @worker_pool.push(@tcp_server.accept) if readable.include?(@tcp_server)

  @signal_reader.read_nonblock(SIGNAL.bytesize) if readable.include?(@signal_reader)
end
log(&message_block) click to toggle source
# File lib/dat-tcp.rb, line 171
# Forwards the given block to the logger proxy's `log`.
def log(&block)
  @logger_proxy.log(&block)
end
setup(passed_client_fds) click to toggle source
# File lib/dat-tcp.rb, line 132
# Starts the worker pool, then wraps each passed file descriptor in a
# TCPSocket and pushes it onto the pool.
#
# passed_client_fds - a list of file descriptors, or nil for none.
def setup(passed_client_fds)
  @worker_pool.start

  client_fds = passed_client_fds || []
  client_fds.each { |client_fd| @worker_pool.push(TCPSocket.for_fd(client_fd)) }
end
teardown() click to toggle source
# File lib/dat-tcp.rb, line 151
# Shuts the server down after the work loop ends.
#
# Unless the state is paused, the listening socket is closed via stop_listen.
# The worker pool is then shut down with the configured shutdown timeout, or
# with a timeout of 0 when the state is halt. @thread is always cleared, even
# if shutting down raises.
def teardown
  if !@state.pause?
    log{ "Stop listening for connections, closing TCP socket" }
    stop_listen
  end

  timeout = @shutdown_timeout
  timeout = 0 if @state.halt?
  @worker_pool.shutdown(timeout)
ensure
  @thread = nil
end
wait_for_shutdown() click to toggle source
# File lib/dat-tcp.rb, line 167
# Blocks until the server thread finishes; returns nil immediately when there
# is no thread.
def wait_for_shutdown
  # Read @thread once: teardown clears it from the server thread, so reading
  # it a second time could see nil and raise NoMethodError on `join`.
  thread = @thread
  thread.join if thread
end
wakeup_thread() click to toggle source
# File lib/dat-tcp.rb, line 163
# Writes SIGNAL to the signal pipe without blocking. The accept loop selects
# on the read end of that pipe, so this wakes a server thread waiting there.
def wakeup_thread
  signal = SIGNAL
  @signal_writer.write_nonblock(signal)
end
work_loop(passed_client_fds) click to toggle source
# File lib/dat-tcp.rb, line 120
# Body of the server thread: runs setup, then keeps accepting client
# connections for as long as the state is run.
#
# On any StandardError the state is set to :stop and the error class, message
# and backtrace are logged. teardown always runs on the way out.
def work_loop(passed_client_fds)
  setup(passed_client_fds)
  while @state.run?
    accept_client_connections
  end
rescue StandardError => error
  @state.set(:stop)
  log{ "An error occurred while running the server, exiting" }
  log{ "#{error.class}: #{error.message}" }
  backtrace = error.backtrace || []
  backtrace.each{ |line| log{ line } }
ensure
  teardown
end