module Rbgo::Channel

Public Class Methods

on_read(chan:, &blk) click to toggle source

on_read

# File lib/rbgo/select_chan.rb, line 400
# Builds a receive operation on +chan+ for use with +select_chan+.
#
# Calling the returned proc runs +chan.deq(true)+ (the +true+ flag is
# presumably the non-blocking switch, as with Queue#deq -- confirm against
# Chan) and destructures its result into two values.
#
# @param chan [Chan] channel to receive from
# @yield [res, ok] optional handler given the two values from +chan.deq(true)+;
#   when present, its return value becomes the result of the operation
# @return [Proc] the operation; it also responds to +register(io_w)+ and
#   +unregister(io_w)+, which forward +io_w+ to the channel
# @raise [ArgumentError] if +chan+ is not a Chan
def on_read(chan:, &blk)
  unless chan.is_a?(Chan)
    raise ArgumentError, 'chan must be a Chan'
  end

  receive = proc do
    value, ok = chan.deq(true)
    blk ? blk.call(value, ok) : [value, ok]
  end

  # Forward both hooks to the channel. +send+ is used so the call goes
  # through regardless of the visibility of the channel's methods.
  %i[register unregister].each do |hook|
    receive.define_singleton_method(hook) { |io_w| chan.send(hook, io_w) }
  end

  receive
end
on_write(chan:, obj:, &blk) click to toggle source

on_write

# File lib/rbgo/select_chan.rb, line 425
# Builds a send operation that puts +obj+ on +chan+, for use with
# +select_chan+.
#
# Calling the returned proc runs +chan.enq(obj, true)+ (the +true+ flag is
# presumably the non-blocking switch -- confirm against Chan).
#
# @param chan [Chan] channel to send to
# @param obj [Object] value handed to +chan.enq+
# @yield optional handler called with no arguments after the enqueue; when
#   present, its return value replaces the enqueue result
# @return [Proc] the operation; it also responds to +register(io_w)+ and
#   +unregister(io_w)+, which forward +io_w+ to the channel
# @raise [ArgumentError] if +chan+ is not a Chan
def on_write(chan:, obj:, &blk)
  unless chan.is_a?(Chan)
    raise ArgumentError, 'chan must be a Chan'
  end

  deliver = proc do
    enqueued = chan.enq(obj, true)
    blk ? blk.call : enqueued
  end

  # Forward both hooks to the channel. +send+ is used so the call goes
  # through regardless of the visibility of the channel's methods.
  %i[register unregister].each do |hook|
    deliver.define_singleton_method(hook) { |io_w| chan.send(hook, io_w) }
  end

  deliver
end
select_chan(*ops) { || ... } click to toggle source

select_chan

# File lib/rbgo/select_chan.rb, line 347
# Runs the first of several operations that can proceed.
#
# Each op must respond to +call+, +register(io_w)+ and +unregister(io_w)+.
# An op signals "cannot proceed right now" by raising ThreadError from +call+.
#
# On every pass a fresh pipe is created per op and its write end is handed to
# +op.register+. The ops are then tried in order and the result of the first
# +call+ that does not raise ThreadError is returned. If none succeeds and a
# block was given, the block's value is returned instead. Otherwise the method
# waits in IO.select on the read ends, retries the ops whose pipe became
# readable, and starts another pass if none of them succeeds.
# NOTE(review): presumably the channel signals the registered write end when
# its state changes -- that side is not visible here; confirm against Chan.
#
# @param ops [Array<#call, #register, #unregister>] operations to choose from
# @yield optional default branch, run when no op can proceed immediately
# @return [Object] the result of the chosen op, or of the block
def select_chan(*ops)
  # Randomize the order in which ops are tried.
  ops.shuffle!

  io_hash      = {}
  # Closes every tracked pipe and unregisters its write end from the op.
  close_io_blk = proc do
    io_hash.each_pair do |io_r, (op, io_w)|
      io_r.close rescue nil
      io_w.close rescue nil
      op.unregister(io_w)
    end
  end

  begin
    while true do

      # Drop the pipes of the previous pass before creating new ones.
      close_io_blk.call
      io_hash.clear

      ops.each do |op|
        io_r, io_w = IO.pipe
        registered = false
        begin
          op.register(io_w)
          registered = true
        ensure
          # The pair is not tracked in io_hash yet, so the cleanup proc would
          # never see it; close it here if registration failed.
          unless registered
            io_r.close unless io_r.closed?
            io_w.close unless io_w.closed?
          end
        end
        io_hash[io_r] = [op, io_w]
      end

      # First attempt: take any op that can proceed right away.
      ops.each do |op|
        begin
          return op.call
        rescue ThreadError
        end
      end

      # Nothing was ready: fall back to the default branch when one is given.
      return yield if block_given?

      # Wait for a read end to become readable. Any StandardError from
      # IO.select is swallowed and simply leads to another pass.
      read_ios = IO.select(io_hash.keys).first rescue []

      read_ios.each do |io_r|
        op = io_hash[io_r].first
        begin
          return op.call
        rescue ThreadError
        end
      end
    end
  ensure
    close_io_blk.call
  end
