class Pwrake::CommunicatorSet
Attributes
ipaddr_to_rank[R]
rank_to_ipaddr[R]
selector[R]
Public Class Methods
new(master_rd,selector,option)
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 9 def initialize(master_rd,selector,option) @master_rd = master_rd @selector = selector @option = option @communicators = {} @error_host = [] @initial_communicators = [] if hb = @option[:heartbeat] @heartbeat_timeout = hb + 30 end init_hosts end
Public Instance Methods
add(comm)
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 53 def add(comm) @communicators[comm.id] = comm end
create_communicators()
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 30 def create_communicators Fiber.new do s = @master_rd.get_line if s.chomp != "host_list_begin" raise "Branch#setup_worker: recv=#{s.chomp} expected=host_list_begin" end while s = @master_rd.get_line s.chomp! break if s == "host_list_end" if /^host:(\d+) (\S+) ([+-]?\d+)?$/ =~ s id, host, ncore = $1,$2,$3 ncore &&= ncore.to_i @communicators[id] = Communicator.new(self,id,host,ncore,@selector,@option) else raise "Branch#setup_worker: recv=#{s.chomp} expected=host:id hostname ncore" end end end.resume @selector.run(@heartbeat_timeout) @initial_communicators = @communicators.dup end
delete(comm)
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 57 def delete(comm) @communicators.delete(comm.id) @error_host << comm.host end
drop(id)
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 62 def drop(id) comm = @communicators[id] Log.debug "drop:id=#{id} comm=#{comm.inspect} @communicators.keys=#{@communicators.keys}" comm.dropout if comm end
drop_all()
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 68 def drop_all Log.debug "drop_all" @communicators.keys.each do |id| @communicators[id].dropout end end
exit()
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 103 def exit @selector.clear NBIO::Handler.exit(handler_set) @selector.run end
finish_shells()
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 75 def finish_shells Log.debug "finish_shells" @communicators.keys.each do |id| @communicators[id].finish_shells end end
handler_set()
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 93 def handler_set @communicators.each_value.map{|comm| comm.handler}.compact end
init_hosts()
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 22 def init_hosts # for pwrake-mpi end
kill(sig)
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 97 def kill(sig) @selector.clear NBIO::Handler.kill(handler_set,sig) @selector.run end
run(message)
click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 82 def run(message) @error_host = [] n1 = @communicators.size @selector.run(@heartbeat_timeout) n2 = @communicators.size if n1 != n2 Log.info "# of communicators: #{n1}->#{n2} during #{message.inspect}" Log.info "retired hosts=[#{@error_host.join(',')}]" end end