class Channel

Public Class Methods

new(size = nil) click to toggle source
# File lib/channel.rb, line 4
def initialize(size = nil)
  @q = size ? SizedQueue.new(size) : Queue.new
  @closed = false
  @mutex = Mutex.new
  @waiting = []
end
select(*channels) { |recv| ... } click to toggle source
# File lib/channel.rb, line 95
def select(*channels)
  selector = new
  threads = channels.map do |c|
    Thread.new { selector << [c.recv, c] }
  end
  yield selector.recv
ensure
  selector.close
  threads.each(&:kill).each(&:join)
end

Public Instance Methods

<<(val)
Alias for: push
close() click to toggle source
# File lib/channel.rb, line 54
def close
  lock! do
    return if closed?
    @closed = true
    all!
  end
end
closed?() click to toggle source
# File lib/channel.rb, line 62
def closed?
  @closed
end
each() { |e| ... } click to toggle source
# File lib/channel.rb, line 70
def each
  return enum_for(:each) unless block_given?

  loop do
    begin
      e = recv
    rescue Channel::Closed
      return
    else
      yield e
    end
  end
end
pop()
Alias for: recv
push(val)
Also aliased as: <<
Alias for: send
r!()
Alias for: receive_only!
receive_only!() click to toggle source
# File lib/channel.rb, line 84
def receive_only!
  ReceiveOnly.new(self)
end
Also aliased as: r!
recv() click to toggle source
# File lib/channel.rb, line 33
def recv
  lock! do
    loop do
      closed! if closed? && @q.empty?
      wait! && next if @q.empty?
      break @q.pop
    end
  end
end
Also aliased as: pop
s!()
Alias for: send_only!
send(val) click to toggle source
# File lib/channel.rb, line 44
def send(val)
  lock! do
    fail Closed if closed?
    @q << val
    next!
  end
end
Also aliased as: push
send_only!() click to toggle source
# File lib/channel.rb, line 89
def send_only!
  SendOnly.new(self)
end
Also aliased as: s!

Private Instance Methods

all!() click to toggle source
# File lib/channel.rb, line 29
        def all!
  @waiting.dup.each { next! }
end
closed!() click to toggle source
# File lib/channel.rb, line 66
        def closed!
  fail Closed
end
lock!(&block) click to toggle source
# File lib/channel.rb, line 11
        def lock!(&block)
  @mutex.synchronize(&block)
end
next!() click to toggle source
# File lib/channel.rb, line 20
        def next!
  loop do
    thr = @waiting.shift
    break if thr.nil?
    next unless thr.alive?
    break thr.wakeup
  end
end
wait!() click to toggle source
# File lib/channel.rb, line 15
        def wait!
  @waiting << Thread.current
  @mutex.sleep
end