end

Private Instance Methods

on_read(chan:, &blk) click to toggle source

on_read

# File lib/rbgo/select_chan.rb, line 400
# Builds a receive operation on +chan+ for use with +select_chan+.
#
# Calling the returned proc runs +chan.deq(true)+ (the +true+ flag is
# presumably the non-blocking switch, as with Queue#deq -- confirm against
# Chan) and destructures its result into two values.
#
# @param chan [Chan] channel to receive from
# @yield [res, ok] optional handler given the two values from +chan.deq(true)+;
#   when present, its return value becomes the result of the operation
# @return [Proc] the operation; it also responds to +register(io_w)+ and
#   +unregister(io_w)+, which forward +io_w+ to the channel
# @raise [ArgumentError] if +chan+ is not a Chan
def on_read(chan:, &blk)
  unless chan.is_a?(Chan)
    raise ArgumentError, 'chan must be a Chan'
  end

  receive = proc do
    value, ok = chan.deq(true)
    blk ? blk.call(value, ok) : [value, ok]
  end

  # Forward both hooks to the channel. +send+ is used so the call goes
  # through regardless of the visibility of the channel's methods.
  %i[register unregister].each do |hook|
    receive.define_singleton_method(hook) { |io_w| chan.send(hook, io_w) }
  end

  receive
end
on_write(chan:, obj:, &blk) click to toggle source

on_write

# File lib/rbgo/select_chan.rb, line 425
# Builds a send operation that puts +obj+ on +chan+, for use with
# +select_chan+.
#
# Calling the returned proc runs +chan.enq(obj, true)+ (the +true+ flag is
# presumably the non-blocking switch -- confirm against Chan).
#
# @param chan [Chan] channel to send to
# @param obj [Object] value handed to +chan.enq+
# @yield optional handler called with no arguments after the enqueue; when
#   present, its return value replaces the enqueue result
# @return [Proc] the operation; it also responds to +register(io_w)+ and
#   +unregister(io_w)+, which forward +io_w+ to the channel
# @raise [ArgumentError] if +chan+ is not a Chan
def on_write(chan:, obj:, &blk)
  unless chan.is_a?(Chan)
    raise ArgumentError, 'chan must be a Chan'
  end

  deliver = proc do
    enqueued = chan.enq(obj, true)
    blk ? blk.call : enqueued
  end

  # Forward both hooks to the channel. +send+ is used so the call goes
  # through regardless of the visibility of the channel's methods.
  %i[register unregister].each do |hook|
    deliver.define_singleton_method(hook) { |io_w| chan.send(hook, io_w) }
  end

  deliver
end
select_chan(*ops) { || ... } click to toggle source

select_chan

# File lib/rbgo/select_chan.rb, line 347
# Runs the first of several operations that can proceed.
#
# Each op must respond to +call+, +register(io_w)+ and +unregister(io_w)+.
# An op signals "cannot proceed right now" by raising ThreadError from +call+.
#
# On every pass a fresh pipe is created per op and its write end is handed to
# +op.register+. The ops are then tried in order and the result of the first
# +call+ that does not raise ThreadError is returned. If none succeeds and a
# block was given, the block's value is returned instead. Otherwise the method
# waits in IO.select on the read ends, retries the ops whose pipe became
# readable, and starts another pass if none of them succeeds.
# NOTE(review): presumably the channel signals the registered write end when
# its state changes -- that side is not visible here; confirm against Chan.
#
# @param ops [Array<#call, #register, #unregister>] operations to choose from
# @yield optional default branch, run when no op can proceed immediately
# @return [Object] the result of the chosen op, or of the block
def select_chan(*ops)
  # Randomize the order in which ops are tried.
  ops.shuffle!

  io_hash      = {}
  # Closes every tracked pipe and unregisters its write end from the op.
  close_io_blk = proc do
    io_hash.each_pair do |io_r, (op, io_w)|
      io_r.close rescue nil
      io_w.close rescue nil
      op.unregister(io_w)
    end
  end

  begin
    while true do

      # Drop the pipes of the previous pass before creating new ones.
      close_io_blk.call
      io_hash.clear

      ops.each do |op|
        io_r, io_w = IO.pipe
        registered = false
        begin
          op.register(io_w)
          registered = true
        ensure
          # The pair is not tracked in io_hash yet, so the cleanup proc would
          # never see it; close it here if registration failed.
          unless registered
            io_r.close unless io_r.closed?
            io_w.close unless io_w.closed?
          end
        end
        io_hash[io_r] = [op, io_w]
      end

      # First attempt: take any op that can proceed right away.
      ops.each do |op|
        begin
          return op.call
        rescue ThreadError
        end
      end

      # Nothing was ready: fall back to the default branch when one is given.
      return yield if block_given?

      # Wait for a read end to become readable. Any StandardError from
      # IO.select is swallowed and simply leads to another pass.
      read_ios = IO.select(io_hash.keys).first rescue []

      read_ios.each do |io_r|
        op = io_hash[io_r].first
        begin
          return op.call
        rescue ThreadError
        end
      end
    end
  ensure
    close_io_blk.call
  end
end