class Pwrake::NBIO::MultiReader

Attributes

default_queue[RW]
queue[R]

Public Class Methods

new(selector, io, n_chan=0) click to toggle source
Calls superclass method Pwrake::NBIO::Reader::new
# File lib/pwrake/nbio.rb, line 365
def initialize(selector, io, n_chan=0)
  super(selector, io)
  @n_chan = n_chan
  @queue = @n_chan.times.map{|i| FiberReaderQueue.new(self)}
  @default_queue = FiberReaderQueue.new(self)
  @check_timeout = true
end

Public Instance Methods

[](ch) click to toggle source
# File lib/pwrake/nbio.rb, line 375
def [](ch)
  @queue[ch]
end
call() click to toggle source
# File lib/pwrake/nbio.rb, line 395
def call
  while line = read_line_nonblock
    if /^(\d+):(.*)$/ =~ line
      ch,str = $1,$2
      if q = @queue[ch.to_i]
        q.enq(str)
      else
        raise "No queue ##{ch}, received: #{line}"
      end
    elsif @default_queue
      @default_queue.enq(line)
    else
      raise "No default_queue, received: #{line}"
    end
  end
rescue EOFError
  halt
rescue IO::WaitReadable
end
error(e) click to toggle source
# File lib/pwrake/nbio.rb, line 415
def error(e)
  @closed = true
  @queue.each{|q| q.enq(e)}
  @default_queue.enq(e)
end
get_line(ch=nil) click to toggle source

call from Fiber context

# File lib/pwrake/nbio.rb, line 387
def get_line(ch=nil)
  if ch && !@queue.empty?
    @queue[ch].deq
  else
    @default_queue.deq
  end
end
halt() click to toggle source
# File lib/pwrake/nbio.rb, line 421
def halt
  Log.debug("Handler.halt") if defined? Log
  @queue.each{|q| q.halt}
  @default_queue.halt
end
new_queue() click to toggle source
# File lib/pwrake/nbio.rb, line 379
def new_queue
  n = @n_chan
  @queue << q = FiberReaderQueue.new(self)
  @n_chan += 1
  [n,q]
end