module Rbgo::CoRun
Constants
- IS_CORUN_FIBER
- LOCAL_TASK_QUEUES
- YIELD_IO_OPERATION
Public Class Methods
accept_from(sock)
click to toggle source
# File lib/rbgo/corun.rb, line 72
# Accepts an incoming connection on +sock+.
#
# Outside a CoRun fiber this is a plain call to sock.accept. Inside a CoRun
# fiber the accept is registered with the scheduler's IO machine and the
# fiber yields [YIELD_IO_OPERATION, receipt]; the method then returns
# whatever value the fiber is resumed with (presumably the accepted socket —
# confirm against the scheduler).
def self.accept_from(sock)
  return sock.accept unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_socket_accept(sock)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
connect_to(sock, remote_sockaddr:)
click to toggle source
# File lib/rbgo/corun.rb, line 81
# Connects +sock+ to +remote_sockaddr+.
#
# Outside a CoRun fiber this calls sock.connect(remote_sockaddr) directly.
# Inside a CoRun fiber the connect is registered with the scheduler's IO
# machine and the fiber yields [YIELD_IO_OPERATION, receipt]; the return
# value is whatever the fiber is resumed with.
def self.connect_to(sock, remote_sockaddr:)
  return sock.connect(remote_sockaddr) unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_socket_connect(sock, remote_sockaddr: remote_sockaddr)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
have_other_task_on_thread?()
click to toggle source
# File lib/rbgo/corun.rb, line 20
# Reports whether any task queue stored in the current thread's
# LOCAL_TASK_QUEUES thread variable is non-empty.
#
# Always returns true or false. When the thread variable is unset the result
# is false (previously nil leaked out of the safe-navigation call), which
# keeps this predicate consistent with is_in_corun_fiber?.
def self.have_other_task_on_thread?
  queues = Thread.current.thread_variable_get(LOCAL_TASK_QUEUES)
  return false if queues.nil?

  queues.any? { |q| !q.empty? }
end
is_in_corun_fiber?()
click to toggle source
# File lib/rbgo/corun.rb, line 16
# Returns true when the IS_CORUN_FIBER key is set to a truthy value in the
# current fiber-local storage (Thread#[] is fiber-local), false otherwise.
def self.is_in_corun_fiber?
  Thread.current[IS_CORUN_FIBER] ? true : false
end
read_from(io, length: nil)
click to toggle source
# File lib/rbgo/corun.rb, line 42
# Reads from +io+; +length+ is passed through to the underlying read
# (nil is the default).
#
# Outside a CoRun fiber this is io.read(length). Inside a CoRun fiber a
# zero-length request short-circuits to an empty string; otherwise the read
# is registered with the scheduler's IO machine and the fiber yields
# [YIELD_IO_OPERATION, receipt], returning whatever it is resumed with.
def self.read_from(io, length: nil)
  return io.read(length) unless is_in_corun_fiber?
  return "" if length == 0

  pending = Scheduler.instance.io_machine.do_read(io, length: length)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
read_line_from(io, sep: $/, limit: nil)
click to toggle source
# File lib/rbgo/corun.rb, line 52
# Reads one line from +io+ using separator +sep+ (defaults to $/) and an
# optional +limit+.
#
# Outside a CoRun fiber this is io.readline(sep, limit). Inside a CoRun
# fiber a zero limit short-circuits to an empty string; otherwise the
# operation is registered with the scheduler's IO machine and the fiber
# yields [YIELD_IO_OPERATION, receipt], returning whatever it is resumed with.
def self.read_line_from(io, sep: $/, limit: nil)
  return io.readline(sep, limit) unless is_in_corun_fiber?
  return "" if limit == 0

  pending = Scheduler.instance.io_machine.do_read_line(io, sep: sep, limit: limit)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
read_partial_from(io, maxlen:)
click to toggle source
# File lib/rbgo/corun.rb, line 62
# Reads from +io+ with +maxlen+ as the size argument.
#
# Outside a CoRun fiber this is io.readpartial(maxlen). Inside a CoRun fiber
# a zero +maxlen+ short-circuits to an empty string; otherwise the operation
# is registered with the scheduler's IO machine and the fiber yields
# [YIELD_IO_OPERATION, receipt], returning whatever it is resumed with.
def self.read_partial_from(io, maxlen:)
  return io.readpartial(maxlen) unless is_in_corun_fiber?
  return "" if maxlen == 0

  pending = Scheduler.instance.io_machine.do_read_partial(io, maxlen: maxlen)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
