class Pwrake::CommunicatorSet

Attributes

ipaddr_to_rank[R]
rank_to_ipaddr[R]
selector[R]

Public Class Methods

new(master_rd,selector,option) click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 9
# Builds an empty communicator set.
#
# master_rd - reader later used by create_communicators (via get_line).
# selector  - object driven through run/clear by the methods of this class.
# option    - indexable options; only option[:heartbeat] is read here.
#
# When a heartbeat value is given, the timeout handed to the selector is
# that value plus 30; otherwise @heartbeat_timeout is left unset.
# Finishes by calling the init_hosts hook.
def initialize(master_rd,selector,option)
  @master_rd, @selector, @option = master_rd, selector, option
  @communicators = {}
  @error_host = []
  @initial_communicators = []
  heartbeat = @option[:heartbeat]
  @heartbeat_timeout = heartbeat + 30 if heartbeat
  init_hosts
end

Public Instance Methods

add(comm) click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 53
# Registers +comm+ in the set under its id, replacing any communicator
# already stored for that id. Returns +comm+.
def add(comm)
  @communicators.store(comm.id, comm)
end
create_communicators() click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 30
# Reads the host list from @master_rd and builds one Communicator per host.
#
# The input, as parsed below, is a "host_list_begin" line, then any number
# of "host:<id> <hostname> <ncore>" lines, then "host_list_end". The ncore
# field is optional, but the space before it is required by the pattern.
# Ids are kept as the strings captured from the line; ncore is converted
# to an Integer when present and left nil otherwise.
#
# Reading happens inside a Fiber that is resumed once here; afterwards the
# selector is run with the heartbeat timeout and the resulting table is
# copied into @initial_communicators (the copy is the return value).
#
# Raises RuntimeError when the header line is missing or wrong, or when a
# host line does not match the expected format.
def create_communicators
  Fiber.new do
    header = @master_rd.get_line
    # get_line can return nil (the loop below is written to stop on it,
    # presumably at end of input); report that as a protocol error rather
    # than failing with NoMethodError on nil.
    received = header.to_s.chomp
    unless received == "host_list_begin"
      raise "Branch#setup_worker: recv=#{received} expected=host_list_begin"
    end

    while (line = @master_rd.get_line)
      # Non-destructive chomp: do not mutate the string owned by the reader.
      line = line.chomp
      break if line == "host_list_end"
      matched = /^host:(\d+) (\S+) ([+-]?\d+)?$/.match(line)
      unless matched
        raise "Branch#setup_worker: recv=#{line} expected=host:id hostname ncore"
      end
      id, host, ncore = matched.captures
      ncore = ncore.to_i if ncore
      @communicators[id] = Communicator.new(self,id,host,ncore,@selector,@option)
    end
  end.resume
  @selector.run(@heartbeat_timeout)
  @initial_communicators = @communicators.dup
end
delete(comm) click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 57
# Removes +comm+ from the set (keyed by its id) and records its host in
# @error_host, which run reports as the retired hosts. Returns @error_host.
def delete(comm)
  removed_key = comm.id
  @communicators.delete(removed_key)
  @error_host.push(comm.host)
end
drop(id) click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 62
# Drops the communicator registered under +id+, if there is one.
# The lookup result is logged at debug level either way; an unknown id is
# otherwise ignored and nil is returned.
def drop(id)
  target = @communicators[id]
  Log.debug "drop:id=#{id} comm=#{target.inspect} @communicators.keys=#{@communicators.keys}"
  return unless target
  target.dropout
end
drop_all() click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 68
# Calls dropout on every communicator currently in the set.
#
# The ids are snapshotted first so the table may shrink while the loop
# runs. An id whose entry has already disappeared by the time it is
# reached is skipped (as drop does for an unknown id) instead of raising
# on nil and leaving the remaining communicators undropped.
# Returns the array of ids that was iterated.
def drop_all
  Log.debug "drop_all"
  @communicators.keys.each do |id|
    comm = @communicators[id]
    comm.dropout if comm
  end
end
exit() click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 103
# Shuts the communicators down through their handlers: clears the
# selector, passes the current handler set to NBIO::Handler.exit, then runs
# the selector (no timeout argument). Returns whatever @selector.run returns.
def exit
  @selector.clear
  handlers = handler_set
  NBIO::Handler.exit(handlers)
  @selector.run
end
finish_shells() click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 75
# Calls finish_shells on every communicator currently in the set.
#
# The ids are snapshotted first so the table may change while the loop
# runs. An id whose entry is no longer present when it is reached is
# skipped (as drop does for an unknown id) rather than raising on nil.
# Returns the array of ids that was iterated.
def finish_shells
  Log.debug "finish_shells"
  @communicators.keys.each do |id|
    comm = @communicators[id]
    comm.finish_shells if comm
  end
end
handler_set() click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 93
# Collects the handler of each communicator in the set, in table order,
# leaving out communicators whose handler is nil. Returns a new Array.
def handler_set
  handlers = []
  @communicators.each_value do |comm|
    handler = comm.handler
    handlers << handler unless handler.nil?
  end
  handlers
end
init_hosts() click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 22
# Hook called at the end of initialize. Intentionally does nothing here;
# per the original note it exists for pwrake-mpi.
def init_hosts
  nil
end
kill(sig) click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 97
# Forwards +sig+ to the communicators' handlers: clears the selector,
# passes the current handler set and +sig+ to NBIO::Handler.kill, then runs
# the selector (no timeout argument). Returns whatever @selector.run returns.
def kill(sig)
  @selector.clear
  handlers = handler_set
  NBIO::Handler.kill(handlers, sig)
  @selector.run
end
run(message) click to toggle source
# File lib/pwrake/branch/communicator_set.rb, line 82
# Runs the selector once (with the heartbeat timeout) and reports any
# change in the number of communicators.
#
# @error_host is reset before the selector runs, so afterwards it holds
# only the hosts recorded by delete during this call. When the
# communicator count differs before and after, two info lines are logged:
# the counts together with +message+ (inspected), and the retired hosts.
# Returns nil when the count is unchanged, otherwise the result of the
# last Log.info call.
def run(message)
  @error_host = []
  before = @communicators.size
  @selector.run(@heartbeat_timeout)
  after = @communicators.size
  return if before == after

  Log.info "# of communicators: #{before}->#{after} during #{message.inspect}"
  Log.info "retired hosts=[#{@error_host.join(',')}]"
end