class Pwrake::NBIO::Selector
Attributes
reader[R]
writer[R]
Public Class Methods
new(io_class=IO)
click to toggle source
# File lib/pwrake/nbio.rb, line 13
# Creates a selector with no registered handlers that is not running.
#
# io_class:: object stored in @io_class; run_select calls +select+ on it
#            (defaults to IO).
def initialize(io_class=IO)
  @io_class = io_class
  @running = false
  @reader, @writer = {}, {}
end
Public Instance Methods
add_reader(hdl)
click to toggle source
# File lib/pwrake/nbio.rb, line 22
# Registers +hdl+ in the reader table under the key +hdl.io+, replacing any
# handler already stored for that key. Returns +hdl+.
def add_reader(hdl)
  @reader.store(hdl.io, hdl)
end
add_writer(hdl)
click to toggle source
# File lib/pwrake/nbio.rb, line 30
# Registers +hdl+ in the writer table under the key +hdl.io+, replacing any
# handler already stored for that key. Returns +hdl+.
def add_writer(hdl)
  @writer.store(hdl.io, hdl)
end
clear()
click to toggle source
# File lib/pwrake/nbio.rb, line 42
# Empties the reader table and then the writer table in place.
# Returns the (now empty) writer table.
def clear
  [@reader, @writer].each(&:clear).last
end
delete_reader(hdl)
click to toggle source
# File lib/pwrake/nbio.rb, line 26
# Removes the reader-table entry keyed by +hdl.io+.
# Returns the handler that was stored there, or nil when there was none.
def delete_reader(hdl)
  key = hdl.io
  @reader.delete(key)
end
delete_writer(hdl)
click to toggle source
# File lib/pwrake/nbio.rb, line 34
# Removes the writer-table entry keyed by +hdl.io+.
# Returns the handler that was stored there, or nil when there was none.
def delete_writer(hdl)
  key = hdl.io
  @writer.delete(key)
end
empty?()
click to toggle source
# File lib/pwrake/nbio.rb, line 38
# True when neither the reader table nor the writer table holds a handler.
def empty?
  [@reader, @writer].all?(&:empty?)
end
get_host(io)
click to toggle source
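Returns the host of the handler registered for io (looked up among the readers first, then the writers), or nil when that handler does not respond to host. Used when printing an error message.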
# File lib/pwrake/nbio.rb, line 54
# Looks up the handler registered for +io+ (reader table first, then writer
# table) and returns its +host+.
# Returns nil when no handler is found or the handler does not respond
# to +host+.
def get_host(io)
  hdl = @reader[io]
  hdl ||= @writer[io]
  return nil unless hdl.respond_to?(:host)
  hdl.host
end
halt()
click to toggle source
# File lib/pwrake/nbio.rb, line 47
# Clears the running flag, then calls +halt+ on every writer handler
# followed by every reader handler.
def halt
  @running = false
  @writer.each_value(&:halt)
  @reader.each_value(&:halt)
end
run(timeout=nil)
click to toggle source
# File lib/pwrake/nbio.rb, line 59
# Main loop: keeps calling run_select until the selector is halted or has
# no handlers left.
#
# timeout:: passed through to run_select; when truthy the heartbeat
#           bookkeeping is initialised first via init_heartbeat.
#
# On exit (normal or by exception) the running flag is cleared and
# @hb_time is reset to nil.
def run(timeout=nil)
  begin
    @running = true
    init_heartbeat if timeout
    until !@running || empty?
      if $debug && defined?(Log)
        # Debug dump of the registered readers and the call site.
        entries = @reader.map do |io, hdl|
          format("%s=>%s,%s", io.inspect, hdl.class.inspect, hdl.waiter.inspect)
        end
        rd_insp = entries.join(",")
        trace = caller[0..1].join("\n ")
        Log.debug "Selector#run:\n " + trace +
          "\n @reader={#{rd_insp}}\n @writer.size=#{@writer.size}"
        $stderr.puts "Selector#run: " + caller[0]
      end
      run_select(timeout)
    end
  ensure
    @running = false
    @hb_time = nil
  end
end
Private Instance Methods
check_heartbeat(ios,timeout)
click to toggle source
# File lib/pwrake/nbio.rb, line 114
# Heartbeat bookkeeping, called from run_select when a timeout is set.
#
# ios::     the readable IOs returned by select, or nil when select
#           returned nothing.
# timeout:: maximum silence, compared against the difference of two
#           Pwrake.clock values.
#
# Every IO in +ios+ gets its last-seen time refreshed. At most once every
# 3 clock units a full scan runs over the readers that were NOT ready: for
# each handler whose +check_timeout+ is truthy and whose silence exceeds
# +timeout+, +hdl.error+ is called with a TimeoutError.
def check_heartbeat(ios,timeout)
  t = Pwrake.clock
  # Anything that just became readable counts as a sign of life.
  ios.each { |io| @hb_time[io] = t } if ios
  # Rate limit: skip the full scan when the last one was under 3 ago.
  return if t - @hb_check_time < 3
  @hb_check_time = t
  # Scan a copy so that an error callback may modify @reader safely.
  rds = @reader.dup
  ios.each { |io| rds.delete(io) } if ios
  rds.each do |io,hdl|
    # A reader registered after init_heartbeat that has never been ready
    # has no timestamp yet; start its clock now instead of failing on
    # `t - nil`.
    @hb_time[io] ||= t
    next unless hdl.check_timeout
    tdif = t - @hb_time[io]
    next unless tdif > timeout
    m = "Heartbeat Timeout: no response during #{tdif}s "+
      "> timeout #{timeout}s from host=#{get_host(io)} " +
      "@hb_time[io=#{io.inspect}]=#{@hb_time[io].strftime('%FT%T.%6N')}"
    hdl.error(TimeoutError.new(m))
  end
end
init_heartbeat()
click to toggle source
# File lib/pwrake/nbio.rb, line 107
# Resets the heartbeat state: records the current Pwrake.clock value as
# the last full-scan time and as the last-seen time of every IO currently
# in the reader table.
def init_heartbeat
  now = Pwrake.clock
  @hb_time = {}
  @hb_check_time = now
  @reader.each_key { |io| @hb_time.store(io, now) }
end
run_select(timeout)
click to toggle source
# File lib/pwrake/nbio.rb, line 79
# One iteration of the selector loop.
#
# Calls +select+ on @io_class with the registered reader and writer IOs
# (waiting at most 75% of +timeout+, or indefinitely when +timeout+ is
# nil), runs the heartbeat check when a timeout is set, then invokes
# +call+ on the handler of every ready IO.
#
# An IOError raised anywhere in that sequence is not re-raised: every
# handler whose IO reports +closed?+ is unregistered and notified through
# +hdl.error+.
def run_select(timeout)
  to = (timeout) ? timeout*0.75 : nil
  r, w, = @io_class.select(@reader.keys,@writer.keys,[],to)
  check_heartbeat(r,timeout) if timeout
  # r / w are nil when select returned nothing; a handler may have been
  # unregistered by an earlier callback, hence the `if x`.
  r.each{|io| x = @reader[io]; x.call if x} if r
  w.each{|io| x = @writer[io]; x.call if x} if w
rescue IOError => e
  em = "#{e.class.name}: #{e.message}"
  drop_closed_handlers(@reader, e, em)
  drop_closed_handlers(@writer, e, em)
end

# Removes from +table+ (the reader or writer table) every entry whose IO
# is closed, reports it to Log (when defined) and $stderr, and passes +e+
# to the removed handler's +error+.
def drop_closed_handlers(table, e, em)
  table.keys.each do |io|
    # The key list is a snapshot: an earlier error callback may already
    # have removed this entry, in which case there is nobody to notify.
    next unless table.key?(io) && io.closed?
    m = "#{em} io=#{io}"
    Log.error(m) if defined? Log
    $stderr.puts m
    hdl = table.delete(io)
    hdl.error(e)
  end
end