recv_from(sock, maxlen:, flags: 0)
click to toggle source
# File lib/rbgo/corun.rb, line 90
# Receives from +sock+ with the given +maxlen+ and +flags+.
#
# Outside a CoRun fiber this is sock.recv(maxlen, flags). Inside a CoRun
# fiber the receive is registered with the scheduler's IO machine and the
# fiber yields [YIELD_IO_OPERATION, receipt], returning whatever it is
# resumed with.
def self.recv_from(sock, maxlen:, flags: 0)
  return sock.recv(maxlen, flags) unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_socket_recv(sock, maxlen: maxlen, flags: flags)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
click to toggle source
# File lib/rbgo/corun.rb, line 99
# Receives a message from +sock+ via recvmsg.
#
# +maxdatalen+, +flags+ and +maxcontrollen+ are passed through positionally;
# +opts+ is a Hash of recvmsg keyword options.
#
# Inside a CoRun fiber the operation is registered with the scheduler's IO
# machine and the fiber yields [YIELD_IO_OPERATION, receipt], returning
# whatever it is resumed with. Outside a CoRun fiber the socket's recvmsg is
# called directly.
def self.recvmsg_from(sock, maxdatalen: nil, flags: 0, maxcontrollen: nil, opts: {})
  if is_in_corun_fiber?
    receipt = Scheduler.instance.io_machine.do_socket_recvmsg(sock, maxdatalen: maxdatalen, flags: flags, maxcontrollen: maxcontrollen, opts: opts)
    Fiber.yield [YIELD_IO_OPERATION, receipt]
  else
    # BasicSocket#recvmsg takes three positionals plus keyword options. A
    # Hash passed as a fourth positional is no longer converted to keywords
    # on Ruby 3 and raises ArgumentError, so splat it as keywords instead.
    sock.recvmsg(maxdatalen, flags, maxcontrollen, **opts)
  end
end
sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
click to toggle source
# File lib/rbgo/corun.rb, line 108
# Sends +mesg+ on +sock+ via sendmsg.
#
# Outside a CoRun fiber this is
# sock.sendmsg(mesg, flags, dest_sockaddr, *controls), with +controls+
# splatted into trailing arguments. Inside a CoRun fiber the send is
# registered with the scheduler's IO machine and the fiber yields
# [YIELD_IO_OPERATION, receipt], returning whatever it is resumed with.
def self.sendmsg_to(sock, mesg, flags: 0, dest_sockaddr: nil, controls: [])
  return sock.sendmsg(mesg, flags, dest_sockaddr, *controls) unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_socket_sendmsg(sock, mesg, flags: flags, dest_sockaddr: dest_sockaddr, controls: controls)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
write_to(io, str:)
click to toggle source
# File lib/rbgo/corun.rb, line 117
# Writes +str+ to +io+.
#
# Outside a CoRun fiber this is io.write(str). Inside a CoRun fiber the
# write is registered with the scheduler's IO machine and the fiber yields
# [YIELD_IO_OPERATION, receipt], returning whatever it is resumed with.
def self.write_to(io, str:)
  return io.write(str) unless is_in_corun_fiber?

  pending = Scheduler.instance.io_machine.do_write(io, str: str)
  Fiber.yield [YIELD_IO_OPERATION, pending]
end
yield_io(&blk)
click to toggle source
# File lib/rbgo/corun.rb, line 25
# Runs the optional block, keeping it off the current CoRun fiber when
# called from one.
#
# Outside a CoRun fiber the block (if any) is simply called and its value
# returned. Inside a CoRun fiber an IOReceipt is created, the block is handed
# to a CoRun::Routine created with new_thread: true, and the fiber yields
# [YIELD_IO_OPERATION, receipt]; the method returns whatever the fiber is
# resumed with.
def self.yield_io(&blk)
  return blk&.call unless is_in_corun_fiber?

  pending = IOReceipt.new([:yield_io])
  CoRun::Routine.new(new_thread: true, queue_tag: :none) do
    begin
      pending.res = blk&.call
    ensure
      # Notify even when the block raises; res is then never assigned.
      pending.notify
    end
  end
  Fiber.yield [YIELD_IO_OPERATION, pending]
end