module Rbgo::Channel::Chan

Attributes

ios[RW]
register_mutex[RW]

Public Class Methods

after(seconds, &blk) click to toggle source
# File lib/rbgo/select_chan.rb, line 33
def self.after(seconds, &blk)
  ch = new
  CoRun::Routine.new(new_thread: true, queue_tag: :none) do
    begin
      sleep seconds
      v = blk.nil? ? Time.now : blk.call
      ch << v rescue nil
    ensure
      ch.close
    end
  end
  ch
end
new(max = 0) click to toggle source
# File lib/rbgo/select_chan.rb, line 24
def self.new(max = 0)
  max = max.to_i
  if max <= 0
    NonBufferChan.new
  else
    BufferChan.new(max)
  end
end
perform(timeout: nil, &blk) click to toggle source
# File lib/rbgo/select_chan.rb, line 67
def self.perform(timeout: nil, &blk)
  ch = new
  CoRun::Routine.new(new_thread: false, queue_tag: :default) do
    begin
      res = nil
      Timeout::timeout(timeout) do
        res = blk.call
      end
      ch << res rescue nil
    rescue Timeout::Error
    ensure
      ch.close
    end
  end
  ch
end
tick(every_seconds, &blk) click to toggle source
# File lib/rbgo/select_chan.rb, line 47
def self.tick(every_seconds, &blk)
  ch = new
  CoRun::Routine.new(new_thread: true, queue_tag: :none) do
    begin
      loop do
        break if ch.closed?
        sleep every_seconds
        v = blk.nil? ? Time.now : blk.call
        begin
          ch.enq(v, true)
        rescue ThreadError
        end
      end
    ensure
      ch.close
    end
  end
  ch
end

Public Instance Methods

each() { |obj| ... } click to toggle source
# File lib/rbgo/select_chan.rb, line 9
def each
  if block_given?
    loop do
      obj, ok = pop
      if ok
        yield obj
      else
        break
      end
    end
  else
    enum_for(:each)
  end
end

Private Instance Methods

notify() click to toggle source
# File lib/rbgo/select_chan.rb, line 101
def notify
  register_mutex.synchronize do
    ios.each do |io|
      io.close rescue nil
    end
    ios.clear
  end
  nil
end
register(io) click to toggle source
# File lib/rbgo/select_chan.rb, line 89
def register(io)
  register_mutex.synchronize do
    ios << io
  end
end
unregister(io) click to toggle source
# File lib/rbgo/select_chan.rb, line 95
def unregister(io)
  register_mutex.synchronize do
    ios.delete(io)
  end
end