class Channel
Public Class Methods
new(size = nil)
click to toggle source
# File lib/channel.rb, line 4
# Builds a new, open channel.
#
# size - optional capacity. When given, values are buffered in a
#        SizedQueue of that size; otherwise an unbounded Queue is used.
def initialize(size = nil)
  @q =
    if size
      SizedQueue.new(size)
    else
      Queue.new
    end
  @closed = false
  # Guards every access to the queue, the closed flag and the waiter list.
  @mutex = Mutex.new
  # Threads currently parked while waiting for a value.
  @waiting = []
end
select(*channels) { |recv| ... }
click to toggle source
# File lib/channel.rb, line 95
# Waits on several channels at once and yields the first value received.
#
# One helper thread per channel blocks in +recv+ and forwards
# <tt>[value, channel]</tt> to a private selector channel. The first pair
# to arrive is yielded to the block, after which the selector is closed and
# every helper thread is killed and joined.
#
# channels - the channels to wait on.
#
# Yields the <tt>[value, channel]</tt> pair that arrived first.
# Returns whatever the block returns.
#
# NOTE: a helper that has already taken a value from its channel when it is
# killed never forwards it, so that value is dropped.
def select(*channels)
  # Assigned before anything that can raise so the ensure clause never has
  # to deal with nil.
  threads = []
  selector = new
  channels.each do |c|
    threads << Thread.new do
      begin
        selector << [c.recv, c]
      rescue Channel::Closed
        # Either +c+ is closed and drained, or the selector was closed
        # because another channel already won. Finishing quietly keeps the
        # +join+ below from re-raising this exception out of +select+.
      end
    end
  end
  yield selector.recv
ensure
  selector.close if selector
  threads.each(&:kill).each(&:join)
end
Public Instance Methods
close()
click to toggle source
# File lib/channel.rb, line 54
# Marks the channel as closed and wakes every waiting thread.
#
# Closing an already closed channel does nothing.
def close
  lock! do
    unless closed?
      @closed = true
      # Wake all parked threads so they can observe the closed state.
      all!
    end
  end
end
closed?()
click to toggle source
# File lib/channel.rb, line 62
# Reports whether the channel has been closed.
#
# Returns the current value of the closed flag.
def closed?
  @closed
end
each() { |e| ... }
click to toggle source
# File lib/channel.rb, line 70
# Yields every value received from the channel until it is closed.
#
# Without a block, returns an Enumerator over the same values.
# Iteration stops, returning nil, once +recv+ raises Channel::Closed.
def each
  return enum_for(:each) unless block_given?

  loop do
    # Only +recv+ sits inside the rescue, so a Channel::Closed raised by the
    # caller's block is not swallowed here.
    item =
      begin
        recv
      rescue Channel::Closed
        return
      end
    yield item
  end
end
receive_only!()
click to toggle source
# File lib/channel.rb, line 84
# Wraps this channel in a ReceiveOnly object.
#
# ReceiveOnly is defined elsewhere; presumably it exposes only the
# receiving side of the channel -- confirm against its definition.
#
# Returns the new ReceiveOnly wrapper.
def receive_only!
  ReceiveOnly.new(self)
end
Also aliased as: r!
recv()
click to toggle source
# File lib/channel.rb, line 33
# Receives the next value from the channel, blocking until one is available.
#
# Values still buffered when the channel is closed are delivered first.
#
# Returns the received value.
# Raises Closed (via +closed!+) once the channel is closed and empty.
def recv
  lock! do
    loop do
      closed! if closed? && @q.empty?
      if @q.empty?
        # Park until a sender or +close+ wakes us, then re-check from the
        # top. The result of +wait!+ is deliberately ignored: whatever it
        # returns, we must never fall through to +@q.pop+ on an empty queue,
        # because that would block while still holding the mutex.
        wait!
        next
      end
      break @q.pop
    end
  end
end
Also aliased as: pop
send(val)
click to toggle source
# File lib/channel.rb, line 44
# Sends a value into the channel and wakes one waiting receiver.
#
# On a bounded channel (one backed by a SizedQueue) this blocks while the
# buffer is full. The wait happens with the mutex released: pushing onto a
# full SizedQueue while holding the mutex would block forever, because no
# receiver could take the lock to make room.
#
# val - the value to enqueue.
#
# Returns the result of +next!+.
# Raises Closed if the channel is closed, including when it is closed while
# this call is waiting for room.
def send(val)
  lock! do
    loop do
      raise Closed if closed?
      # Every queue access happens under @mutex, so this size check cannot
      # race with a concurrent push or pop.
      break unless @q.respond_to?(:max) && @q.size >= @q.max
      # Receivers do not signal senders, so release the lock briefly and
      # poll again for free space.
      @mutex.sleep(0.01)
    end
    @q << val
    next!
  end
end
Also aliased as: push
send_only!()
click to toggle source
# File lib/channel.rb, line 89
# Wraps this channel in a SendOnly object.
#
# SendOnly is defined elsewhere; presumably it exposes only the sending
# side of the channel -- confirm against its definition.
#
# Returns the new SendOnly wrapper.
def send_only!
  SendOnly.new(self)
end
Also aliased as: s!
Private Instance Methods
all!()
click to toggle source
# File lib/channel.rb, line 29
# Calls +next!+ once for every thread registered as waiting right now.
#
# The waiter list is copied first because +next!+ removes entries from
# +@waiting+ as it goes.
#
# Returns the copied list.
def all!
  snapshot = @waiting.dup
  snapshot.each { |_waiter| next! }
end
closed!()
click to toggle source
# File lib/channel.rb, line 66
# Signals that the channel can no longer be used.
#
# Raises Closed unconditionally.
def closed!
  raise Closed
end
lock!(&block)
click to toggle source
# File lib/channel.rb, line 11
# Runs the given block while holding the channel's mutex.
#
# Returns the value of the block.
def lock!(&critical_section)
  @mutex.synchronize(&critical_section)
end
next!()
click to toggle source
# File lib/channel.rb, line 20
# Wakes the first waiting thread that is still alive.
#
# Entries are removed from the front of +@waiting+; threads that are no
# longer alive are discarded and the search continues.
#
# Returns the woken thread, or nil when no live waiter was found.
def next!
  until (candidate = @waiting.shift).nil?
    return candidate.wakeup if candidate.alive?
  end
  nil
end
wait!()
click to toggle source
# File lib/channel.rb, line 15
# Parks the current thread until it is woken, releasing the mutex meanwhile.
#
# The thread is registered in +@waiting+ and then sleeps on the mutex. It
# must be called with +@mutex+ held; Mutex#sleep re-acquires the lock before
# returning or unwinding, so the cleanup below also runs under the lock.
#
# Returns the result of Mutex#sleep.
def wait!
  @waiting << Thread.current
  @mutex.sleep
ensure
  # Drop our registration if we leave without having been shifted off the
  # list -- an exception raised into the thread, a kill, or a spurious
  # wake-up. A stale entry would absorb a later wake-up meant for a thread
  # that is really waiting. After a normal wake-up this is a no-op.
  @waiting.delete(Thread.current)
end