class Pwrake::Branch

Public Class Methods

io_class=(io_class) click to toggle source
# File lib/pwrake/branch/branch.rb, line 14
def self.io_class=(io_class)
  @@io_class = io_class
end
new(opts,r,w) click to toggle source
# File lib/pwrake/branch/branch.rb, line 18
def initialize(opts,r,w)
  Thread.abort_on_exception = true
  @option = opts
  @task_q = {}  # worker_id => FiberQueue.new
  @shells = []
  @ior = r
  @iow = w
  @selector = NBIO::Selector.new(@@io_class)
  @master_rd = NBIO::Reader.new(@selector,@ior)
  @master_wt = NBIO::Writer.new(@selector,@iow)
  @shell_start_interval = @option['SHELL_START_INTERVAL']

  # init_logger
  Log.set_logger(@option)
  if dir = @option['LOG_DIR']
    fn = File.join(dir,@option["COMMAND_CSV_FILE"])
    Shell.profiler.open(fn,@option['GNU_TIME'],@option['PLOT_PARALLELISM'])
  end
end

Public Instance Methods

finish() click to toggle source
# File lib/pwrake/branch/branch.rb, line 225
def finish
  return if @finished
  @finished = true
  #Log.debug "Branch#finish: begin"
  @cs.exit
  Log.debug "Branch#finish: worker exited"
  @master_wt.put_line "exited"
  Log.debug "Branch#finish: sent 'exited' to master"
end
invoke(t,args) click to toggle source
# File lib/pwrake/branch/branch.rb, line 208
def invoke(t,args)
  Log.debug "Branch#invoke start: #{t.class}[#{t.name}]"
  r,w = IO.pipe
  rd = NBIO::Reader.new(@selector,r)
  @search_que.enq([t,args,w])
  task_name = rd.get_line.chomp
  if t.name != task_name
    raise "t.name=#{t.name} != task_name=#{task_name}"
  end
  Log.debug "Branch#invoke end: #{t.class}[#{t.name}]"
end
kill(sig="INT") click to toggle source
# File lib/pwrake/branch/branch.rb, line 220
def kill(sig="INT")
  Log.warn "Branch#kill #{sig}"
  @cs.kill(sig)
end
read_worker_progs(worker_progs) click to toggle source
# File lib/pwrake/branch/branch.rb, line 74
def read_worker_progs(worker_progs)
  code = ""
  modpath = {}
  worker_progs.each do |f|
    m = f.split(/\//).first
    if !modpath[m]
      $LOAD_PATH.each do |x|
        if File.directory?(File.join(x,m))
          modpath[m] = x
          break
        end
      end
      if !modpath[m]
        raise RuntimeError,"load path for module #{m} not found"
      end
    end
    path = File.join(modpath[m],f)
    path += '.rb' if /\.rb$/ !~ path
    if !File.exist?(path)
      raise RuntimeError,"program file #{path} not found"
    end
    code << IO.read(path) + "\n"
  end
  code
end
run() click to toggle source

Rakefile is loaded after 'init' before 'run'

# File lib/pwrake/branch/branch.rb, line 40
def run
  setup_worker
  setup_shell
  setup_fiber
  setup_master_channel
  setup_search_thread
  @cs.run("task execution")
  Log.debug "Branch#run end"
end
setup_fiber() click to toggle source
# File lib/pwrake/branch/branch.rb, line 123
def setup_fiber
  # start fibers
  @shells.each do |shell|
    shell.create_fiber(@master_wt).resume
  end
  Log.debug "all fiber started"

  @cs.each_value do |comm|
    #comm.start_default_fiber
    Fiber.new do
      while s = comm.reader.get_line
        break unless comm.common_line(s)
      end
      Log.debug "Branch#setup_fiber: end of fiber for default channel"
    end.resume
  end

  # setup end
  @cs.each_value do |comm|
    comm.writer.put_line "setup_end"
  end

  @master_wt.put_line "branch_setup:done"
  Log.debug "Branch#setup_fiber: setup end"
end
setup_master_channel() click to toggle source
# File lib/pwrake/branch/branch.rb, line 149
def setup_master_channel
  Fiber.new do
    while s = @master_rd.get_line
      # receive command from main pwrake
      Log.debug "Branch:recv #{s.inspect} from master"
      case s
        #
      when /^(\d+):(.+)$/o
        id, tname = $1,$2
        begin
          task_name = tname.sub(/^\d+:/,"")
          @task_q[id].enq(tname)
        rescue => e
          Log.error Log.bt(e)
          ret="taskfail:#{id}:#{task_name}"
          Log.debug "fail to enq task_q[#{id}], ret=#{ret}"
          @master_wt.put_line(ret)
        end
        #
      when /^exit$/
        #@task_q.each_value{|q| q.finish}
        #@cs.drop_all
        @cs.finish_shells

        #@shells.each{|shell| shell.exit} # just for comfirm
        #@selector.halt # should halt after exited
        break
        #
      when /^drop:(.*)$/o
        id = $1
        taskq = @task_q.delete(id)
        Log.debug "drop @task_q[#{id}]=#{taskq.inspect}"
        @cs.drop(id)
        break if @cs.empty?
        #
      when /^kill:(.*)$/o
        sig = $1
        kill(sig)
        break
        #
      else
        Log.debug "Branch: invalid line from master: #{s}"
      end
    end
    Log.debug "Branch#setup_master_channel: end of fiber for master channel"
  end.resume
end
setup_search_thread() click to toggle source
# File lib/pwrake/branch/branch.rb, line 197
def setup_search_thread
  @search_que = Queue.new
  Thread.new do
    while a = @search_que.deq
      t,args,w = *a
      t.pw_search_tasks(args)
      w.puts(t.name)
    end
  end
end
setup_shell() click to toggle source
# File lib/pwrake/branch/branch.rb, line 100
def setup_shell
  @shells = []
  @cs.each_value do |comm|
    @task_q[comm.id] = task_q = FiberQueue.new
    comm.ncore.times do
      chan = comm.new_channel
      shell = Shell.new(chan,comm,task_q,@option.worker_option)
      # wait for remote shell open
      Fiber.new do
        if shell.open
          @shells << shell
        else
          @master_wt.put_line "retire:#{comm.id}"
        end
        Log.debug "Branch#setup_shells: end of fiber to open shell"
      end.resume
      sleep @shell_start_interval
    end
  end

  @cs.run("setup shells")
end
setup_worker() click to toggle source
# File lib/pwrake/branch/branch.rb, line 50
def setup_worker
  @cs = CommunicatorSet.new(@master_rd,@selector,@option.worker_option)
  @cs.create_communicators
  worker_code = read_worker_progs(@option.worker_progs)
  @cs.each_value do |comm|
    Fiber.new do
      comm.connect(worker_code)
    end.resume
  end
  @cs.run("connect to workers")
  #
  Fiber.new do
    @cs.each_value do |comm|
      # set WorkerChannel#ncore at Master
      @master_wt.put_line "ncore:#{comm.id}:#{comm.ncore}"
      comm.ipaddr.each do |ipa|
        @master_wt.put_line "ip:#{comm.id}:#{ipa}"
      end
    end
    @master_wt.put_line "ncore:done"
  end.resume
  @selector.run
end