class Async::IO::Socket
Public Class Methods
Bind to a local address and accept connections in a loop.
# File lib/async/io/socket.rb, line 177
# Binds using `bind(*args)`, optionally starts listening, then passes the
# given block to the bound socket's `accept_each`.
#
# @param args Positional arguments forwarded unchanged to `bind`.
# @option backlog [Integer, nil] Passed to `listen`; `listen` is skipped when this is nil/false.
def self.accept(*args, backlog: SOMAXCONN, &block)
  bind(*args) do |listener, subtask|
    if backlog
      listener.listen(backlog)
    end

    listener.accept_each(task: subtask, &block)
  end
end
Bind to a local address. @example
socket = Async::IO::Socket.bind(Async::IO::Address.tcp("0.0.0.0", 9090))
@param local_address [Address] The local address to bind to. @option protocol [Integer] The socket protocol to use.
# File lib/async/io/socket.rb, line 156
# Builds a socket for `local_address` and binds it.
#
# Without a block the bound wrapper is returned directly. With a block, the
# block is run inside `task.async` with the wrapper and the child task, and the
# wrapper is closed once the block finishes (normally or by raising).
#
# @param local_address [Address] The local address to bind to.
# @option protocol [Integer] The socket protocol passed to `build`.
# @option task [Task] The task used to spawn the child task when a block is given.
# @option options Remaining keyword options are forwarded to `build`.
def self.bind(local_address, protocol: 0, task: Task.current, **options, &block)
  Console.logger.debug(self) {"Binding to #{local_address.inspect}"}

  bound = build(local_address.afamily, local_address.socktype, protocol, **options) do |raw_socket|
    raw_socket.bind(local_address.to_sockaddr)
  end

  unless block_given?
    return bound
  end

  task.async do |child|
    child.annotate "binding to #{bound.local_address.inspect}"

    begin
      yield bound, child
    ensure
      # The wrapper is owned by this child task, so always release it here.
      bound.close
    end
  end
end
Build and wrap the underlying io. @option reuse_port [Boolean] Allow this port to be bound in multiple processes. @option reuse_address [Boolean] Allow the local address to be reused (sets `SO_REUSEADDR`).
# File lib/async/io/socket.rb, line 86
# Creates the underlying socket via `wrapped_klass.new(*args)`, applies the
# requested socket options, yields the raw socket for further setup, and then
# wraps it with the reactor of `task`.
#
# @param args Positional arguments forwarded to `wrapped_klass.new`.
# @option timeout Assigned to the wrapper's `timeout` attribute.
# @option reuse_address [Boolean] When truthy, sets `SO_REUSEADDR` to 1.
# @option reuse_port [Boolean] When truthy, sets `SO_REUSEPORT` to 1.
# @option linger When truthy, passed as the value for `SO_LINGER`.
# @option task [Task] Supplies the reactor handed to the wrapper.
# @yield [socket] The raw (unwrapped) socket, after options have been applied.
# @return The wrapper around the configured socket.
def self.build(*args, timeout: nil, reuse_address: true, reuse_port: nil, linger: nil, task: Task.current)
  socket = wrapped_klass.new(*args)

  socket.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) if reuse_address
  socket.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1) if reuse_port
  socket.setsockopt(SOL_SOCKET, SO_LINGER, linger) if linger

  yield socket

  self.new(socket, task.reactor).tap do |wrapped|
    wrapped.timeout = timeout
  end
rescue Exception
  # Cleanup only: close the raw socket (if it was created) and re-raise
  # whatever went wrong, so no exception is swallowed here.
  socket&.close
  raise
end
Establish a connection to a given `remote_address`. @example
socket = Async::IO::Socket.connect(Async::IO::Address.tcp("8.8.8.8", 53))
@param remote_address [Address] The remote address to connect to. @option local_address [Address] The local address to bind to before connecting.
# File lib/async/io/socket.rb, line 118
# Builds a socket for `remote_address`, optionally binds it to `local_address`,
# and connects it.
#
# Without a block the connected wrapper is returned. With a block, the block
# is yielded the wrapper and the task, and the wrapper is closed afterwards.
# If connecting raises, the wrapper is closed and the exception re-raised.
#
# @param remote_address [Address] The remote address to connect to.
# @option local_address [Address] The local address to bind to before connecting.
# @option task [Task] The task that is annotated and yielded to the block.
# @option options Remaining keyword options are forwarded to `build`.
def self.connect(remote_address, local_address: nil, task: Task.current, **options)
  Console.logger.debug(self) {"Connecting to #{remote_address.inspect}"}

  task.annotate "connecting to #{remote_address.inspect}"

  peer = build(remote_address.afamily, remote_address.socktype, remote_address.protocol, **options) do |raw_socket|
    # Nothing to configure unless the caller asked for a specific local address.
    next unless local_address

    # Inform the kernel (Linux 4.2+) to not reserve an ephemeral port when using bind(2) with a port number of 0. The port will later be automatically chosen at connect(2) time, in a way that allows sharing a source port as long as the 4-tuple is unique.
    raw_socket.setsockopt(SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1) if defined?(IP_BIND_ADDRESS_NO_PORT)

    raw_socket.bind(local_address.to_sockaddr)
  end

  begin
    peer.connect(remote_address.to_sockaddr)
    task.annotate "connected to #{remote_address.inspect} [fd=#{peer.fileno}]"
  rescue Exception
    # Do not leak the socket when the connection attempt fails.
    peer.close
    raise
  end

  unless block_given?
    return peer
  end

  begin
    yield peer, task
  ensure
    peer.close
  end
end
# File lib/async/io/socket.rb, line 187
# Creates a pair of sockets with `::Socket.pair(*args)` and wraps each one
# by passing it to `new`.
#
# @param args Positional arguments forwarded unchanged to `::Socket.pair`.
# @return [Array] The wrapped sockets, in the order `::Socket.pair` returned them.
def self.pair(*args)
  ::Socket.pair(*args).map do |raw_socket|
    new(raw_socket)
  end
end
Public Instance Methods
@param timeout [Numeric] the maximum time to wait for accepting a connection, if specified.
# File lib/async/io/socket.rb, line 61
# Accepts one connection via `accept_nonblock` and wraps the peer socket,
# which inherits this socket's `timeout`.
#
# Without a block, returns the wrapper and the peer address. With a block,
# the block is run inside `task.async` with the wrapper and address, and the
# wrapper is closed once the block finishes.
#
# @option timeout [Numeric] Forwarded to `async_send` for the accept call.
# @option task [Task] Supplies the reactor and spawns the child task when a block is given.
def accept(timeout: nil, task: Task.current)
  io, remote = async_send(:accept_nonblock, timeout: timeout)

  connection = Socket.new(io, task.reactor).tap do |accepted|
    # Accepted connections use the listening socket's timeout, not the accept timeout.
    accepted.timeout = self.timeout
  end

  unless block_given?
    return connection, remote
  end

  task.async do |child|
    child.annotate "incoming connection #{remote.inspect} [fd=#{connection.fileno}]"

    begin
      yield connection, remote
    ensure
      connection.close
    end
  end
end
@raise [Errno::EAGAIN] the connection failed due to the remote end being overloaded.
# File lib/async/io/socket.rb, line 50
# Connects by dispatching `connect_nonblock` through `async_send`.
#
# @param args Positional arguments forwarded unchanged to `connect_nonblock`.
# @return The result of `async_send`, or nil when `Errno::EISCONN` was raised.
def connect(*args)
  async_send(:connect_nonblock, *args)
rescue Errno::EISCONN
  # We are now connected.
end