class DatTCP::Server
Constants
- DEFAULT_BACKLOG_SIZE
- DEFAULT_NUM_WORKERS
- DEFAULT_SHUTDOWN_TIMEOUT
- SIGNAL
Public Class Methods
new(worker_class, options = nil)
click to toggle source
# File lib/dat-tcp.rb, line 18
#
# Builds a server in the :stop state that is neither listening nor running.
#
# worker_class - a Class that includes DatTCP::Worker; it is handed to the
#                worker pool.
# options      - optional Hash; recognised keys are :backlog_size,
#                :shutdown_timeout, :num_workers, :logger and :worker_params.
#                Missing sizes/timeouts fall back to the DEFAULT_* constants.
#
# Raises ArgumentError when worker_class is not a Class including
# DatTCP::Worker.
def initialize(worker_class, options = nil)
  unless worker_class.kind_of?(Class) && worker_class.include?(DatTCP::Worker)
    raise ArgumentError, "worker class must include `#{DatTCP::Worker}`"
  end

  options ||= {}
  @backlog_size     = options[:backlog_size] || DEFAULT_BACKLOG_SIZE
  @shutdown_timeout = options[:shutdown_timeout] || DEFAULT_SHUTDOWN_TIMEOUT

  # Pipe whose read end is watched alongside the listening socket so other
  # threads can interrupt a blocking select.
  @signal_reader, @signal_writer = IO.pipe

  # Without a logger, fall back to the null proxy.
  logger = options[:logger]
  @logger_proxy = logger ? LoggerProxy.new(logger) : NullLoggerProxy.new

  pool_options = {
    :num_workers   => (options[:num_workers] || DEFAULT_NUM_WORKERS),
    :logger        => options[:logger],
    :worker_params => options[:worker_params]
  }
  @worker_pool = DatWorkerPool.new(worker_class, pool_options)

  @tcp_server = nil
  @thread     = nil
  @state      = State.new(:stop)
end
Public Instance Methods
client_file_descriptors()
click to toggle source
# File lib/dat-tcp.rb, line 58
#
# Returns an Array with the `fileno` of every item currently held in the
# worker pool's `work_items`.
def client_file_descriptors
  pending = @worker_pool.work_items
  pending.map { |client| client.fileno }
end
file_descriptor()
click to toggle source
# File lib/dat-tcp.rb, line 54
#
# Returns the `fileno` of the listening socket, or nil when the server is
# not listening.
def file_descriptor
  return unless listening?

  @tcp_server.fileno
end
halt(wait = false)
click to toggle source
# File lib/dat-tcp.rb, line 103
#
# Moves a running server into the :halt state and wakes its thread.
# Does nothing (and returns nil) when the server is not running.
#
# wait - when truthy, block until the server thread has finished.
def halt(wait = false)
  if running?
    @state.set :halt
    wakeup_thread
    wait_for_shutdown if wait
  end
end
inspect()
click to toggle source
# File lib/dat-tcp.rb, line 110
#
# Returns a short description of the server: its class, a hex reference
# derived from object_id, and the inspected ip and port.
def inspect
  reference = format('0x0%x', object_id << 1)
  "#<#{self.class}:#{reference} @ip=#{ip.inspect} @port=#{port.inspect}>"
end
ip()
click to toggle source
# File lib/dat-tcp.rb, line 46
#
# Returns element 3 of the listening socket's `addr` array, or nil when the
# server is not listening.
def ip
  return unless listening?

  @tcp_server.addr[3]
end
listen(*args) { |tcp_server| ... }
click to toggle source
# File lib/dat-tcp.rb, line 70
#
# Puts the server in the :listen state and builds the listening socket from
# `args`, which are forwarded untouched to TCPServer.build.
#
# Yields the freshly built socket (if a block is given) before `listen` is
# called on it with the configured backlog size.
#
# Raises ArgumentError when TCPServer.build returns nothing usable.
def listen(*args)
  @state.set :listen

  @tcp_server = TCPServer.build(*args)
  unless @tcp_server
    raise ArgumentError, "takes ip and port or file descriptor"
  end

  yield @tcp_server if block_given?
  @tcp_server.listen(@backlog_size)
end
listening?()
click to toggle source
# File lib/dat-tcp.rb, line 62
#
# Returns true when a listening socket is currently set, false otherwise.
def listening?
  @tcp_server ? true : false
end
pause(wait = false)
click to toggle source
# File lib/dat-tcp.rb, line 89
#
# Moves a running server into the :pause state and wakes its thread.
# Does nothing (and returns nil) when the server is not running.
#
# wait - when truthy, block until the server thread has finished.
def pause(wait = false)
  if running?
    @state.set :pause
    wakeup_thread
    wait_for_shutdown if wait
  end
end
port()
click to toggle source
# File lib/dat-tcp.rb, line 50
#
# Returns element 1 of the listening socket's `addr` array, or nil when the
# server is not listening.
def port
  return unless listening?

  @tcp_server.addr[1]
