module Rbgo::Channel
Public Class Methods
on_read(chan:, &blk)
click to toggle source
# File lib/rbgo/select_chan.rb, line 400
#
# Builds a "receive" operation on +chan+.
#
# Calling the returned proc runs <tt>chan.deq(true)</tt> and destructures the
# result into +res+ and +ok+ (the +true+ argument is presumably a
# non-blocking flag -- confirm against Chan#deq). Without a block the proc
# returns <tt>[res, ok]</tt>; with a block it returns whatever the block
# returns when called with <tt>(res, ok)</tt>.
#
# The proc also carries two singleton methods, +register(io_w)+ and
# +unregister(io_w)+, which forward to the same-named methods on +chan+.
# +send+ is used for the forwarding, so those methods may be non-public.
#
# chan:: the channel to read from; must be a Chan
# blk::  optional handler called with (res, ok)
#
# Returns the operation proc.
# Raises ArgumentError when +chan+ is not a Chan.
def on_read(chan:, &blk)
  raise ArgumentError, 'chan must be a Chan' unless chan.is_a?(Chan)

  op = proc do
    res, ok = chan.deq(true)
    blk ? blk.call(res, ok) : [res, ok]
  end

  op.define_singleton_method(:register) { |io_w| chan.send(:register, io_w) }
  op.define_singleton_method(:unregister) { |io_w| chan.send(:unregister, io_w) }
  op
end
on_write(chan:, obj:, &blk)
click to toggle source
# File lib/rbgo/select_chan.rb, line 425
#
# Builds a "send" operation that puts +obj+ onto +chan+.
#
# Calling the returned proc runs <tt>chan.enq(obj, true)</tt> (the +true+
# argument is presumably a non-blocking flag -- confirm against Chan#enq).
# Without a block the proc returns the result of +enq+; with a block the
# block is called with no arguments after +enq+ and its result is returned
# instead.
#
# The proc also carries two singleton methods, +register(io_w)+ and
# +unregister(io_w)+, which forward to the same-named methods on +chan+.
# +send+ is used for the forwarding, so those methods may be non-public.
#
# chan:: the channel to write to; must be a Chan
# obj::  the object to enqueue
# blk::  optional handler run after the enqueue
#
# Returns the operation proc.
# Raises ArgumentError when +chan+ is not a Chan.
def on_write(chan:, obj:, &blk)
  raise ArgumentError, 'chan must be a Chan' unless chan.is_a?(Chan)

  op = proc do
    res = chan.enq(obj, true)
    blk ? blk.call : res
  end

  op.define_singleton_method(:register) { |io_w| chan.send(:register, io_w) }
  op.define_singleton_method(:unregister) { |io_w| chan.send(:unregister, io_w) }
  op
end
select_chan(*ops) { || ... }
click to toggle source
# File lib/rbgo/select_chan.rb, line 347
#
# Runs the first of +ops+ that can proceed and returns its result.
#
# Each op must respond to +call+, +register(io_w)+ and +unregister(io_w)+.
# The ops are shuffled once up front. Every round then:
#
# 1. closes and unregisters the pipes left over from the previous round;
# 2. creates one pipe per op and registers its write end with the op
#    (presumably the channel writes to it when it becomes ready -- that side
#    is not visible here);
# 3. calls each op in turn; a ThreadError means "not ready, try the next
#    one", any other outcome (return value or exception) ends the select;
# 4. if no op was ready and a block was given, returns the block's value;
# 5. otherwise waits in IO.select on the read ends, retries the ops whose
#    pipes became readable, and starts a new round if none succeeds.
#
# Caveat: a ThreadError raised from inside an op's own handler block is
# indistinguishable from "not ready" and is swallowed the same way.
#
# All pipes are closed and unregistered on every exit path. A pipe whose
# registration fails is closed immediately. Cleanup visits every pipe even
# if one +unregister+ call raises; the first such error is re-raised once
# cleanup has finished.
#
# ops:: operations to choose from
#
# Returns the result of the op that ran, or of the block when no op was ready.
def select_chan(*ops)
  ops.shuffle!
  io_hash = {}

  close_io_blk = proc do
    first_error = nil
    io_hash.each_pair do |io_r, (op, io_w)|
      # Closing is best-effort; a failure here must not stop the cleanup.
      io_r.close rescue nil
      io_w.close rescue nil
      begin
        op.unregister(io_w)
      rescue StandardError => e
        # Keep going so the remaining pipes are still released.
        first_error ||= e
      end
    end
    # Drop the entries so a later cleanup pass never unregisters twice.
    io_hash.clear
    raise first_error if first_error
  end

  begin
    # `while true` rather than `loop`: `loop` would silently swallow a
    # StopIteration raised by an op.
    while true
      close_io_blk.call

      ops.each do |op|
        io_r, io_w = IO.pipe
        begin
          op.register(io_w)
        rescue StandardError
          # This pipe is not tracked in io_hash yet, so release it here.
          io_r.close
          io_w.close
          raise
        end
        io_hash[io_r] = [op, io_w]
      end

      ops.each do |op|
        begin
          return op.call
        rescue ThreadError
          # Not ready; try the next op.
        end
      end

      return yield if block_given?

      # Any error from IO.select is treated as "nothing readable", which
      # simply starts another round.
      read_ios = IO.select(io_hash.keys).first rescue []
      read_ios.each do |io_r|
        op = io_hash[io_r].first
        begin
          return op.call
        rescue ThreadError
          # Another consumer won the race; keep looking.
        end
      end
    end
  ensure
    close_io_blk.call
  end
