class Pwrake::NBIO::MultiReader
Attributes
default_queue[RW]
queue[R]
Public Class Methods
new(selector, io, n_chan=0)
click to toggle source
Calls superclass method
Pwrake::NBIO::Reader::new
# File lib/pwrake/nbio.rb, line 365 def initialize(selector, io, n_chan=0) super(selector, io) @n_chan = n_chan @queue = @n_chan.times.map{|i| FiberReaderQueue.new(self)} @default_queue = FiberReaderQueue.new(self) @check_timeout = true end
Public Instance Methods
[](ch)
click to toggle source
# File lib/pwrake/nbio.rb, line 375 def [](ch) @queue[ch] end
call()
click to toggle source
# File lib/pwrake/nbio.rb, line 395 def call while line = read_line_nonblock if /^(\d+):(.*)$/ =~ line ch,str = $1,$2 if q = @queue[ch.to_i] q.enq(str) else raise "No queue ##{ch}, received: #{line}" end elsif @default_queue @default_queue.enq(line) else raise "No default_queue, received: #{line}" end end rescue EOFError halt rescue IO::WaitReadable end
error(e)
click to toggle source
# File lib/pwrake/nbio.rb, line 415 def error(e) @closed = true @queue.each{|q| q.enq(e)} @default_queue.enq(e) end
get_line(ch=nil)
click to toggle source
call from Fiber context
# File lib/pwrake/nbio.rb, line 387 def get_line(ch=nil) if ch && !@queue.empty? @queue[ch].deq else @default_queue.deq end end
halt()
click to toggle source
# File lib/pwrake/nbio.rb, line 421 def halt Log.debug("Handler.halt") if defined? Log @queue.each{|q| q.halt} @default_queue.halt end
new_queue()
click to toggle source
# File lib/pwrake/nbio.rb, line 379 def new_queue n = @n_chan @queue << q = FiberReaderQueue.new(self) @n_chan += 1 [n,q] end