class Pwrake::Master
Attributes
current_flow[R]
option[R]
task_queue[R]
thread[R]
Public Class Methods
new()
click to toggle source
# File lib/pwrake/master/master.rb, line 14
#
# Creates the NBIO selector and the (initially empty) bookkeeping tables,
# then loads the options and wires up the loggers. GC profiling is switched
# on only when both the 'LOG_DIR' and 'GC_LOG_FILE' options are set.
def initialize
  @selector = NBIO::Selector.new

  # Bookkeeping tables; they are filled in by the setup_* methods.
  @hostinfo_by_taskname = {}
  @hdl_set = []
  @channel_by_hostid = {}
  @channels = []
  @hostinfo_by_id = {}
  @current_flow = {}

  # init
  @option = Option.new
  Log.set_logger(@option)
  TaskWrapper.init_task_logger(@option)
  at_exit { TaskWrapper.close_task_logger }

  # moved from Option#init
  @option.put_log
  gc_logging = @option['LOG_DIR'] && @option['GC_LOG_FILE']
  GC::Profiler.enable if gc_logging
end
Public Instance Methods
create_fiber(channels,&blk)
click to toggle source
# File lib/pwrake/master/master.rb, line 188
#
# Starts one Fiber per element of +channels+. Each fiber runs +blk+ and is
# resumed right away with its channel as the block argument; a fiber that
# yields is simply left suspended while the next one is started.
#
# Returns +channels+ (the result of #each).
def create_fiber(channels, &blk)
  channels.each { |channel| Fiber.new(&blk).resume(channel) }
end
drop_host(host_info)
click to toggle source
# File lib/pwrake/master/master.rb, line 429
#
# Takes +host_info+ out of service: sends "drop:<id>" on the host's channel,
# removes the host from the task queue and from @hostinfo_by_id. Does nothing
# beyond the debug log when the host id is not (or no longer) registered.
#
# When the last host has just been removed and the run is not @finished,
# the situation is reported and @failed is set.
def drop_host(host_info)
  Log.debug "drop_host: #{host_info.name}"
  host_id = host_info.id
  return unless @hostinfo_by_id[host_id]

  @channel_by_hostid[host_id].put_line("drop:#{host_id}")
  @task_queue.drop_host(host_info)
  @hostinfo_by_id.delete(host_id)
  return unless @hostinfo_by_id.empty?

  if @finished
    Log.debug "drop_host: @finished and @hostinfo_by_id.empty?"
  else
    Log.error "drop_host: All workers retired."
    $stderr.puts "All workers retired."
    @failed = true
  end
end
ending?()
click to toggle source
# File lib/pwrake/master/master.rb, line 393
#
# Tells whether the master may stop. That is the case when no further work
# can be started (@no_more_run set, task queue empty, or no host left) AND
# nothing is in flight any more (no task registered in @hostinfo_by_taskname
# and the postprocess pool is empty).
#
# While only one or two tasks are still registered, the state that led here
# is written to the debug log.
def ending?
  winding_down = @no_more_run || @task_queue.empty? || @hostinfo_by_id.empty?
  return false unless winding_down

  if @hostinfo_by_taskname.size.between?(1, 2)
    Log.debug " @no_more_run=#{@no_more_run.inspect}" if @no_more_run
    Log.debug " @task_queue.empty?=#{@task_queue.empty?}" if @task_queue.empty?
    Log.debug " @hostinfo_by_id.empty?=#{@hostinfo_by_id.empty?}" if @hostinfo_by_id.empty?
    Log.debug " @hostinfo_by_taskname.keys=#{@hostinfo_by_taskname.keys.inspect}"
    Log.debug " @post_pool.empty?=#{@post_pool.empty?}" if @post_pool.empty?
  end

  @hostinfo_by_taskname.empty? && @post_pool.empty?
