module Rbgo::Channel::Chan
Attributes
ios[RW]
register_mutex[RW]
Public Class Methods
after(seconds, &blk)
click to toggle source
# File lib/rbgo/select_chan.rb, line 33 def self.after(seconds, &blk) ch = new CoRun::Routine.new(new_thread: true, queue_tag: :none) do begin sleep seconds v = blk.nil? ? Time.now : blk.call ch << v rescue nil ensure ch.close end end ch end
new(max = 0)
click to toggle source
# File lib/rbgo/select_chan.rb, line 24 def self.new(max = 0) max = max.to_i if max <= 0 NonBufferChan.new else BufferChan.new(max) end end
perform(timeout: nil, &blk)
click to toggle source
# File lib/rbgo/select_chan.rb, line 67 def self.perform(timeout: nil, &blk) ch = new CoRun::Routine.new(new_thread: false, queue_tag: :default) do begin res = nil Timeout::timeout(timeout) do res = blk.call end ch << res rescue nil rescue Timeout::Error ensure ch.close end end ch end
tick(every_seconds, &blk)
click to toggle source
# File lib/rbgo/select_chan.rb, line 47 def self.tick(every_seconds, &blk) ch = new CoRun::Routine.new(new_thread: true, queue_tag: :none) do begin loop do break if ch.closed? sleep every_seconds v = blk.nil? ? Time.now : blk.call begin ch.enq(v, true) rescue ThreadError end end ensure ch.close end end ch end
Public Instance Methods
each() { |obj| ... }
click to toggle source
# File lib/rbgo/select_chan.rb, line 9 def each if block_given? loop do obj, ok = pop if ok yield obj else break end end else enum_for(:each) end end
Private Instance Methods
notify()
click to toggle source
# File lib/rbgo/select_chan.rb, line 101 def notify register_mutex.synchronize do ios.each do |io| io.close rescue nil end ios.clear end nil end
register(io)
click to toggle source
# File lib/rbgo/select_chan.rb, line 89 def register(io) register_mutex.synchronize do ios << io end end
unregister(io)
click to toggle source
# File lib/rbgo/select_chan.rb, line 95 def unregister(io) register_mutex.synchronize do ios.delete(io) end end