class Pwrake::Invoker

Public Class Methods

new() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 32
# Builds the worker-side invoker.
#
# Establishes the connection (which also reads @ncore and @option), prepares
# the shared-directory class named by @option[:shared_directory], opens the
# singleton log executor, and finally calls the send_ipaddr and send_ncore
# hooks.
#
# Raises ArgumentError (propagated from send_ncore) when @ncore is invalid.
def initialize
  # Sets @ncore, @option, @selector, @rd and @out.
  setup_connection
  # Resolve the directory class by name inside the Pwrake namespace.
  @dir_class = Pwrake.const_get(@option[:shared_directory])
  @dir_class.init(@option)
  # Executors keyed by id; populated by setup_loop.
  @ex_list = {}
  @log = LogExecutor.instance
  @log.init(@option)
  @log.open(@dir_class)
  # Register the log with the output writer.
  @out.add_logger(@log)
  send_ipaddr
  send_ncore
  # does NOT exit when writing to broken pipe
  Signal.trap("PIPE", "SIG_IGN")
end

Public Instance Methods

close_all() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 184
# Shuts the worker down.
#
# Kills the heartbeat thread (if one exists), changes to the home directory,
# closes and then joins every executor in @ex_list, and closes the log with a
# 20 second timeout. "exited" is always written to @out, even when one of the
# steps above raises.
#
# Returns the value of @log.close.
def close_all
  begin
    @log.info "close_all"
    if @heartbeat_thread
      @heartbeat_thread.kill
    end
    # Dir.chdir without an argument moves to the home directory.
    Dir.chdir
    # Close every executor first, then wait for each of them.
    @ex_list.each_value(&:close)
    @ex_list.each_value(&:join)
    ids = @ex_list.keys
    @log.info "worker:end:#{ids.inspect}"
    Timeout.timeout(20) do
      @log.close
    end
  ensure
    @out.puts "exited"
  end
end
command_callback() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 136
# Reads command lines from @rd and dispatches them until input ends or a
# common line asks to stop.
#
# Line formats handled here:
#   "<id>:exit"  - remove executor <id> from @ex_list, close it and join it
#   "<id>:<cmd>" - pass <cmd> to executor <id>
# Every other line is delegated to common_line; a true result ends the loop.
#
# Raises RuntimeError when @rd is at end-of-file after the loop.
def command_callback
  while (line = get_line(@rd))
    case line
    when /^(\d+):exit$/o
      executor = @ex_list.delete(Regexp.last_match(1))
      executor.close
      executor.join
    when /^(\d+):(.*)$/o
      matched = Regexp.last_match
      @ex_list[matched[1]].execute(matched[2].chomp)
    else
      break if common_line(line)
    end
  end
  # connection lost
  raise "lost connection to master" if @rd.eof?
end
common_line(line) click to toggle source
# File lib/pwrake/worker/invoker.rb, line 157
# Handles the control lines that are not addressed to one executor.
#
#   "exit"       - returns true
#   "kill:<sig>" - calls kill_all with <sig> (converted to Integer when it is
#                  all digits) and returns true
#   "p"          - dumps @ex_list to $stderr and returns false
#
# Any other value is logged as fatal and raises RuntimeError.
def common_line(line)
  case line
  when /^exit$/o
    true
  when /^kill:(.*)$/o
    signal = Regexp.last_match(1)
    signal = signal.to_i if signal =~ /^\d+$/o
    kill_all(signal)
    true
  when /^p$/o
    $stderr.puts "@ex_list = #{@ex_list.inspect}"
    false
  else
    message = "invalid line: #{line.inspect}"
    @log.fatal message
    raise RuntimeError, message
  end
end
get_io() click to toggle source
# File lib/pwrake/mpi/worker.rb, line 19
# Sends this host's multicast-capable IP addresses over an MPipe and returns
# the IO class plus the input and output pipes for the invoker.
#
# The addresses are joined with '|' and written as a 32-bit little-endian
# byte length followed by the string itself.
#
# Returns [MPipe, MPipe, MPipe]: the IO class, the reader and the writer.
def get_io
  # get IP addresses
  # Socket::Ifaddr#addr is nil for interfaces that have no address, so those
  # entries must be skipped before asking whether the address is an IP one.
  candidates = Socket.getifaddrs.select do |a|
    a.addr && a.addr.ip? && (a.flags & Socket::IFF_MULTICAST != 0)
  end
  s = candidates.map { |a| a.addr.ip_address }.join('|')
  # write IP addresses
  iow = MPipe.new(0)
  # The prefix announces how many bytes follow, hence bytesize.
  iow.write([s.bytesize].pack("V"))
  iow.write(s)
  iow.flush
  # returns IO, $stdin, $stdout
  [MPipe, MPipe.new(0), MPipe.new(0)]
end
get_line(io) click to toggle source
# File lib/pwrake/worker/invoker.rb, line 77
# Reads one line from +io+, removes the line terminator and surrounding
# whitespace in place, and logs the result prefixed with ">".
#
# Returns the cleaned line, or whatever falsy value io.gets returned
# (nothing is logged in that case).
def get_line(io)
  text = io.gets
  return text unless text
  text.chomp!
  text.strip!
  @log.info ">#{text}"
  text
