class Rbgo::IOMachine
Attributes
actor[RW]
monitors[RW]
selector[RW]
Public Class Methods
new()
click to toggle source
# File lib/rbgo/io_machine.rb, line 83
# Sets up the NIO selector, an empty io => monitor table, and the actor whose
# block routes every incoming message to the matching private handler.
def initialize
  # Operation tag (first element of a receipt's registered_op) => handler name.
  handlers = {
    register_read: :handle_read_msg,
    register_read_line: :handle_read_line_msg,
    register_read_partial: :handle_read_partial_msg,
    register_write: :handle_write_msg,
    register_socket_accept: :handle_socket_accept_msg,
    register_socket_connect: :handle_socket_connect_msg,
    register_socket_recv: :handle_socket_recv_msg,
    register_socket_sendmsg: :handle_socket_sendmsg_msg,
    register_socket_recvmsg: :handle_socket_recvmsg_msg
  }

  self.selector = NIO::Selector.new
  self.monitors = {}
  self.actor = Actor.new do |msg, mailbox|
    # :do_select is the internal "poll the selector" tick, not a receipt.
    if msg == :do_select
      handle_select_msg(mailbox)
      next
    end

    # Anything else is a receipt; unknown operation tags are ignored.
    handler = handlers[msg.registered_op[0]]
    send(handler, msg, mailbox) if handler
  end
end
Public Instance Methods
close()
click to toggle source
# File lib/rbgo/io_machine.rb, line 70
# Closes the actor and then the selector.
#
# The selector is closed even when closing the actor raises, so it is not
# left open by a failing actor shutdown. The exception, if any, still
# propagates to the caller.
#
# Returns whatever +selector.close+ returns.
def close
  result = nil
  begin
    actor.close
  ensure
    result = selector.close
  end
  result
end
closed?()
click to toggle source
# File lib/rbgo/io_machine.rb, line 75
# Reports whether both the actor and the selector are closed.
# The selector is only consulted once the actor reports closed.
def closed?
  actor_state = actor.closed?
  return actor_state unless actor_state

  selector.closed?
end
do_read(io, length: nil)
click to toggle source
# File lib/rbgo/io_machine.rb, line 7
# Submits a :register_read operation for +io+ to the actor.
#
# io     - the object to read from.
# length - forwarded as the third element of the operation (nil by default).
#
# Returns the IOReceipt that was sent to the actor.
def do_read(io, length: nil)
  IOReceipt.new([:register_read, io, length]).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_read_line(io, sep: $/, limit: nil)
click to toggle source
# File lib/rbgo/io_machine.rb, line 21
# Submits a :register_read_line operation for +io+ to the actor.
#
# io    - the object to read from.
# sep   - line separator, defaults to $/.
# limit - forwarded as the fourth element of the operation (nil by default).
#
# Returns the IOReceipt that was sent to the actor.
def do_read_line(io, sep: $/, limit: nil)
  IOReceipt.new([:register_read_line, io, sep, limit]).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_read_partial(io, maxlen:)
click to toggle source
# File lib/rbgo/io_machine.rb, line 28
# Submits a :register_read_partial operation for +io+ to the actor.
#
# io     - the object to read from.
# maxlen - required; forwarded as the third element of the operation.
#
# Returns the IOReceipt that was sent to the actor.
def do_read_partial(io, maxlen:)
  IOReceipt.new([:register_read_partial, io, maxlen]).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_socket_accept(sock)
click to toggle source
# File lib/rbgo/io_machine.rb, line 35
# Submits a :register_socket_accept operation for +sock+ to the actor.
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_accept(sock)
  IOReceipt.new([:register_socket_accept, sock]).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_socket_connect(sock, remote_sockaddr:)
click to toggle source
# File lib/rbgo/io_machine.rb, line 42
# Submits a :register_socket_connect operation for +sock+ to the actor.
#
# sock            - the socket to connect.
# remote_sockaddr - required; forwarded as the third element of the operation.
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_connect(sock, remote_sockaddr:)
  IOReceipt.new([:register_socket_connect, sock, remote_sockaddr]).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_socket_recv(sock, maxlen:, flags: 0)