end
finish()
click to toggle source
# File lib/pwrake/master/master.rb, line 449
#
# Shuts the master down. Waits for the branch setup thread, lets the selector
# run for at most 30 seconds, and - unless the branches have already reported
# their exit (@exited) - asks all handlers to exit and again waits up to 30
# seconds for the selector and the local branch thread.
#
# Both waits are best effort: an error or timeout must not prevent shutdown.
# The swallowed exception is written to the debug log so it leaves a trace.
#
# Returns the value of @failed.
def finish
  Log.debug "Master#finish begin"
  @branch_setup_thread.join
  # continues running fibers
  Log.debug "Master#finish @selector.run begin"
  begin
    Timeout.timeout(30) { @selector.run }
  rescue => e
    Log.debug "Master#finish @selector.run aborted: #{e.class}: #{e.message}"
  end
  Log.debug "Master#finish @selector.run end"
  unless @exited
    @exited = true
    Log.debug "Master#finish Handler.exit begin"
    @selector.clear
    NBIO::Handler.exit(@hdl_set)
    begin
      Timeout.timeout(30) do
        @selector.run
        @thread.join if @thread
      end
    rescue => e
      Log.debug "Master#finish Handler.exit aborted: #{e.class}: #{e.message}"
    end
    Log.debug "Master#finish Handler.exit end"
  end
  Log.debug "Master#finish end"
  @failed
end
handle_failed_target(name)
click to toggle source
# File lib/pwrake/master/master.rb, line 409
#
# Deals with the output file +name+ of a failed task according to the
# 'FAILED_TARGET' option:
#   unset or matching /rename/i  - move the file to "<name>._fail_"
#   matching /delete/i           - remove the file
#   matching /leave/i (or other) - leave the file untouched
# The action taken is reported on $stderr and through Log.warn.
def handle_failed_target(name)
  report =
    case @option['FAILED_TARGET']
    when /rename/i, NilClass
      dst = name + "._fail_"
      ::FileUtils.mv(name, dst)
      "Rename output file '#{name}' to '#{dst}'"
    when /delete/i
      ::FileUtils.rm(name)
      "Delete output file '#{name}'"
    when /leave/i
      nil
    end
  return if report.nil?

  $stderr.puts(report)
  Log.warn(report)
end
invoke(t, args)
click to toggle source
# File lib/pwrake/master/master.rb, line 195
#
# Entry point for running task +t+ with +args+. Every call resets @failed and
# lets the task search its prerequisites; only the first call (the one that
# finds @running unset) goes on to drive the actual execution.
#
# With the 'GRAPH_PARTITION' option, the no-action tasks are first pushed
# through a throw-away postprocess pool and the host map is handed to
# MCGP.graph_partition. After that the real postprocess pool is set up,
# TERM/INT handlers are installed and the first tasks are dispatched.
def invoke(t, args)
  Log.debug "Master#invoke start: #{t.class}[#{t.name}]"
  @failed = false
  t.pw_search_tasks(args)
  return if @running

  @running = true

  if @option['GRAPH_PARTITION']
    setup_postprocess0
    @branch_setup_thread.join
    @task_queue.deq_noaction_task do |noaction_tw|
      noaction_tw.preprocess
      noaction_tw.status = "end"
      @post_pool.enq(noaction_tw)
    end
    @selector.run
    @post_pool.finish
    Log.debug "@post_pool.finish"
    # Loaded lazily: only needed when graph partitioning is requested.
    require 'pwrake/misc/mcgp'
    MCGP.graph_partition(@option.host_map)
  end

  setup_postprocess1
  @branch_setup_thread.join

  [:TERM, :INT].each do |signal_name|
    Signal.trap(signal_name) { signal_trap(signal_name) }
  end

  send_task_to_idle_core
  if ending?
    @post_pool.finish # need?
  else
    setup_fiber
  end
end
kill_end(sig)
click to toggle source
# File lib/pwrake/master/master.rb, line 82
#
# Emergency shutdown used by the signal handler: marks the run as failed,
# forwards +sig+ to every handler, waits up to 30 seconds for the selector
# and the local branch thread, then terminates the process with a failure
# status.
def kill_end(sig)
  # log writing failed. can't be called from trap context
  $stderr.puts "Exiting..."
  @no_more_run = true
  @failed = true
  @selector.clear
  NBIO::Handler.kill(@hdl_set, sig)

  begin
    Timeout.timeout(30) do
      @selector.run
      @thread.join if @thread
    end
  rescue
    # Deliberately ignored: the process exits right below in any case.
  end

  Kernel.exit(false)