end
Private Instance Methods
on_read(chan:, &blk)
click to toggle source
# File lib/rbgo/select_chan.rb, line 400
#
# Builds a "receive" operation on +chan+.
#
# Calling the returned proc runs <tt>chan.deq(true)</tt> and destructures the
# result into +res+ and +ok+ (the +true+ argument is presumably a
# non-blocking flag -- confirm against Chan#deq). Without a block the proc
# returns <tt>[res, ok]</tt>; with a block it returns whatever the block
# returns when called with <tt>(res, ok)</tt>.
#
# The proc also carries two singleton methods, +register(io_w)+ and
# +unregister(io_w)+, which forward to the same-named methods on +chan+.
# +send+ is used for the forwarding, so those methods may be non-public.
#
# chan:: the channel to read from; must be a Chan
# blk::  optional handler called with (res, ok)
#
# Returns the operation proc.
# Raises ArgumentError when +chan+ is not a Chan.
def on_read(chan:, &blk)
  raise ArgumentError, 'chan must be a Chan' unless chan.is_a?(Chan)

  op = proc do
    res, ok = chan.deq(true)
    blk ? blk.call(res, ok) : [res, ok]
  end

  op.define_singleton_method(:register) { |io_w| chan.send(:register, io_w) }
  op.define_singleton_method(:unregister) { |io_w| chan.send(:unregister, io_w) }
  op
end
on_write(chan:, obj:, &blk)
click to toggle source
# File lib/rbgo/select_chan.rb, line 425
#
# Builds a "send" operation that puts +obj+ onto +chan+.
#
# Calling the returned proc runs <tt>chan.enq(obj, true)</tt> (the +true+
# argument is presumably a non-blocking flag -- confirm against Chan#enq).
# Without a block the proc returns the result of +enq+; with a block the
# block is called with no arguments after +enq+ and its result is returned
# instead.
#
# The proc also carries two singleton methods, +register(io_w)+ and
# +unregister(io_w)+, which forward to the same-named methods on +chan+.
# +send+ is used for the forwarding, so those methods may be non-public.
#
# chan:: the channel to write to; must be a Chan
# obj::  the object to enqueue
# blk::  optional handler run after the enqueue
#
# Returns the operation proc.
# Raises ArgumentError when +chan+ is not a Chan.
def on_write(chan:, obj:, &blk)
  raise ArgumentError, 'chan must be a Chan' unless chan.is_a?(Chan)

  op = proc do
    res = chan.enq(obj, true)
    blk ? blk.call : res
  end

  op.define_singleton_method(:register) { |io_w| chan.send(:register, io_w) }
  op.define_singleton_method(:unregister) { |io_w| chan.send(:unregister, io_w) }
  op
end
select_chan(*ops) { || ... }
click to toggle source
# File lib/rbgo/select_chan.rb, line 347
#
# Runs the first of +ops+ that can proceed and returns its result.
#
# Each op must respond to +call+, +register(io_w)+ and +unregister(io_w)+.
# The ops are shuffled once up front. Every round then:
#
# 1. closes and unregisters the pipes left over from the previous round;
# 2. creates one pipe per op and registers its write end with the op
#    (presumably the channel writes to it when it becomes ready -- that side
#    is not visible here);
# 3. calls each op in turn; a ThreadError means "not ready, try the next
#    one", any other outcome (return value or exception) ends the select;
# 4. if no op was ready and a block was given, returns the block's value;
# 5. otherwise waits in IO.select on the read ends, retries the ops whose
#    pipes became readable, and starts a new round if none succeeds.
#
# Caveat: a ThreadError raised from inside an op's own handler block is
# indistinguishable from "not ready" and is swallowed the same way.
#
# All pipes are closed and unregistered on every exit path. A pipe whose
# registration fails is closed immediately. Cleanup visits every pipe even
# if one +unregister+ call raises; the first such error is re-raised once
# cleanup has finished.
#
# ops:: operations to choose from
#
# Returns the result of the op that ran, or of the block when no op was ready.
def select_chan(*ops)
  ops.shuffle!
  io_hash = {}

  close_io_blk = proc do
    first_error = nil
    io_hash.each_pair do |io_r, (op, io_w)|
      # Closing is best-effort; a failure here must not stop the cleanup.
      io_r.close rescue nil
      io_w.close rescue nil
      begin
        op.unregister(io_w)
      rescue StandardError => e
        # Keep going so the remaining pipes are still released.
        first_error ||= e
      end
    end
    # Drop the entries so a later cleanup pass never unregisters twice.
    io_hash.clear
    raise first_error if first_error
  end

  begin
    # `while true` rather than `loop`: `loop` would silently swallow a
    # StopIteration raised by an op.
    while true
      close_io_blk.call

      ops.each do |op|
        io_r, io_w = IO.pipe
        begin
          op.register(io_w)
        rescue StandardError
          # This pipe is not tracked in io_hash yet, so release it here.
          io_r.close
          io_w.close
          raise
        end
        io_hash[io_r] = [op, io_w]
      end

      ops.each do |op|
        begin
          return op.call
        rescue ThreadError
          # Not ready; try the next op.
        end
      end

      return yield if block_given?

      # Any error from IO.select is treated as "nothing readable", which
      # simply starts another round.
      read_ios = IO.select(io_hash.keys).first rescue []
      read_ios.each do |io_r|
        op = io_hash[io_r].first
        begin
          return op.call
        rescue ThreadError
          # Another consumer won the race; keep looking.
        end
      end
    end
  ensure
    close_io_blk.call
  end
end