end
running?()
click to toggle source
# File lib/dat-tcp.rb, line 66
#
# Returns true when the server thread exists and is alive, false otherwise.
def running?
  # Read @thread exactly once: the server thread clears it during teardown,
  # so checking and then re-reading the ivar could call `alive?` on nil.
  thread = @thread
  !!(thread && thread.alive?)
end
start(passed_client_fds = nil)
click to toggle source
# File lib/dat-tcp.rb, line 83
#
# Switches the server to the :run state and spawns the thread that runs the
# work loop. Returns the new Thread.
#
# passed_client_fds - optional list of client file descriptors, forwarded
#                     to the work loop.
#
# Raises NotListeningError when the server is not listening.
def start(passed_client_fds = nil)
  unless listening?
    raise NotListeningError.new
  end

  @state.set :run
  @thread = Thread.new do
    work_loop(passed_client_fds)
  end
end
stop(wait = false)
click to toggle source
# File lib/dat-tcp.rb, line 96
#
# Moves a running server into the :stop state and wakes its thread.
# Does nothing (and returns nil) when the server is not running.
#
# wait - when truthy, block until the server thread has finished.
def stop(wait = false)
  if running?
    @state.set :stop
    wakeup_thread
    wait_for_shutdown if wait
  end
end
stop_listen()
click to toggle source
# File lib/dat-tcp.rb, line 78
#
# Closes the listening socket on a best-effort basis and forgets it.
# Any StandardError raised while closing (including there being no socket
# at all) is swallowed. Always returns nil.
def stop_listen
  begin
    @tcp_server.close
  rescue StandardError
    false
  end
  @tcp_server = nil
end
Private Instance Methods
accept_client_connections()
click to toggle source
# File lib/dat-tcp.rb, line 139
#
# Blocks until the listening socket or the signal pipe is readable.
# A readable listening socket has one connection accepted and pushed onto
# the worker pool; a readable signal pipe has one SIGNAL's worth of bytes
# drained from it.
def accept_client_connections
  readable, _writable, _errored = IO.select([@tcp_server, @signal_reader])

  @worker_pool.push(@tcp_server.accept) if readable.include?(@tcp_server)
  @signal_reader.read_nonblock(SIGNAL.bytesize) if readable.include?(@signal_reader)
end
log(&message_block)
click to toggle source
# File lib/dat-tcp.rb, line 171
#
# Forwards the given block to the logger proxy's `log` and returns whatever
# the proxy returns. The message is supplied as a block, not a string.
def log(&build_message)
  @logger_proxy.log(&build_message)
end
setup(passed_client_fds)
click to toggle source
# File lib/dat-tcp.rb, line 132
#
# Starts the worker pool, then wraps each passed file descriptor in a
# TCPSocket and pushes it onto the pool.
#
# passed_client_fds - list of client file descriptors, or nil for none.
def setup(passed_client_fds)
  @worker_pool.start

  client_fds = passed_client_fds || []
  client_fds.each { |fd| @worker_pool.push(TCPSocket.for_fd(fd)) }
end
teardown()
click to toggle source
# File lib/dat-tcp.rb, line 151
#
# Shuts the server down after the work loop ends.
#
# The listening socket is closed unless the state is :pause. The worker
# pool is then shut down with a timeout of 0 when halting, or the
# configured shutdown timeout otherwise. The thread reference is always
# cleared, even if shutdown raises.
def teardown
  if !@state.pause?
    log{ "Stop listening for connections, closing TCP socket" }
    stop_listen
  end

  timeout = if @state.halt?
    0
  else
    @shutdown_timeout
  end
  @worker_pool.shutdown(timeout)
ensure
  @thread = nil
end
wait_for_shutdown()
click to toggle source
# File lib/dat-tcp.rb, line 167
#
# Blocks until the server thread has finished. Returns the joined Thread,
# or nil when there is no server thread.
def wait_for_shutdown
  # Read @thread exactly once: the server thread clears it during teardown,
  # so checking and then re-reading the ivar could call `join` on nil.
  thread = @thread
  thread.join if thread
end
wakeup_thread()
click to toggle source
# File lib/dat-tcp.rb, line 163
#
# Writes SIGNAL to the signal pipe without blocking, so that a select
# waiting on the pipe's read end returns.
def wakeup_thread
  signal = SIGNAL
  @signal_writer.write_nonblock(signal)
end
work_loop(passed_client_fds)
click to toggle source
# File lib/dat-tcp.rb, line 120
#
# Body of the server thread: runs setup, then keeps accepting connections
# for as long as the state is :run.
#
# A StandardError switches the state to :stop and is logged (class,
# message and backtrace) rather than re-raised. Teardown always runs.
#
# passed_client_fds - list of client file descriptors handed to setup.
def work_loop(passed_client_fds)
  setup(passed_client_fds)
  while @state.run?
    accept_client_connections
  end
rescue StandardError => exception
  @state.set :stop
  log{ "An error occurred while running the server, exiting" }
  log{ "#{exception.class}: #{exception.message}" }
  backtrace = exception.backtrace || []
  backtrace.each do |line|
    log{ line }
  end
ensure
  teardown
end