click to toggle source
# File lib/rbgo/io_machine.rb, line 49
# Submits a :register_socket_recv operation for +sock+ to the actor.
#
# sock   - the socket to receive from.
# maxlen - required; forwarded as the third element of the operation.
# flags  - forwarded as the fourth element (0 by default).
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_recv(sock, maxlen:, flags: 0)
  IOReceipt.new([:register_socket_recv, sock, maxlen, flags]).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_socket_recvmsg(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
click to toggle source
# File lib/rbgo/io_machine.rb, line 56
# Submits a :register_socket_recvmsg operation for +sock+ to the actor.
#
# The operation array is
# [:register_socket_recvmsg, sock, maxdatalen, flags, maxcontrollen, opts].
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_recvmsg(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
  request = [:register_socket_recvmsg, sock, maxdatalen, flags, maxcontrollen, opts]
  IOReceipt.new(request).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_socket_sendmsg(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
click to toggle source
# File lib/rbgo/io_machine.rb, line 63
# Submits a :register_socket_sendmsg operation for +sock+ to the actor.
#
# The operation array is
# [:register_socket_sendmsg, sock, mesg, flags, dest_sockaddr, controls].
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_sendmsg(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
  request = [:register_socket_sendmsg, sock, mesg, flags, dest_sockaddr, controls]
  IOReceipt.new(request).tap do |receipt|
    actor.send_msg(receipt)
  end
end
do_write(io, str:)
click to toggle source
# File lib/rbgo/io_machine.rb, line 14
# Submits a :register_write operation for +io+ to the actor.
#
# io  - the object to write to.
# str - required; forwarded as the third element of the operation.
#
# Returns the IOReceipt that was sent to the actor.
def do_write(io, str:)
  IOReceipt.new([:register_write, io, str]).tap do |receipt|
    actor.send_msg(receipt)
  end
end
Private Instance Methods
handle_read_line_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 410
# Installs the readable-callback that serves a :register_read_line receipt.
#
# receipt - its registered_op is [tag, io, sep, limit].
# actor   - receives :do_select once the callback is installed.
#
# The callback accumulates data into +res+ until one of:
# * +res+ ends with +sep+ (an empty +sep+ is treated as "\n\n"),
# * +limit+ bytes have been read (when +limit+ is positive),
# * the io reports end of data (nil / empty peek) or raises.
# It then deregisters the monitor and notifies the receipt. The result is
# +res+, or nil when a positive +limit+ was given and nothing was read.
# A non-positive +limit+ notifies immediately with the empty string.
#
# The byte counter lives outside the callback, so a line that arrives across
# several readiness events is still capped at +limit+ bytes in total.
def handle_read_line_msg(receipt, actor)
  op = receipt.registered_op
  io = op[1]
  sep = op[2]
  limit = op[3]
  buf_size = 512 * 1024
  res = ""
  # Counts bytes across *all* invocations of the callback; resetting it per
  # invocation would let a split read exceed +limit+.
  bytes_read_n = 0

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  monitor.value ||= []
  monitor.value[0] = proc do
    notify_blk = proc do
      monitors.delete(monitor.io)
      monitor.close
      if limit && limit > 0 && res.length == 0
        receipt.res = nil
      else
        receipt.res = res
      end
      receipt.notify
    end

    sep = "\n\n" if (sep && sep.length == 0)

    if limit.nil?
      # Non-socket ios cannot peek, so with a separator they are read one
      # byte at a time to avoid consuming data past the separator.
      buf_size = 1 unless sep.nil? || io.is_a?(BasicSocket)
      loop do
        begin
          if sep && io.is_a?(BasicSocket)
            # Peek first; only the part up to the separator is consumed below.
            buf = io.recv_nonblock(buf_size, Socket::MSG_PEEK, exception: false)
            if buf != :wait_readable && buf.size == 0
              buf = nil
            end
          else
            buf = io.read_nonblock(buf_size, exception: false)
          end
        rescue Exception => ex
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end

        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end

        if sep && io.is_a?(BasicSocket)
          # NOTE(review): a separator split between the end of +res+ and the
          # start of +buf+ is not found by this index lookup - confirm
          # whether that case matters for callers.
          sep_index = buf.index(sep)
          if sep_index
            buf = buf[0...sep_index + sep.length]
          end
          res << buf
          io.recv(buf.size) # consume exactly what was accepted from the peek
        else
          res << buf
        end

        unless sep.nil?
          if res.end_with?(sep)
            notify_blk.call
            break
          end
        end
      end
    elsif limit > 0
      loop do
        need_read_bytes_n = limit - bytes_read_n
        if need_read_bytes_n <= 0
          notify_blk.call
          break
        end

        if sep.nil? || io.is_a?(BasicSocket)
          buf_size = need_read_bytes_n
        else
          buf_size = 1
        end

        begin
          if sep && io.is_a?(BasicSocket)
            buf = io.recv_nonblock(buf_size, Socket::MSG_PEEK, exception: false)
            if buf != :wait_readable && buf.size == 0
              buf = nil
            end
          else
            buf = io.read_nonblock(buf_size, exception: false)
          end
        rescue Exception => ex
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end

        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end

        if sep && io.is_a?(BasicSocket)
          sep_index = buf.index(sep)
          if sep_index
            buf = buf[0...sep_index + sep.length]
          end
          res << buf
          io.recv(buf.size)
        else
          res << buf
        end
        bytes_read_n += buf.bytesize

        unless sep.nil?
          if res.end_with?(sep)
            notify_blk.call
            break
          end
        end
      end
    else
      notify_blk.call
    end
  end
  actor.send_msg :do_select
end
handle_read_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 534
# Installs the readable-callback that serves a :register_read receipt.
#
# receipt - its registered_op is [tag, io, len].
# actor   - receives :do_select once the callback is installed.
#
# With +len+ nil the callback reads until the io reports end of data; with a
# positive +len+ it reads until +len+ bytes have been collected in total
# (across any number of readiness events) or end of data; with a
# non-positive +len+ it completes immediately. On completion the monitor is
# deregistered and the receipt notified with the collected string, or nil
# when a positive +len+ was requested and nothing was read. An exception
# from the read also completes the receipt with what was read so far and is
# logged at debug level.
def handle_read_msg(receipt, actor)
  op = receipt.registered_op
  io = op[1]
  len = op[2]
  res = ""
  buf_size = 1024 * 512

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  notify_blk = proc do
    monitors.delete(monitor.io)
    monitor.close
    if len && len > 0 && res.length == 0
      receipt.res = nil
    else
      receipt.res = res
    end
    receipt.notify
  end

  monitor.value ||= []
  monitor.value[0] = proc do
    loop do
      chunk_size = buf_size
      if len
        # Progress is derived from +res+ itself so that it survives between
        # callback invocations; a per-invocation counter would over-read.
        chunk_size = len - res.bytesize
        if chunk_size <= 0
          notify_blk.call
          break
        end
      end

      begin
        buf = io.read_nonblock(chunk_size, exception: false)
      rescue Exception => ex
        notify_blk.call
        Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
        break
      end

      break if buf == :wait_readable # try again on the next readiness event

      if buf.nil? # end of data
        notify_blk.call
        break
      end

      res << buf
    end
  end
  actor.send_msg :do_select
end
handle_read_partial_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 372
# Installs the readable-callback that serves a :register_read_partial receipt.
#
# The callback performs a single non-blocking read of at most +maxlen+
# (third element of the operation). If the io is not ready it does nothing
# and waits for the next readiness event; otherwise (data, end of data, or
# an exception) it deregisters the monitor and notifies the receipt with
# whatever was collected.
def handle_read_partial_msg(receipt, actor)
  request = receipt.registered_op
  io = request[1]
  maxlen = request[2]
  res = ""

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  monitor.value ||= []
  monitor.value[0] = proc do
    complete = proc do
      monitors.delete(monitor.io)
      monitor.close
      receipt.res = res
      receipt.notify
    end

    begin
      chunk = io.read_nonblock(maxlen, exception: false)
    rescue Exception => ex
      complete.call
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end

    # Not ready yet: leave the monitor registered.
    next if chunk == :wait_readable

    res << chunk unless chunk.nil?
    complete.call
  end
  actor.send_msg :do_select
end
handle_select_msg(actor)
click to toggle source
end of initialize
# File lib/rbgo/io_machine.rb, line 123
# Runs one selector pass and fires the callbacks of every ready monitor.
#
# monitor.value[0] is the read callback and monitor.value[1] the write
# callback; any readiness other than :r or :w fires both, read first.
# While the selector still has registrations, another :do_select is sent to
# +actor+ so polling continues. Always returns nil.
def handle_select_msg(actor)
  return if selector.empty?

  selector.select(0.1) do |ready|
    case ready.readiness
    when :r
      ready.value[0].call
    when :w
      ready.value[1].call
    else
      ready.value[0].call
      ready.value[1].call
    end
  end

  actor.send_msg :do_select unless selector.empty?
  nil
end
handle_socket_accept_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 339
# Installs the readable-callback that serves a :register_socket_accept receipt.
#
# The callback calls +accept_nonblock+ on the socket. If no connection is
# pending it waits for the next readiness event. On success the receipt is
# notified with the value returned by +accept_nonblock+. If the call raises,
# the receipt is notified with nil (never with a stale :wait_readable from an
# earlier attempt) and the error is logged at debug level.
def handle_socket_accept_msg(receipt, actor)
  op = receipt.registered_op
  sock = op[1]
  res = nil

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  monitor.value ||= []
  monitor.value[0] = proc do
    notify_blk = proc do
      monitors.delete(monitor.io)
      monitor.close
      receipt.res = res
      receipt.notify
    end

    begin
      res = sock.accept_nonblock(exception: false)
    rescue Exception => ex
      # +res+ may still hold :wait_readable from a previous attempt.
      res = nil
      notify_blk.call
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end

    next if res == :wait_readable

    notify_blk.call
  end
  actor.send_msg :do_select
end
handle_socket_connect_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 304
# Starts a non-blocking connect for a :register_socket_connect receipt and
# installs the writable-callback that finishes it.
#
# receipt - its registered_op is [tag, sock, remote_sockaddr].
# actor   - receives :do_select after the first connect attempt.
#
# The callback is run once immediately and again on every writable event:
# * :wait_writable        -> keep waiting;
# * a return value        -> notify the receipt with that value;
# * Errno::EISCONN        -> the socket is already connected, which is how a
#                            follow-up connect reports success on some
#                            platforms; notify with 0 like a plain success;
# * any other exception   -> notify with nil and log at debug level.
def handle_socket_connect_msg(receipt, actor)
  op = receipt.registered_op
  sock = op[1]
  remote_sockaddr = op[2]
  res = nil

  monitor = register(receipt, interest: :w)
  return if monitor.nil?

  monitor.value ||= []
  monitor.value[1] = proc do
    notify_blk = proc do
      monitors.delete(monitor.io)
      monitor.close
      receipt.res = res
      receipt.notify
    end

    begin
      res = sock.connect_nonblock(remote_sockaddr, exception: false)
    rescue Errno::EISCONN
      # Connection was established by the earlier attempt.
      res = 0
    rescue Exception => ex
      # Do not report a stale :wait_writable from a previous attempt.
      res = nil
      notify_blk.call
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end

    next if res == :wait_writable

    notify_blk.call
  end
  monitor.value[1].call
  actor.send_msg :do_select
end
handle_socket_recv_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 140
# Installs the readable-callback that serves a :register_socket_recv receipt.
#
# The callback performs one +recv_nonblock(maxlen, flags)+. If the socket is
# not ready it waits for the next readiness event; otherwise (data, nil, or
# an exception) it deregisters the monitor and notifies the receipt with the
# collected string.
def handle_socket_recv_msg(receipt, actor)
  request = receipt.registered_op
  io = request[1]
  maxlen = request[2]
  flags = request[3]
  res = ""

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  monitor.value ||= []
  monitor.value[0] = proc do
    complete = proc do
      monitors.delete(monitor.io)
      monitor.close
      receipt.res = res
      receipt.notify
    end

    begin
      chunk = io.recv_nonblock(maxlen, flags, exception: false)
    rescue Exception => ex
      complete.call
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end

    # Not ready yet: leave the monitor registered.
    next if chunk == :wait_readable

    res << chunk unless chunk.nil?
    complete.call
  end
  actor.send_msg :do_select
end
handle_socket_recvmsg_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 179
# Installs the readable-callback that serves a :register_socket_recvmsg receipt.
#
# receipt - its registered_op is
#           [tag, sock, len, flags, maxcontrollen, opts].
# actor   - receives :do_select once the callback is installed.
#
# The callback repeatedly calls +recvmsg_nonblock+ and accumulates the
# payload. It completes when
# * +len+ bytes have been collected in total (non-nil +len+; a non-positive
#   +len+ completes immediately),
# * a message with no payload is received (nil or empty data, which is how
#   end of stream shows up), or
# * the call raises (logged at debug level).
# On completion the monitor is deregistered and the receipt notified with
# [data, sender_addrinfo, rflags, *controls], or nil when a positive +len+
# was requested and no data was read.
#
# +opts+ is merged with exception: false and passed as keyword arguments.
def handle_socket_recvmsg_msg(receipt, actor)
  op = receipt.registered_op
  sock = op[1]
  len = op[2]
  flags = op[3]
  maxcontrollen = op[4]
  opts = op[5]
  data = ""
  sender_addrinfo = nil
  rflags = 0
  controls = []

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  notify_blk = proc do
    monitors.delete(monitor.io)
    monitor.close
    if len && len > 0 && data.length == 0
      receipt.res = nil
    else
      receipt.res = [data, sender_addrinfo, rflags, *controls]
    end
    receipt.notify
  end

  monitor.value ||= []
  monitor.value[0] = proc do
    loop do
      # Progress is derived from +data+ so it survives between invocations.
      remaining = len && (len - data.bytesize)
      if remaining && remaining <= 0
        notify_blk.call
        break
      end

      begin
        msg = sock.recvmsg_nonblock(remaining, flags, maxcontrollen, **opts.merge(exception: false))
      rescue Exception => ex
        notify_blk.call
        Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
        break
      end

      break if msg == :wait_readable # try again on the next readiness event

      unless msg.nil?
        data << msg[0].to_s
        sender_addrinfo = msg[1]
        rflags = msg[2]
        controls.append(*msg[3..-1])
      end

      # No payload means nothing more will arrive; without this check the
      # loop would spin forever on a closed stream.
      if msg.nil? || msg[0].to_s.empty?
        notify_blk.call
        break
      end
    end
  end
  actor.send_msg :do_select
end
handle_socket_sendmsg_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 261
# Sends a message for a :register_socket_sendmsg receipt, installing a
# writable-callback that continues whenever the socket would block.
#
# receipt - its registered_op is
#           [tag, sock, mesg, flags, dest_sockaddr, controls].
# actor   - receives :do_select after the first attempt.
#
# The payload is +mesg.to_s+. All progress is tracked in bytes, because
# +sendmsg_nonblock+ reports bytes sent; slicing by characters would drop or
# repeat data for multibyte strings. When every byte has been sent the
# receipt is notified with the payload's byte size. If sending raises, the
# receipt is notified with the number of bytes sent so far and the error is
# logged at debug level.
def handle_socket_sendmsg_msg(receipt, actor)
  op = receipt.registered_op
  sock = op[1]
  str = op[2].to_s
  flags = op[3]
  dest_sockaddr = op[4]
  controls = op[5]

  monitor = register(receipt, interest: :w)
  return if monitor.nil?

  total_bytes = str.bytesize
  bytes_written_n = 0

  finish = proc do |result|
    monitors.delete(monitor.io)
    monitor.close
    receipt.res = result
    receipt.notify
  end

  monitor.value ||= []
  monitor.value[1] = proc do
    loop do
      begin
        bytes_need_to_write_n = total_bytes - bytes_written_n
        if bytes_need_to_write_n > 0
          pending = str.byteslice(bytes_written_n, bytes_need_to_write_n)
          n = sock.sendmsg_nonblock(pending, flags, dest_sockaddr, *controls, exception: false)
        else
          finish.call(total_bytes)
          break
        end
      rescue Exception => ex
        finish.call(bytes_written_n)
        Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
        break
      end

      break if n == :wait_writable # resume on the next writable event

      bytes_written_n += n
    end
  end
  monitor.value[1].call
  actor.send_msg :do_select
end
handle_write_msg(receipt, actor)
click to toggle source
# File lib/rbgo/io_machine.rb, line 604
# Writes a string for a :register_write receipt, installing a
# writable-callback that performs one non-blocking write per invocation.
#
# receipt - its registered_op is [tag, io, str]; the payload is +str.to_s+.
# actor   - receives :do_select after the first attempt.
#
# Each invocation either writes the next pending slice or, once nothing is
# pending, deregisters the monitor and notifies the receipt with the
# payload's byte size. Progress is tracked in bytes, because
# +write_nonblock+ reports bytes written; slicing by characters would drop
# or repeat data for multibyte strings. If writing raises, the receipt is
# notified with the number of bytes written so far and the error is logged
# at debug level.
def handle_write_msg(receipt, actor)
  op = receipt.registered_op
  io = op[1]
  buf = op[2].to_s

  monitor = register(receipt, interest: :w)
  return if monitor.nil?

  total_bytes = buf.bytesize
  bytes_written_n = 0

  finish = proc do |result|
    monitors.delete(monitor.io)
    monitor.close
    receipt.res = result
    receipt.notify
  end

  monitor.value ||= []
  monitor.value[1] = proc do
    begin
      pending_n = total_bytes - bytes_written_n
      if pending_n > 0
        n = io.write_nonblock(buf.byteslice(bytes_written_n, pending_n), exception: false)
        # A non-numeric result (:wait_writable) leaves the position unchanged.
        bytes_written_n += n if n.is_a?(Numeric)
      else
        finish.call(total_bytes)
      end
    rescue Exception => ex
      finish.call(bytes_written_n)
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
    end
  end
  monitor.value[1].call
  actor.send_msg :do_select
end
register(receipt, interest:)
click to toggle source
# File lib/rbgo/io_machine.rb, line 639
# Obtains the selector monitor through which +receipt+'s io will be served.
#
# receipt  - its registered_op[1] is the io to watch.
# interest - the readiness interest to register (passed to the selector).
#
# Returns the monitor to attach a callback to, or nil when nothing should be
# attached right now:
# * the io is already closed: any existing monitor is dropped and closed and
#   the receipt is notified with nil;
# * a monitor for the io already covers +interest+ (same interest, or :rw):
#   the receipt is sent back to the actor to be handled again later.
def register(receipt, interest:)
  io = receipt.registered_op[1]
  existing = monitors[io]

  if io.closed?
    monitors.delete(io)
    existing&.close
    receipt.res = nil
    receipt.notify
    return nil
  end

  # First operation on this io: create and remember a fresh monitor.
  unless existing
    fresh = selector.register(io, interest)
    monitors[io] = fresh
    return fresh
  end

  # Another operation with this interest is in flight; retry later.
  current = existing.interests
  if current == interest || current == :rw
    actor.send_msg receipt
    return nil
  end

  # Same io, other direction: widen the existing monitor.
  existing.add_interest(interest)
  existing
end