class Rbgo::IOMachine

Attributes

actor[RW]
monitors[RW]
selector[RW]

Public Class Methods

new() click to toggle source
# File lib/rbgo/io_machine.rb, line 83
# Sets up the NIO selector, an empty io => monitor table, and the actor
# whose block routes every incoming message to the matching handler.
def initialize
  self.selector = NIO::Selector.new
  self.monitors = {}

  # Tag found in registered_op[0] => private handler method to invoke.
  handlers = {
    register_read:           :handle_read_msg,
    register_read_line:      :handle_read_line_msg,
    register_read_partial:   :handle_read_partial_msg,
    register_write:          :handle_write_msg,
    register_socket_accept:  :handle_socket_accept_msg,
    register_socket_connect: :handle_socket_connect_msg,
    register_socket_recv:    :handle_socket_recv_msg,
    register_socket_sendmsg: :handle_socket_sendmsg_msg,
    register_socket_recvmsg: :handle_socket_recvmsg_msg
  }

  self.actor = Actor.new do |msg, actor|
    # :do_select is the internal "poll the selector" tick.
    if msg == :do_select
      handle_select_msg(actor)
      next
    end

    # Anything else is a receipt; unknown tags are silently ignored.
    handler = handlers[msg.registered_op[0]]
    __send__(handler, msg, actor) if handler
  end
end

Public Instance Methods

close() click to toggle source
# File lib/rbgo/io_machine.rb, line 70
# Closes the actor and then the selector.
#
# The selector is closed in an `ensure` clause so that it is released even
# when closing the actor raises; the original exception still propagates.
def close
  actor.close
ensure
  selector.close
end
closed?() click to toggle source
# File lib/rbgo/io_machine.rb, line 75
# Reports whether both the actor and the selector are closed.
# The selector is only consulted when the actor reports closed.
def closed?
  actor_state = actor.closed?
  return actor_state unless actor_state

  selector.closed?
end
do_read(io, length: nil) click to toggle source
# File lib/rbgo/io_machine.rb, line 7
# Submits a read request for +io+ to the actor.
#
# io     - object to read from
# length - value stored as the third element of the op (nil by default)
#
# Returns the IOReceipt that was sent to the actor.
def do_read(io, length: nil)
  request = [:register_read, io, length]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_read_line(io, sep: $/, limit: nil) click to toggle source
# File lib/rbgo/io_machine.rb, line 21
# Submits a line-read request for +io+ to the actor.
#
# io    - object to read from
# sep   - separator stored in the op (defaults to $/)
# limit - limit stored in the op (nil by default)
#
# Returns the IOReceipt that was sent to the actor.
def do_read_line(io, sep: $/, limit: nil)
  request = [:register_read_line, io, sep, limit]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_read_partial(io, maxlen:) click to toggle source
# File lib/rbgo/io_machine.rb, line 28
# Submits a partial-read request for +io+ to the actor.
#
# io     - object to read from
# maxlen - value stored as the third element of the op (required)
#
# Returns the IOReceipt that was sent to the actor.
def do_read_partial(io, maxlen:)
  request = [:register_read_partial, io, maxlen]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_socket_accept(sock) click to toggle source
# File lib/rbgo/io_machine.rb, line 35
# Submits an accept request for +sock+ to the actor.
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_accept(sock)
  request = [:register_socket_accept, sock]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_socket_connect(sock, remote_sockaddr:) click to toggle source
# File lib/rbgo/io_machine.rb, line 42
# Submits a connect request for +sock+ to the actor.
#
# sock            - socket to connect
# remote_sockaddr - address stored as the third element of the op (required)
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_connect(sock, remote_sockaddr:)
  request = [:register_socket_connect, sock, remote_sockaddr]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_socket_recv(sock, maxlen:, flags: 0) click to toggle source
# File lib/rbgo/io_machine.rb, line 49
# Submits a recv request for +sock+ to the actor.
#
# sock   - socket to receive from
# maxlen - value stored as the third element of the op (required)
# flags  - value stored as the fourth element of the op (0 by default)
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_recv(sock, maxlen:, flags: 0)
  request = [:register_socket_recv, sock, maxlen, flags]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_socket_recvmsg(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {}) click to toggle source
