class Pwrake::NBIO::Selector

Attributes

reader[R]
writer[R]

Public Class Methods

new(io_class=IO) click to toggle source
# File lib/pwrake/nbio.rb, line 13
# Builds a selector with no registered handlers.
#
# io_class - object whose +select+ method is used to wait on the registered
#            IOs (defaults to IO).
def initialize(io_class=IO)
  # io => handler tables; each needs its own Hash instance.
  @reader, @writer = {}, {}
  @running = false
  @io_class = io_class
end

Public Instance Methods

add_reader(hdl) click to toggle source
# File lib/pwrake/nbio.rb, line 22
# Registers +hdl+ as the read handler, keyed by the value of +hdl.io+.
# An existing entry for the same key is replaced. Returns +hdl+.
def add_reader(hdl)
  @reader.store(hdl.io, hdl)
end
add_writer(hdl) click to toggle source
# File lib/pwrake/nbio.rb, line 30
# Registers +hdl+ as the write handler, keyed by the value of +hdl.io+.
# An existing entry for the same key is replaced. Returns +hdl+.
def add_writer(hdl)
  @writer.store(hdl.io, hdl)
end
clear() click to toggle source
# File lib/pwrake/nbio.rb, line 42
# Drops every registered read and write handler.
# Returns the (now empty) writer table.
def clear
  [@reader, @writer].each(&:clear)
  @writer
end
delete_reader(hdl) click to toggle source
# File lib/pwrake/nbio.rb, line 26
# Unregisters the read handler stored under +hdl.io+.
# Returns the removed handler, or nil when nothing was registered for it.
def delete_reader(hdl)
  io = hdl.io
  @reader.delete(io)
end
delete_writer(hdl) click to toggle source
# File lib/pwrake/nbio.rb, line 34
# Unregisters the write handler stored under +hdl.io+.
# Returns the removed handler, or nil when nothing was registered for it.
def delete_writer(hdl)
  io = hdl.io
  @writer.delete(io)
end
empty?() click to toggle source
# File lib/pwrake/nbio.rb, line 38
# True when neither a read handler nor a write handler is registered.
def empty?
  return false unless @reader.empty?
  @writer.empty?
end
get_host(io) click to toggle source

Returns the host of the handler registered for +io+, or nil; used when building error messages.

# File lib/pwrake/nbio.rb, line 54
# Looks up the handler registered for +io+ (read handlers take precedence
# over write handlers) and returns its +host+.
# Returns nil when no handler is found or the handler has no +host+ method.
def get_host(io)
  handler = @reader[io] || @writer[io]
  return nil unless handler.respond_to?(:host)
  handler.host
end
halt() click to toggle source
# File lib/pwrake/nbio.rb, line 47
# Asks the run loop to stop and forwards +halt+ to every registered handler,
# write handlers first, then read handlers.
def halt
  @running = false
  @writer.each_value(&:halt)
  @reader.each_value(&:halt)
end
run(timeout=nil) click to toggle source
# File lib/pwrake/nbio.rb, line 59
# Drives the select loop until it is halted or no handler remains.
#
# timeout - when given, heartbeat tracking is initialised first and the
#           value is handed to every run_select call; nil disables it.
#
# Whatever happens, the running flag and the heartbeat table are reset on
# the way out.
def run(timeout=nil)
  @running = true
  init_heartbeat if timeout
  while @running && !empty?
    if $debug && defined?(Log)
      # NOTE: `caller` is deliberately evaluated here, outside any block, so
      # the reported frames are those of whoever invoked #run.
      readers = @reader.map do |io, hdl|
        "#{io.inspect}=>#{hdl.class.inspect},#{hdl.waiter.inspect}"
      end
      Log.debug "Selector#run:\n #{caller[0..1].join("\n ")}" \
        "\n @reader={#{readers.join(",")}}\n @writer.size=#{@writer.size}"
      $stderr.puts "Selector#run: " + caller[0]
    end
    run_select(timeout)
  end