end
retire(hid)
click to toggle source
# File lib/pwrake/master/master.rb, line 171
#
# Handles a "retire" notice for host id +hid+ (converted with #to_i): retires
# one unit on the host via host_info.retire(1) and, once the host reports
# itself as fully retired and the branches have not exited yet, announces it
# and drops the host.
#
# Unknown host ids are ignored.
# Raises RuntimeError ("no worker host") when the dropped host was the last.
def retire(hid)
  host_info = @hostinfo_by_id[hid.to_i]
  return if host_info.nil?

  host_info.retire(1)
  return unless host_info.retired?
  return if @exited

  notice = "retired: host #{host_info.name}"
  Log.warn(notice)
  $stderr.puts(notice)
  drop_host(host_info) # delete from hostinfo_by_id
  raise RuntimeError, "no worker host" if @hostinfo_by_id.empty?
end
send_task_to_idle_core()
click to toggle source
# File lib/pwrake/master/master.rb, line 311
#
# Dispatches every task the queue hands out through deq_task. Each task is
# registered in @hostinfo_by_taskname, gets its core count and is
# preprocessed. A task that comes with a host is sent to that host's channel
# as "<host id>:<task id>:<task name>"; a task without a host is marked
# "end" and goes straight to the postprocess pool.
#
# Raises RuntimeError when nothing was dispatched although the queue is not
# empty and no task is registered as running.
def send_task_to_idle_core
  #Log.debug "#{self.class}#send_task_to_idle_core start"
  dispatched = 0
  # @idle_cores.decrease(..
  @task_queue.deq_task do |tw, host_info, ncore|
    dispatched += 1
    @hostinfo_by_taskname[tw.name] = host_info
    tw.set_used_cores(ncore)
    tw.preprocess
    unless host_info
      tw.status = "end"
      @post_pool.enq(tw)
      next
    end

    host_info.busy(ncore)
    host_id = host_info.id
    @channel_by_hostid[host_id].put_line("#{host_id}:#{tw.task_id}:#{tw.name}")
    tw.exec_host = host_info.name
    tw.exec_host_id = host_id
  end

  stalled = dispatched == 0 && !@task_queue.empty? && @hostinfo_by_taskname.empty?
  if stalled
    m = "No task was invoked while unexecuted tasks remain"
    Log.error m
    Log.error "count=#{dispatched} @hostinfo_by_taskname.empty?=#{@hostinfo_by_taskname.empty?} @hostinfo_by_taskname=#{@hostinfo_by_taskname.inspect} @task_queue.empty?=#{@task_queue.empty?} @task_queue=\n"+@task_queue.inspect_q
    raise RuntimeError, m
  end
  #Log.debug "#{self.class}#send_task_to_idle_core end time=#{Time.now-tm}"