end
kill_all(sig) click to toggle source
# File lib/pwrake/worker/invoker.rb, line 179
# Logs a warning and forwards +sig+ to the kill method of every executor in
# @ex_list.
#
# Returns @ex_list.
def kill_all(sig)
  @log.warn "killing worker, signal=#{sig}"
  @ex_list.each_value do |executor|
    executor.kill(sig)
  end
end
processor_count() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 7
# Returns the number of processors on this host.
#
# Uses Etc.nprocessors; if that raises a StandardError, falls back to
# counting the "processor" lines of /proc/cpuinfo.
def processor_count
  Etc.nprocessors
rescue
  # only for Linux
  cpuinfo = File.read("/proc/cpuinfo")
  cpuinfo.scan(/^processor/).length
end
run() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 87
# Runs the worker: the setup phase followed by the command phase.
# close_all is always invoked afterwards, whether or not a phase raised.
#
# Returns the value of run_command.
def run
  begin
    run_setup
    run_command
  ensure
    close_all
  end
end
run_command() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 128
# Command phase: starts command_callback inside a Fiber and then runs the
# selector.
#
# Any StandardError is logged (message plus backtrace, one entry per line)
# and answered by sending "TERM" to every executor via kill_all.
def run_command
  begin
    fiber = Fiber.new { command_callback }
    fiber.resume
    @selector.run
  rescue => exc
    report = [exc.to_s] + exc.backtrace
    @log.error(report.join("\n"))
    kill_all("TERM")
  end
end
run_setup() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 94
# Setup phase: applies the options, starts setup_loop inside a Fiber and then
# runs the selector.
#
# Any StandardError is logged (message plus backtrace, one entry per line)
# and not re-raised.
def run_setup
  begin
    setup_option
    fiber = Fiber.new { setup_loop }
    fiber.resume
    @selector.run
  rescue => exc
    report = [exc.to_s] + exc.backtrace
    @log.error(report.join("\n"))
  end
end
send_ipaddr() click to toggle source
# File lib/pwrake/mpi/worker.rb, line 34
# Intentional no-op hook called from initialize; always returns nil.
# NOTE(review): presumably nothing is sent here because get_io in this file
# already writes the IP addresses - confirm against the non-MPI worker.
def send_ipaddr
  nil
end
send_ncore() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 57
# Normalizes @ncore and reports it on @out as "ncore:<value>".
#
#   Integer > 0  - used as is
#   Integer <= 0 - processor_count is added; the result must be positive
#   nil          - replaced by processor_count
#   anything else is rejected
#
# On rejection the message is written to @out as "ncore:<message>" and
# ArgumentError is raised with the same message.
def send_ncore
  failure = nil
  case @ncore
  when Integer
    @ncore += processor_count() if @ncore <= 0
    failure = "Out of range: ncore=#{@ncore.inspect}" if @ncore <= 0
  when nil
    @ncore = processor_count()
  else
    failure = "Invalid argument: ncore=#{@ncore.inspect}"
  end
  if failure
    @out.puts "ncore:"+failure
    raise ArgumentError, failure
  end
  @out.puts "ncore:#{@ncore}"
end
setup_connection() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 20
# Obtains the IO objects from get_io, reads the initial header and options
# from the input side, and wires up the selector, reader and writer.
#
# Wire format read from the input: 8 bytes holding two 32-bit little-endian
# integers (ncore, then the byte length of the options), followed by the
# Marshal-dumped option hash.
#
# Sets @ncore, @option, @selector, @rd and @out.
# Raises RuntimeError when the 8-byte header cannot be read in full.
def setup_connection
  ioc, ior, iow = get_io()
  # read @ncore and @option
  header = ior.read(8)
  unless header && header.bytesize == 8
    raise RuntimeError, "failed to read ncore/option header from master"
  end
  # ncore is decoded as a signed 32-bit value ("l<"): send_ncore treats
  # zero and negative values specially, which an unsigned "V" decode would
  # turn into huge positive numbers. The length stays unsigned.
  @ncore, len = header.unpack("l<V")
  # NOTE(review): Marshal.load must only ever see data from a trusted peer.
  @option = Marshal.load(ior.read(len))
  # set pipe to branch-master
  @selector = NBIO::Selector.new(ioc)
  @rd = NBIO::Reader.new(@selector, ior)
  @out = Writer.instance
  @out.out = iow
end
setup_loop() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 110
# Reads setup lines from @rd until "setup_end" arrives.
#
#   "<id>:open"  - creates an Executor for <id> and stores it in @ex_list
#   "setup_end"  - finishes the loop
# Every other line is delegated to common_line.
#
# Raises RuntimeError when common_line returns true, i.e. an exit was
# requested during setup.
def setup_loop
  loop do
    line = get_line(@rd)
    case line
    when "setup_end"
      break
    when /^(\d+):open$/o
      Regexp.last_match(1).split.each do |id|
        @ex_list[id] = Executor.new(@selector, @dir_class, id, @option)
      end
    else
      raise "exit during setup_loop" if common_line(line)
    end
  end
end
setup_option() click to toggle source
# File lib/pwrake/worker/invoker.rb, line 102
# Applies the received options: logs them, hands the :heartbeat value to the
# output writer, and copies every :pass_env entry into ENV.
#
# Returns the :pass_env hash (an empty hash when the option is absent).
def setup_option
  @log.info @option.inspect
  @out.heartbeat = @option[:heartbeat]
  passed_env = @option[:pass_env] || {}
  passed_env.each do |name, value|
    ENV[name] = value
  end
end