# File lib/rbgo/io_machine.rb, line 56
# Submits a recvmsg request for +sock+ to the actor.
#
# The keyword arguments are stored, in signature order, after the socket
# in the op array carried by the receipt.
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_recvmsg(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
  request = [:register_socket_recvmsg, sock, maxdatalen, flags, maxcontrollen, opts]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_socket_sendmsg(sock, mesg, flags: 0, dest_sockaddr: nil, controls: []) click to toggle source
# File lib/rbgo/io_machine.rb, line 63
# Submits a sendmsg request for +sock+ to the actor.
#
# The message and the keyword arguments are stored, in signature order,
# after the socket in the op array carried by the receipt.
#
# Returns the IOReceipt that was sent to the actor.
def do_socket_sendmsg(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
  request = [:register_socket_sendmsg, sock, mesg, flags, dest_sockaddr, controls]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end
do_write(io, str:) click to toggle source
# File lib/rbgo/io_machine.rb, line 14
# Submits a write request for +io+ to the actor.
#
# io  - object to write to
# str - payload stored as the third element of the op (required)
#
# Returns the IOReceipt that was sent to the actor.
def do_write(io, str:)
  request = [:register_write, io, str]
  IOReceipt.new(request).tap { |ticket| actor.send_msg(ticket) }
end

Private Instance Methods

handle_read_line_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 410
# Handles a :register_read_line receipt: registers the io for read
# readiness and installs a callback that accumulates data until the
# separator is seen, the limit is reached, EOF is hit, or an error occurs.
#
# op layout: [tag, io, sep, limit]
#   sep   - nil reads without looking for a separator; "" is treated as "\n\n"
#   limit - nil means unlimited; 0 (or negative) completes on the first
#           readiness event without reading; positive caps the bytes read
#
# The receipt's result is the accumulated string, or nil when a positive
# limit was given and nothing was read.
def handle_read_line_msg(receipt, actor)
  op       = receipt.registered_op
  io       = op[1]
  sep      = op[2]
  limit    = op[3]
  buf_size = 512 * 1024
  res      = ""
  # Lives outside the callback so that the limit is honoured across several
  # readiness events; resetting it per callback let a read that was split
  # over multiple wakeups return more than +limit+ bytes.
  bytes_read_n = 0

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  monitor.value    ||= []
  monitor.value[0] = proc do
    # Tears down the monitor and publishes the result to the waiter.
    notify_blk = proc do
      monitors.delete(monitor.io)
      monitor.close
      if limit && limit > 0 && res.length == 0
        receipt.res = nil
      else
        receipt.res = res
      end
      receipt.notify
    end

    sep        = "\n\n" if (sep && sep.length == 0)

    if limit.nil?
      # Non-socket io with a separator is read one byte at a time so that
      # nothing past the separator is consumed.
      buf_size = 1 unless sep.nil? || io.is_a?(BasicSocket)
      loop do
        begin
          if sep && io.is_a?(BasicSocket)
            # Peek first; only the part up to the separator is consumed below.
            buf = io.recv_nonblock(buf_size, Socket::MSG_PEEK, exception: false)
            if buf != :wait_readable && buf.size == 0
              buf = nil
            end
          else
            buf = io.read_nonblock(buf_size, exception: false)
          end
        rescue Exception => ex
          # Always notify, otherwise the waiter would never be woken up.
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end
        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end
        if sep && io.is_a?(BasicSocket)
          sep_index = buf.index(sep)
          if sep_index
            buf = buf[0...sep_index + sep.length]
          end
          res << buf
          # Consume exactly what was taken from the peeked data.
          io.recv(buf.size)
        else
          res << buf
        end
        unless sep.nil?
          if res.end_with?(sep)
            notify_blk.call
            break
          end
        end
      end
    elsif limit > 0
      loop do
        need_read_bytes_n = limit - bytes_read_n
        if need_read_bytes_n <= 0
          notify_blk.call
          break
        end
        if sep.nil? || io.is_a?(BasicSocket)
          buf_size = need_read_bytes_n
        else
          buf_size = 1
        end
        begin
          if sep && io.is_a?(BasicSocket)
            buf = io.recv_nonblock(buf_size, Socket::MSG_PEEK, exception: false)
            if buf != :wait_readable && buf.size == 0
              buf = nil
            end
          else
            buf = io.read_nonblock(buf_size, exception: false)
          end
        rescue Exception => ex
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end
        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end
        if sep && io.is_a?(BasicSocket)
          sep_index = buf.index(sep)
          if sep_index
            buf = buf[0...sep_index + sep.length]
          end
          res << buf
          io.recv(buf.size)
        else
          res << buf
        end
        bytes_read_n += buf.bytesize
        unless sep.nil?
          if res.end_with?(sep)
            notify_blk.call
            break
          end
        end
      end
    else
      notify_blk.call
    end
  end
  actor.send_msg :do_select
end
handle_read_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 534
# Handles a :register_read receipt: registers the io for read readiness and
# installs a callback that reads until EOF (len nil) or until +len+ bytes
# have been collected.
#
# op layout: [tag, io, len]
#
# The receipt's result is the accumulated string, or nil when a positive
# len was requested and nothing could be read.
def handle_read_msg(receipt, actor)
  op       = receipt.registered_op
  io       = op[1]
  len      = op[2]
  res      = ""
  buf_size = 1024 * 512
  # Kept outside the callback so the requested length is honoured across
  # several readiness events (a per-callback counter allowed over-reading).
  bytes_read_n = 0

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  # Tears down the monitor and publishes the result to the waiter.
  notify_blk       = proc do
    monitors.delete(monitor.io)
    monitor.close
    if len && len > 0 && res.length == 0
      receipt.res = nil
    else
      receipt.res = res
    end
    receipt.notify
  end
  monitor.value    ||= []
  monitor.value[0] = proc do
    if len.nil?
      # Unbounded read: drain until EOF or until the io would block.
      loop do
        begin
          buf = io.read_nonblock(buf_size, exception: false)
        rescue Exception => ex
          # Always notify, otherwise the waiter would never be woken up.
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end
        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end
        res << buf
      end
    elsif len == 0
      # No `break` here: breaking out of a proc outside of a loop raises
      # LocalJumpError.
      notify_blk.call
    else
      loop do
        need_read_bytes_n = len - bytes_read_n
        if need_read_bytes_n <= 0
          notify_blk.call
          break
        end
        begin
          buf = io.read_nonblock(need_read_bytes_n, exception: false)
        rescue Exception => ex
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end
        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end
        res << buf
        bytes_read_n += buf.bytesize
      end
    end
  end
  actor.send_msg :do_select
end
handle_read_partial_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 372
# Handles a :register_read_partial receipt: registers the io for read
# readiness and installs a callback that performs a single non-blocking
# read of at most maxlen bytes.
#
# op layout: [tag, io, maxlen]
#
# The receipt's result is the data read, or "" on EOF or error.
def handle_read_partial_msg(receipt, actor)
  _tag, io, maxlen = receipt.registered_op
  res              = ""

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  monitor.value ||= []
  monitor.value[0] = proc do
    # Tears down the monitor and publishes whatever has been collected.
    finish = proc do
      monitors.delete(monitor.io)
      monitor.close
      receipt.res = res
      receipt.notify
    end

    begin
      chunk = io.read_nonblock(maxlen, exception: false)
    rescue Exception => ex
      # Notify first so the waiter is released even if logging fails.
      finish.call
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end

    # Nothing available yet: stay registered and wait for the next event.
    next if chunk == :wait_readable

    res << chunk unless chunk.nil?
    finish.call
  end
  actor.send_msg :do_select
end
handle_select_msg(actor) click to toggle source

end of initialize

# File lib/rbgo/io_machine.rb, line 123
# Polls the selector once (0.1s timeout) and runs the callbacks stored on
# each ready monitor: value[0] for read readiness, value[1] for write
# readiness, both (read first) for any other readiness value.
#
# Re-queues :do_select on the actor while the selector still has
# registrations. Always returns nil.
def handle_select_msg(actor)
  return if selector.empty?

  selector.select(0.1) do |ready|
    case ready.readiness
    when :r
      ready.value[0].call
    when :w
      ready.value[1].call
    else
      ready.value[0].call
      ready.value[1].call
    end
  end

  actor.send_msg :do_select unless selector.empty?
  nil
end
handle_socket_accept_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 339
# Handles a :register_socket_accept receipt: registers the socket for read
# readiness and installs a callback that performs a non-blocking accept.
#
# op layout: [tag, sock]
#
# The receipt's result is whatever accept_nonblock returned, or nil when
# the accept raised. The outcome of each attempt is kept in a local so a
# :wait_readable from an earlier attempt can never leak into the result
# when a later attempt fails.
def handle_socket_accept_msg(receipt, actor)
  op   = receipt.registered_op
  sock = op[1]

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  # Tears down the monitor and publishes +result+ to the waiter.
  notify_blk = proc do |result|
    monitors.delete(monitor.io)
    monitor.close
    receipt.res = result
    receipt.notify
  end

  monitor.value    ||= []
  monitor.value[0] = proc do
    begin
      accepted = sock.accept_nonblock(exception: false)
    rescue Exception => ex
      # Always notify, otherwise the waiter would never be woken up.
      notify_blk.call(nil)
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end
    # :wait_readable means "try again on the next readiness event".
    notify_blk.call(accepted) unless accepted == :wait_readable
  end
  actor.send_msg :do_select
end
handle_socket_connect_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 304
# Handles a :register_socket_connect receipt: registers the socket for
# write readiness, installs a callback that performs a non-blocking
# connect, and runs that callback once immediately.
#
# op layout: [tag, sock, remote_sockaddr]
#
# The receipt's result is the value returned by connect_nonblock (0 on
# success), 0 when a retry reports Errno::EISCONN (the connection has
# already been established), or nil when the connect raised anything else.
# The outcome of each attempt is kept in a local so a :wait_writable from
# an earlier attempt can never leak into the result.
def handle_socket_connect_msg(receipt, actor)
  op              = receipt.registered_op
  sock            = op[1]
  remote_sockaddr = op[2]

  monitor = register(receipt, interest: :w)
  return if monitor.nil?

  # Tears down the monitor and publishes +result+ to the waiter.
  notify_blk = proc do |result|
    monitors.delete(monitor.io)
    monitor.close
    receipt.res = result
    receipt.notify
  end

  monitor.value    ||= []
  monitor.value[1] = proc do
    begin
      outcome = sock.connect_nonblock(remote_sockaddr, exception: false)
    rescue Errno::EISCONN
      # Retrying connect on an already-connected socket means success.
      outcome = 0
    rescue Exception => ex
      # Always notify, otherwise the waiter would never be woken up.
      notify_blk.call(nil)
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end
    # :wait_writable means "try again on the next readiness event".
    notify_blk.call(outcome) unless outcome == :wait_writable
  end
  # Kick off the connect right away; completion is picked up via select.
  monitor.value[1].call
  actor.send_msg :do_select
end
handle_socket_recv_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 140
# Handles a :register_socket_recv receipt: registers the socket for read
# readiness and installs a callback that performs a single non-blocking
# recv of at most maxlen bytes with the given flags.
#
# op layout: [tag, io, maxlen, flags]
#
# The receipt's result is the data received, or "" when recv returned nil
# or raised.
def handle_socket_recv_msg(receipt, actor)
  _tag, io, maxlen, flags = receipt.registered_op
  res                     = ""

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  monitor.value ||= []
  monitor.value[0] = proc do
    # Tears down the monitor and publishes whatever has been collected.
    finish = proc do
      monitors.delete(monitor.io)
      monitor.close
      receipt.res = res
      receipt.notify
    end

    begin
      chunk = io.recv_nonblock(maxlen, flags, exception: false)
    rescue Exception => ex
      # Notify first so the waiter is released even if logging fails.
      finish.call
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
      next
    end

    # Nothing available yet: stay registered and wait for the next event.
    next if chunk == :wait_readable

    res << chunk unless chunk.nil?
    finish.call
  end
  actor.send_msg :do_select
end
handle_socket_recvmsg_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 179
# Handles a :register_socket_recvmsg receipt: registers the socket for read
# readiness and installs a callback that collects data via
# recvmsg_nonblock, either until it would block / returns nil (len nil) or
# until +len+ bytes of data have been collected.
#
# op layout: [tag, sock, len, flags, maxcontrollen, opts]
#
# The receipt's result is [data, sender_addrinfo, rflags, *controls], or
# nil when a positive len was requested and no data was received.
#
# NOTE(review): a receive that keeps returning empty data (e.g. a stream
# socket at EOF) never satisfies `buf.nil?` and does not advance the byte
# count, so the loops below would not terminate on their own — confirm how
# callers are expected to deal with that.
def handle_socket_recvmsg_msg(receipt, actor)
  op              = receipt.registered_op
  sock            = op[1]
  len             = op[2]
  flags           = op[3]
  maxcontrollen   = op[4]
  opts            = op[5]

  data            = ""
  sender_addrinfo = nil
  rflags          = 0
  controls        = []
  # Kept outside the callback so the requested length is honoured across
  # several readiness events (a per-callback counter allowed over-reading).
  bytes_read_n    = 0

  monitor = register(receipt, interest: :r)
  return if monitor.nil?

  # Tears down the monitor and publishes the result to the waiter.
  notify_blk       = proc do
    monitors.delete(monitor.io)
    monitor.close
    if len && len > 0 && data.length == 0
      receipt.res = nil
    else
      receipt.res = [data, sender_addrinfo, rflags, *controls]
    end
    receipt.notify
  end
  monitor.value    ||= []
  monitor.value[0] = proc do
    if len.nil?
      loop do
        begin
          # The options are splatted as keywords: recvmsg_nonblock takes
          # scm_rights:/exception: as keyword arguments, and a positional
          # hash is rejected on Ruby 3.
          buf = sock.recvmsg_nonblock(nil, flags, maxcontrollen, **opts.merge(exception: false))
        rescue Exception => ex
          # Always notify, otherwise the waiter would never be woken up.
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end
        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end
        data << buf[0]
        sender_addrinfo = buf[1]
        rflags          = buf[2]
        controls.append(*buf[3..-1])
      end
    elsif len == 0
      # No `break` here: breaking out of a proc outside of a loop raises
      # LocalJumpError.
      notify_blk.call
    else
      loop do
        need_read_bytes_n = len - bytes_read_n
        if need_read_bytes_n <= 0
          notify_blk.call
          break
        end
        begin
          buf = sock.recvmsg_nonblock(need_read_bytes_n, flags, maxcontrollen, **opts.merge(exception: false))
        rescue Exception => ex
          notify_blk.call
          Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
          break
        end
        if buf == :wait_readable
          break
        elsif buf.nil?
          notify_blk.call
          break
        end
        data << buf[0]
        sender_addrinfo = buf[1]
        rflags          = buf[2]
        controls.append(*buf[3..-1])
        bytes_read_n += buf[0].bytesize
      end
    end
  end
  actor.send_msg :do_select
end
handle_socket_sendmsg_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 261
# Handles a :register_socket_sendmsg receipt: registers the socket for
# write readiness, installs a callback that keeps calling sendmsg_nonblock
# until the whole payload has been handed over or the socket would block,
# and runs that callback once immediately.
#
# op layout: [tag, sock, mesg, flags, dest_sockaddr, controls]
#
# The receipt's result is the payload's byte size on success, or the number
# of bytes sent so far when sending raised.
#
# All progress tracking is done in bytes (bytesize/byteslice): the value
# returned by sendmsg_nonblock is a byte count, so mixing it with
# character-based size/[] truncated or corrupted multibyte payloads.
def handle_socket_sendmsg_msg(receipt, actor)
  op            = receipt.registered_op
  sock          = op[1]
  str           = op[2].to_s
  flags         = op[3]
  dest_sockaddr = op[4]
  controls      = op[5]
  monitor       = register(receipt, interest: :w)
  return if monitor.nil?

  bytes_written_n  = 0
  monitor.value    ||= []
  monitor.value[1] = proc do
    loop do
      begin
        bytes_need_to_write_n = str.bytesize - bytes_written_n
        if bytes_need_to_write_n > 0
          n = sock.sendmsg_nonblock(str.byteslice(bytes_written_n..-1), flags, dest_sockaddr, *controls, exception: false)
        else
          monitors.delete(monitor.io)
          monitor.close
          receipt.res = str.bytesize
          receipt.notify
          break
        end
      rescue Exception => ex
        # Always notify, otherwise the waiter would never be woken up.
        monitors.delete(monitor.io)
        monitor.close
        receipt.res = bytes_written_n
        receipt.notify
        Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
        break
      end
      # Socket buffer is full: resume on the next write-readiness event.
      if n == :wait_writable
        break
      end
      bytes_written_n += n
    end
  end
  monitor.value[1].call
  actor.send_msg :do_select
end
handle_write_msg(receipt, actor) click to toggle source
# File lib/rbgo/io_machine.rb, line 604
# Handles a :register_write receipt: registers the io for write readiness,
# installs a callback that writes one chunk per invocation via
# write_nonblock, and runs that callback once immediately. Once nothing is
# left to write, the next invocation completes the receipt.
#
# op layout: [tag, io, str]
#
# The receipt's result is the payload's byte size on success, or the number
# of bytes written so far when writing raised.
#
# All progress tracking is done in bytes (bytesize/byteslice): the value
# returned by write_nonblock is a byte count, so mixing it with
# character-based size/[] truncated or corrupted multibyte payloads.
def handle_write_msg(receipt, actor)
  op  = receipt.registered_op
  io  = op[1]
  buf = op[2].to_s

  monitor = register(receipt, interest: :w)
  return if monitor.nil?

  bytes_written_n  = 0
  monitor.value    ||= []
  monitor.value[1] = proc do
    begin
      if buf.bytesize - bytes_written_n > 0
        n               = io.write_nonblock(buf.byteslice(bytes_written_n..-1), exception: false)
        # A non-numeric result (:wait_writable) means nothing was written.
        if n.is_a? Numeric
          bytes_written_n += n
        end
      else
        monitors.delete(monitor.io)
        monitor.close
        receipt.res = buf.bytesize
        receipt.notify
      end
    rescue Exception => ex
      # Always notify, otherwise the waiter would never be woken up.
      monitors.delete(monitor.io)
      monitor.close
      receipt.res = bytes_written_n
      receipt.notify
      Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
    end
  end
  monitor.value[1].call
  actor.send_msg :do_select
end
register(receipt, interest:) click to toggle source
# File lib/rbgo/io_machine.rb, line 639
# Obtains a selector monitor for the io carried in receipt.registered_op[1].
#
# Returns the monitor to attach a callback to, or nil when nothing should
# be attached right now:
# * the io is closed   - any existing monitor is dropped and the receipt is
#                        completed with a nil result;
# * the io already has a monitor covering this interest (same interest or
#   :rw) - the receipt is sent back to the actor to be handled again later.
#
# An existing monitor with a different interest gets the new interest
# added; an unknown io is registered with the selector and remembered in
# +monitors+.
def register(receipt, interest:)
  io       = receipt.registered_op[1]
  existing = monitors[io]

  if io.closed?
    monitors.delete(io)
    existing.close if existing
    receipt.res = nil
    receipt.notify
    return nil
  end

  # First registration for this io.
  unless existing
    fresh        = selector.register(io, interest)
    monitors[io] = fresh
    return fresh
  end

  # The slot for this interest is taken: retry once it has been released.
  if existing.interests == interest || existing.interests == :rw
    actor.send_msg receipt
    return nil
  end

  existing.add_interest(interest)
  existing
end