end
setup_branch_handler(sub_host)
click to toggle source
# File lib/pwrake/master/master.rb, line 39
#
# Starts the branch for +sub_host+ and returns an NBIO::Handler connected to
# it through a pair of pipes.
#
# For "localhost" the branch runs in a thread of this process (stored in
# @thread), unless the environment variable T starts with "n" or "f"
# (case-insensitive). Otherwise `pwrake_branch` is started over ssh in the
# current working directory, with the directory of the running program
# prepended to PATH; the options are sent to it with Marshal and the line
# "pwrake_branch start" is awaited as handshake.
#
# The remote command line is built from Shellwords-escaped parts and ssh is
# spawned from an argument vector (no local shell), so directories containing
# spaces, quotes or "$" cannot break or alter the command.
#
# Raises RuntimeError when the handshake line is missing or different.
def setup_branch_handler(sub_host)
  # Required here so the method does not rely on a file-level require.
  require 'shellwords'

  ior, w0 = IO.pipe
  r2, iow = IO.pipe
  if sub_host == "localhost" && /^(n|f)/i !~ ENV['T']
    @thread = Thread.new(r2, w0, @option) do |r, w, o|
      Rake.application.run_branch_in_thread(r, w, o)
    end
  else
    dir = File.absolute_path(File.dirname($PROGRAM_NAME))
    # ${PATH} is intentionally left unescaped: the remote shell expands it.
    remote_cmd = "cd #{Shellwords.escape(Dir.pwd)};" \
                 "PATH=#{Shellwords.escape(dir)}:${PATH} exec pwrake_branch"
    argv = ["ssh", "-x", "-T", "-q", sub_host, remote_cmd]
    Log.debug("BranchCommunicator cmd=#{Shellwords.join(argv)}")
    spawn(*argv, :pgroup=>true, :out=>w0, :in=>r2)
    # The child owns these ends now; keeping them open here would prevent
    # EOF from ever being seen on ior.
    w0.close
    r2.close
    Marshal.dump(@option, iow)
    iow.flush
    s = ior.gets
    if !s || s.chomp != "pwrake_branch start"
      raise RuntimeError, "pwrake_branch start failed: receive #{s.inspect}"
    end
  end
  rd = NBIO::Reader.new(@selector, ior)
  wt = NBIO::Writer.new(@selector, iow)
  NBIO::Handler.new(rd, wt, sub_host)
end
setup_branches()
click to toggle source
# File lib/pwrake/master/master.rb, line 99
#
# Connects to every branch listed in @option.host_map and performs the
# initial handshake:
#
# 1. For each branch a handler is created and a fiber sends the list of its
#    worker hosts ("host:<id> <name> <ncore>"), then reads "ncore:<id>:<n>" /
#    "ip:<id>:<addr>" replies until "ncore:done".
# 2. After the selector has run, the total core count is stored in the
#    options and every host that has no idle_cores value is removed from
#    @hostinfo_by_id.
# 3. The task queue is created on top of @hostinfo_by_id and a background
#    thread (@branch_setup_thread) waits for "branch_setup:done" from every
#    branch, handling "retire:<id>" notices meanwhile.
#
# Raises RuntimeError on an unexpected reply, on an early branch exit, or
# when no worker host is left.
def setup_branches
  sum_ncore = 0
  @option.host_map.each do |sub_host, wk_hosts|
    hdl = setup_branch_handler(sub_host)
    @hdl_set << hdl
    Fiber.new do
      hdl.put_line "host_list_begin"
      wk_hosts.each do |host_info|
        name = host_info.name
        ncore = host_info.ncore
        host_id = host_info.id
        Log.debug "connecting #{name} ncore=#{ncore} id=#{host_id}"
        hdl.put_line "host:#{host_id} #{name} #{ncore}"
        @channel_by_hostid[host_id] = hdl
        @hostinfo_by_id[host_id] = host_info
      end
      hdl.put_line "host_list_end"
      while s = hdl.get_line
        case s
        when /^ncore:done$/
          break
        when /^ncore:(\d+):(\d+)$/
          id, ncore = $1.to_i, $2.to_i
          Log.debug "worker_id=#{id} ncore=#{ncore}"
          @hostinfo_by_id[id].set_ncore(ncore)
          sum_ncore += ncore
        when /^ip:(\d+):(\S+)$/
          id, ipa = $1.to_i, $2
          Log.debug "worker_id=#{id} ip=#{ipa}"
          @hostinfo_by_id[id].set_ip(ipa)
        when /^exited$/
          raise RuntimeError, "Unexpected branch exit"
        else
          msg = "#{hdl.host}:#{s.inspect}"
          raise RuntimeError, "invalid return: #{msg}"
        end
      end
    end.resume
  end
  @selector.run
  Log.info "num_cores=#{sum_ncore}"
  @option.total_cores = sum_ncore

  # Prune hosts without an idle_cores value. delete_if works in place, so the
  # Hash given to TaskQueue below is the same object, and the hash is not
  # modified from inside a plain #each.
  @hostinfo_by_id.delete_if do |id, host|
    ncore = host.idle_cores
    Log.info "#{host.name} id=#{id} ncore=#{ncore}" if ncore
    !ncore
  end
  raise RuntimeError, "no worker host" if @hostinfo_by_id.empty?

  @task_queue = TaskQueue.new(@option.queue_class, @hostinfo_by_id)
  @branch_setup_thread = Thread.new do
    create_fiber(@hdl_set) do |hdl|
      while s = hdl.get_line
        case s
        when /^retire:(\d+)$/
          retire($1.to_i)
        when /^branch_setup:done$/
          break
        else
          raise RuntimeError, "branch_setup failed: s=#{s.inspect}"
        end
      end
    end
    @selector.run
  end