ensure
  @hb_time = nil
  @running = false
end

Private Instance Methods

check_heartbeat(ios,timeout) click to toggle source
# File lib/pwrake/nbio.rb, line 114
# Records a heartbeat for every IO that was just readable and, no more often
# than once per 3 clock units, reports readers that have been silent for
# longer than +timeout+.
#
# ios     - collection of IOs reported readable by the last select, or nil.
# timeout - maximum tolerated silence, in the units of Pwrake.clock
#           (the message labels them seconds).
#
# A silent reader is only reported when its handler's +check_timeout+ is
# truthy; the report is delivered through +hdl.error+ with a TimeoutError.
def check_heartbeat(ios,timeout)
  t = Pwrake.clock
  # Anything that was just readable is alive as of now.
  ios.each { |io| @hb_time[io] = t } if ios
  # Throttle the full scan of all readers.
  return if t - @hb_check_time < 3
  @hb_check_time = t
  # Scan a copy so that dropping the just-seen IOs leaves @reader untouched.
  rds = @reader.dup
  ios.each { |io| rds.delete(io) } if ios
  rds.each do |io,hdl|
    # A reader registered after init_heartbeat that has never been readable
    # has no timestamp yet; start its timer now instead of subtracting nil.
    last = (@hb_time[io] ||= t)
    next unless hdl.check_timeout
    tdif = t - last
    next unless tdif > timeout
    # NOTE(review): Pwrake.clock is defined outside this file; if it yields a
    # plain number there is no strftime, so fall back to its string form.
    stamp = last.respond_to?(:strftime) ? last.strftime('%FT%T.%6N') : last.to_s
    m = "Heartbeat Timeout: no response during #{tdif}s "+
      "> timeout #{timeout}s from host=#{get_host(io)} " +
      "@hb_time[io=#{io.inspect}]=#{stamp}"
    hdl.error(TimeoutError.new(m))
  end
end
init_heartbeat() click to toggle source
# File lib/pwrake/nbio.rb, line 107
# Starts heartbeat bookkeeping: remembers the current Pwrake.clock value as
# the time of the last full check and as the last-seen time of every
# currently registered reader.
def init_heartbeat
  now = Pwrake.clock
  @hb_check_time = now
  @hb_time = {}
  @reader.each_key do |io|
    @hb_time.store(io, now)
  end
end
run_select(timeout) click to toggle source
# File lib/pwrake/nbio.rb, line 79
# Performs one select round over all registered IOs and invokes the handler
# of every IO reported ready (readers first, then writers).
#
# timeout - nil to block indefinitely; otherwise select waits for 75% of
#           it and the heartbeat check runs on the readable set afterwards.
#
# On IOError, every registered IO that reports closed? is logged, removed
# from its table and its handler is notified through +error+.
def run_select(timeout)
  wait = timeout ? timeout*0.75 : nil
  readable, writable = @io_class.select(@reader.keys,@writer.keys,[],wait)
  check_heartbeat(readable,timeout) if timeout
  if readable
    readable.each do |io|
      hdl = @reader[io]
      # The handler may have been unregistered by an earlier callback.
      next unless hdl
      hdl.call
    end
  end
  if writable
    writable.each do |io|
      hdl = @writer[io]
      next unless hdl
      hdl.call
    end
  end
rescue IOError => e
  em = "#{e.class.name}: #{e.message}"
  # Reports and unregisters every closed IO in +table+; iterates over a key
  # snapshot because entries are deleted along the way.
  purge_closed = lambda do |table|
    table.keys.each do |io|
      next unless io.closed?
      m = "#{em} io=#{io}"
      Log.error(m) if defined? Log
      $stderr.puts m
      hdl = table.delete(io)
      hdl.error(e)
    end
  end
  purge_closed.call(@reader)
  purge_closed.call(@writer)
end