end
setup_fiber()
click to toggle source
# File lib/pwrake/master/master.rb, line 233
#
# Starts one receiver fiber per branch handler and then runs the selector
# (unless the run is already ending) before finishing the postprocess pool.
#
# Each fiber reads lines from its branch:
#   "task<status>:<shell id>:<task name>" - a task result. The host is
#       released through task_end; for status "fail" the host's failure
#       streak is checked (the host is dropped when the streak exceeds the
#       'HOST_FAILURE' option and it is not the only host), the
#       'FAILURE_TERMINATION' policy is applied once no retry is left, and a
#       left-over output file is passed to handle_failed_target. The task is
#       then queued for postprocessing.
#   "retire:<id>"  - forwarded to #retire.
#   "exited"       - sets @exited and ends the fiber.
#   anything else  - reported as an unknown result.
def setup_fiber
  @host_fail = @option["HOST_FAILURE"]
  create_fiber(@hdl_set) do |hdl|
    while line = hdl.get_line
      Log.debug "Master:recv #{line.inspect} from branch[#{hdl.host}]"
      case line
      when /^task(\w+):(\d*):(.*)$/
        status = $1
        shell_id = $2.to_i
        task_name = $3
        tw = Rake.application[task_name].wrapper
        tw.shell_id = shell_id
        tw.status = status
        host_info = @hostinfo_by_taskname[task_name]
        if host_info.nil?
          msg = "unknown hostid: task_name=#{task_name} s=#{line.inspect}"+
            " @hostinfo_by_taskname.keys=#{@hostinfo_by_taskname.keys.inspect}"
          Log.error(msg)
          $stderr.puts(msg)
        end
        task_end(tw, host_info) # @idle_cores.increase(..

        # check failure
        if tw.status == "fail"
          $stderr.puts %[task "#{tw.name}" failed.]
          if host_info
            host_info.count_result(tw.status)
            streak = host_info.continuous_fail
            Log.debug "task=#{tw.name} continuous_fail=#{streak}"
            if streak > @host_fail && @hostinfo_by_id.size > 1
              # retire this host
              drop_host(host_info)
              Log.warn("retired host:#{host_info.name} due to continuous fail")
            end
          end
          if tw.no_more_retry && !@failed
            @failed = true
            case @option['FAILURE_TERMINATION']
            when 'kill'
              NBIO::Handler.kill(@hdl_set, "INT")
              @selector.run
              @no_more_run = true
              $stderr.puts "... Kill running tasks."
            when 'continue'
              $stderr.puts "... Continue runable tasks."
            else # 'wait'
              @no_more_run = true
              $stderr.puts "... Wait for running tasks."
            end
          end
          handle_failed_target(tw.name) if tw.has_output_file? && File.exist?(tw.name)
        end

        # postprocess
        @post_pool.enq(tw) # must be after @no_more_run = true
        break if @finished
      when /^retire:(\d+)$/
        retire($1.to_i)
      when /^exited$/
        @exited = true
        Log.debug "receive #{line.chomp} from branch"
        break
      else
        Log.error "unknown result: #{line.inspect}"
        $stderr.puts(line)
      end
    end
    Log.debug "Master#setup_fiber: end of fiber"
  end

  if ending?
    Log.debug "@selector.run skipped"
  else
    Log.debug "@selector.run begin"
    @selector.run
    Log.debug "@selector.run end"
  end
  @post_pool.finish
end
setup_postprocess() { |pool,j| ... }
click to toggle source
# File lib/pwrake/master/master.rb, line 341
#
# Creates the postprocess pool (@post_pool), a FiberPool limited to
# @option.max_postprocess_pool. The pool's block builds one worker fiber
# with its own postprocess object. A worker repeatedly takes a task from the
# pool, postprocesses it, unregisters it from @hostinfo_by_taskname and
# (unless @exited) triggers its retry or its subsequent tasks.
#
# After every task the caller's block is yielded with (pool, fiber number);
# a truthy result ends that worker, which then closes its postprocess
# object.
#
# The fiber number is captured when the fiber is created. Reading the shared
# counter inside the fiber body would hand every fiber created before the
# first resume the same (latest) number.
def setup_postprocess
  i = 0
  n = @option.max_postprocess_pool
  @post_pool = FiberPool.new(n) do |pool|
    postproc = @option.postprocess(@selector)
    i += 1
    j = i # fixed per fiber, see above
    Log.debug "New postprocess fiber ##{j}"
    Fiber.new do
      while tw = pool.deq()
        #Log.debug "postproc##{j} deq=#{tw.name}"
        tw.postprocess(postproc)
        pool.count_down
        @hostinfo_by_taskname.delete(tw.name)
        tw.retry_or_subsequent unless @exited
        break if yield(pool, j)
      end
      postproc.close
      #Log.debug "postproc##{j} end"
    end
  end
end
setup_postprocess0()
click to toggle source
# File lib/pwrake/master/master.rb, line 374
#
# Sets up a postprocess pool whose per-task callback always answers false,
# i.e. the workers never stop on their own account and no follow-up action
# is taken after a task.
def setup_postprocess0
  setup_postprocess do |_pool, _fiber_no|
    false
  end
end
setup_postprocess1()
click to toggle source
# File lib/pwrake/master/master.rb, line 378
#
# Sets up the postprocess pool used for the main run. After each
# postprocessed task the callback either
#   - detects the end of the run: sets @finished, halts the selector and
#     answers true (the worker stops), or
#   - dispatches further tasks and answers false, or
#   - answers nil when @no_more_run forbids starting new tasks.
def setup_postprocess1
  setup_postprocess do |_pool, fiber_no|
    #Log.debug " pool.empty?=#{pool.empty?}"
    unless ending?
      next if @no_more_run

      send_task_to_idle_core
      next false
    end

    Log.debug "postproc##{fiber_no} closing"
    @finished = true
    @selector.halt
    true
  end
end
signal_trap(sig)
click to toggle source
# File lib/pwrake/master/master.rb, line 67
#
# Signal handler body: announces the signal on $stderr, prints the
# backtraces of the master thread and (if present and available) the branch
# thread when Rake runs in debug mode, then shuts down through kill_end.
def signal_trap(sig)
  $stderr.puts "\nSignal trapped. (sig=#{sig} pid=#{Process.pid})"

  if Rake.application.options.debug
    $stderr.print "in master thread #{Thread.current}:\n "
    $stderr.puts caller.join("\n ")
    if @thread
      $stderr.print "in branch thread #{@thread}:\n "
      # Thread#backtrace may be nil, in which case only the header is shown.
      branch_trace = @thread.backtrace
      $stderr.puts branch_trace.join("\n ") if branch_trace
    end
  end

  kill_end(sig)
end
task_end(tw,host_info)
click to toggle source
# File lib/pwrake/master/master.rb, line 364
#
# Gives the cores used by task wrapper +tw+ back to +host_info+ (one core
# when the wrapper reports none) and drops the host if that leaves it fully
# retired. A nil +host_info+ is ignored.
def task_end(tw, host_info)
  return if host_info.nil?

  released = tw.n_used_cores || 1
  host_info.idle(released)
  return unless host_info.retired?

  # all retired
  Log.warn("retired host:#{host_info.name} because all core retired")
  drop_host(host_info)
end