class Pwrake::TaskWrapper

Constants

LOCK

Attributes

assigned[R]
exec_host[RW]
exec_host_id[RW]
executed[RW]
file_stat[R]
group[R]
group_id[R]
location[R]
n_used_cores[R]
shell_id[RW]
status[RW]
task[R]
task_id[R]
tried_hosts[R]

Public Class Methods

clear_rank() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 71
# Clears the cached rank of every wrapper registered in @@instances.
#
# The whole pass runs inside LOCK, after a debug line naming the receiver
# has been written to Log.
def self.clear_rank
  LOCK.synchronize do
    Log.debug "#{self}.clear_rank"
    @@instances.each(&:clear_rank)
  end
end
close_task_logger() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 67
# Closes the task logger, if one is currently set.
#
# Does nothing (and returns nil) when @@task_logger is nil or false.
def self.close_task_logger
  logger = @@task_logger
  logger.close if logger
end
format_time(t) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 51
# Formats +t+ as "YYYY-MM-DD HH:MM:SS.mmm" (date, time, milliseconds).
#
# t:: an object responding to +strftime+ (e.g. a Time).
# Returns the formatted String.
def self.format_time(t)
  pattern = "%F %T.%L"
  t.strftime(pattern)
end
init_task_logger(option) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 55
# Opens the per-task CSV log and writes its header row.
#
# option:: a Hash-like object; 'LOG_DIR' selects the directory and
#          'TASK_CSV_FILE' the file name inside it.
#
# When 'LOG_DIR' is not set nothing is opened and nil is returned.
def self.init_task_logger(option)
  dir = option['LOG_DIR']
  return unless dir

  csv_path = File.join(dir,option['TASK_CSV_FILE'])
  @@task_logger = CSV.open(csv_path,'w')
  # Column names, in the same order as the rows emitted by #log_task.
  header = %w[
    task_id task_name start_time end_time elap_time preq
    preq_host preq_loc exec_host shell_id has_action executed ncore
    file_size file_mtime file_host write_loc
  ]
  @@task_logger.puts header
end
new(task,task_args=nil) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 15
# Builds a wrapper around +task+ and registers it in @@instances.
#
# task::      the wrapped task; must respond to +property+.
# task_args:: optional arguments stored alongside the task.
#
# Each wrapper takes the next value of the class-wide @@current_id counter
# as its task_id. The retry budget comes from the task property, then the
# "RETRY" pwrake option, then defaults to 1.
def initialize(task,task_args=nil)
  @task, @task_args = task, task_args
  @property = task.property

  # Allocate a sequential id.
  @task_id = @@current_id
  @@current_id += 1

  # File placement information.
  @location, @group = [], []
  @group_id, @suggest_location = nil, nil

  # Values filled in later (nil means "not computed yet").
  @file_stat, @input_file_size, @input_file_mtime = nil, nil, nil
  @rank, @priority = nil, nil

  # Execution state.
  @executed = false
  @assigned = []
  @exec_host, @exec_host_id = nil, nil
  @tried_hosts = {}

  @n_retry = @property.retry || Rake.application.pwrake_options["RETRY"] || 1
  @@instances << self
end

Public Instance Methods

acceptable_for(host_info) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 371
# Decides whether this task may run on the host described by +host_info+.
#
# * No host_info given: always acceptable.
# * The task property rejects the host: not acceptable.
# * The task property is marked +reserve+: acceptable.
# * Otherwise the host is asked whether it can accept the number of cores
#   the property wants, given the host's core count.
def acceptable_for(host_info)
  return true unless host_info
  return false unless @property.accept_host(host_info)
  return true if @property.reserve

  wanted_cores = @property.use_cores(host_info.ncore)
  host_info.accept_core(wanted_cores)
end
clear_rank() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 295
# Discards the memoized rank so that the next call to #rank recomputes it.
# Returns nil.
def clear_rank
  @rank = nil
end
file_locality(nodes) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 198
# Classifies a file held on +nodes+ relative to the executing host.
#
# nodes:: collection of node names that hold the file.
#
# Returns nil when +nodes+ is empty or no exec host id is known,
# "L" (local) when HostMap.ipmatch_for_name of any node includes the exec
# host id, and "R" (remote) otherwise.
def file_locality(nodes)
  return nil if nodes.empty? || !@exec_host_id

  local = nodes.any? do |node|
    HostMap.ipmatch_for_name(node).include?(@exec_host_id)
  end
  local ? "L" : "R"
end
file_mtime() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 303
# Modification time taken from the stored file stat, or the epoch
# (Time.at(0)) when no stat has been recorded.
def file_mtime
  if @file_stat
    @file_stat.mtime
  else
    Time.at(0)
  end
end
file_size() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 299
# Size taken from the stored file stat, or 0 when no stat has been recorded.
def file_size
  return 0 unless @file_stat

  @file_stat.size
end
has_action?() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 221
# True when the wrapped task has at least one action attached.
def has_action?
  task_actions = @task.actions
  !task_actions.empty?
end
has_input_file?() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 217
# True when the wrapped task is a file task with at least one prerequisite.
def has_input_file?
  return false unless is_file_task?

  !prerequisites.empty?
end
has_output_file?() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 213
# True when the wrapped task is a file task with at least one action.
def has_output_file?
  return false unless is_file_task?

  !actions.empty?
end
input_file_mtime() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 317
# Latest modification time among the "large" input files of this task.
#
# Only computed for tasks with input files and only while no value has been
# stored yet. Prerequisites with a positive file size are collected; those
# whose size exceeds half of the largest size (integer division) are
# considered, and the newest of their mtimes is stored.
#
# Returns the stored Time, or nil when nothing qualified.
def input_file_mtime
  return @input_file_mtime unless has_input_file? && @input_file_mtime.nil?

  # wrapper => size, for every prerequisite that has a non-empty file
  sizes = {}
  prerequisites.each do |preq|
    wrapper = Rake.application[preq].wrapper
    size = wrapper.file_size
    sizes[wrapper] = size if size > 0
  end

  threshold = (sizes.values.max || 0) / 2
  sizes.each do |wrapper, size|
    next unless size > threshold

    mtime = wrapper.file_mtime
    if @input_file_mtime.nil? || @input_file_mtime < mtime
      @input_file_mtime = mtime
    end
  end
  @input_file_mtime
end
input_file_size() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 307
# Total size of all prerequisite files, memoized after the first call.
#
# Each prerequisite is looked up through Rake.application and the
# +file_size+ of its wrapper is added up.
#
# The total is accumulated in a local and stored only after every
# prerequisite has been visited. Accumulating directly in @input_file_size
# would leave a partial sum cached forever if a lookup raised part-way
# through the loop.
#
# Returns the total (0 when there are no prerequisites).
def input_file_size
  unless @input_file_size
    total = prerequisites.inject(0) do |acc, preq|
      acc + Rake.application[preq].wrapper.file_size
    end
    @input_file_size = total
  end
  @input_file_size
end
is_file_task?() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 209
# True when the wrapped task is a Rake::FileTask (or a subclass of it).
def is_file_task?
  @task.is_a?(Rake::FileTask)
end
location=(a) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 225
# Replaces the list of hosts holding this task's file and resets the
# group list to empty.
#
# a:: the new location list.
def location=(a)
  @location = a
  # Deriving groups from hosts is currently disabled:
  #   @location.each do |host|
  #     @group |= [Rake.application.host_list.host2group[host]]
  #   end
  @group = []
end
log_task() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 120
# Records the outcome of this task after it has run.
#
# Steps, in order:
# 1. Stamp the end wall-clock time and the end Pwrake.clock value.
# 2. If there are suggested hosts, a current shell, and the task has
#    actions, report the (suggested hosts, actual shell host) pair to
#    Rake.application.count.
# 3. Stop here when no task logger is open.
# 4. Compute the elapsed clock time (0 when #preprocess never set
#    @clock_start) and, for tasks with an output file, add it to RANK_STAT
#    under this task's rank.
# 5. Build the per-prerequisite locality string and the write locality.
# 6. Append one CSV row to the task logger.
# 7. Write a summary line to Log: info when @status is "end", else error.
def log_task
  @time_end = Time.now
  @clock_end = Pwrake.clock
  #
  sug_host = suggest_location()
  shell = Pwrake::Shell.current
  #
  if sug_host && !sug_host.empty? && shell && !actions.empty?
    Rake.application.count( sug_host, shell.host )
  end
  return if !@@task_logger
  #
  if @clock_start
    elap = @clock_end - @clock_start
  else
    # Fixed: this used to call the non-existent constant `Loag`, so a task
    # logged without a start clock raised NameError instead of logging.
    Log.debug "@clock_start is not defined for #{@task.class}[#{name}]"
    elap = 0
  end
  if has_output_file?
    RANK_STAT.add_sample(rank,elap)
  end
  #
  # locality check: one character per prerequisite ("L", "R", or "n" when
  # locality is not available); nil when it is unavailable for all of them.
  loc_na = true
  preq_loc = prerequisites.map do |preq|
    locs = Rake.application[preq].wrapper.location
    if loc = file_locality(locs)
      loc_na = false
      loc
    else
      "n"
    end
  end.join("")
  preq_loc = nil if loc_na
  write_loc = file_locality(@location)
  #
  # Output-file columns; four empty cells when no stat was recorded.
  if @file_stat
    fstat = [ @file_stat.size, @file_stat.mtime,
              self.location.join('|'), write_loc ]
  else
    fstat = [nil]*4
  end
  #
  # task_id task_name start_time end_time elap_time preq preq_host
  # preq_loc exec_host shell_id has_action executed ncore
  # file_size file_mtime file_host write_loc
  #
  row = [ @task_id, name, @time_start, @time_end, "%.6f"%elap,
          prerequisites, sug_host, preq_loc, @exec_host, @shell_id,
          (actions.empty?) ? 0 : 1,
          (@executed) ? 1 : 0,
          @n_used_cores,
        ] + fstat
  # Normalize cells: times are formatted, arrays are joined with '|'
  # (empty arrays become empty cells), everything else is written as is.
  row.map!{|x|
    if x.kind_of?(Time)
      TaskWrapper.format_time(x)
    elsif x.kind_of?(Array)
      if x.empty?
        nil
      else
        x.join('|')
      end
    else
      x
    end
  }
  @@task_logger << row
  #
  clsname = @task.class.to_s.sub(/^(Rake|Pwrake)::/o,"")
  msg = '%s[%s] status=%s id=%d elap=%.6f exec_host=%s' %
    [clsname,name,@status,@task_id,elap,@exec_host]
  if @status=="end"
    Log.info msg
  else
    Log.error msg
  end
end
no_more_retry() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 87
# True when the retry budget of this task has dropped to zero.
def no_more_retry
  remaining = @n_retry
  remaining == 0
end
postprocess(postproc) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 91
# Finishes bookkeeping after the task body has run.
#
# postproc:: an object whose +run+ is given this wrapper and whose result
#            becomes the new location list.
#
# Marks the task as executed when it has actions. For a file task whose
# target exists, records its File::Stat and asks +postproc+ for the file
# location. Always ends by calling #log_task, whose value is returned.
def postprocess(postproc)
  @executed = true unless @task.actions.empty?
  if is_file_task? && File.exist?(name)
    @file_stat = File::Stat.new(name)
    @location = postproc.run(self)
  end
  log_task
end
preprocess() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 78
# Stamps the start of execution: the wall-clock time in @time_start and the
# Pwrake.clock value in @clock_start (later used by #log_task to compute
# the elapsed time).
def preprocess
  @time_start = Time.now
  @clock_start = Pwrake.clock
end
priority() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 344
# Scheduling priority derived from the age of the input files.
#
# Only computed for tasks with input files and only while no value has been
# stored yet. The result is the size-weighted mean of
# (file_mtime - START_TIME) over all prerequisites with a positive file
# size, or 0 when there are none. A debug line is logged when the value is
# computed.
#
# Returns the stored priority, or 0 when none has been computed.
def priority
  return @priority || 0 unless has_input_file? && @priority.nil?

  weighted_time = 0
  total_size = 0
  prerequisites.each do |preq|
    input = Rake.application[preq].wrapper
    size = input.file_size
    next unless size > 0

    age = input.file_mtime - START_TIME
    weighted_time += age * size
    total_size += size
  end
  @priority = total_size > 0 ? weighted_time / total_size : 0
  Log.debug "task=#{name} priority=#{@priority} sum_file_size=#{total_size}"
  @priority || 0
end
rank() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 266
# Rank of this task in the dependency graph, memoized in @rank.
#
# A task without subsequents has rank 0. Otherwise the rank is the largest
# rank among its subsequents, plus 1 when this task has an output file.
# The computation runs inside LOCK and recurses into the subsequents'
# wrappers.
#
# Returns the rank.
def rank
  LOCK.synchronize do
    if @rank.nil?
      if subsequents.nil? || subsequents.empty?
        @rank = 0
      else
        max_rank = 0
        subsequents.each do |subsq|
          r = subsq.wrapper.rank
          if r
            max_rank = r if max_rank < r
          else
            # Fixed: this used to interpolate `rank.inspect`, which called
            # this method on self again while @rank was still nil and so
            # recursed without end. The value being reported is r.
            Log.warn "subsq.wrapper.rank=#{r.inspect}"
          end
        end
        step = has_output_file? ? 1 : 0
        @rank = max_rank + step
      end
    end
    @rank
  end
end
retry?() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 83
# True when the task did not finish with status "end" and still has
# retries left.
def retry?
  return false if @status == "end"

  @n_retry > 0
end
retry_or_subsequent() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 102
# Decides what happens after one execution attempt.
#
# The executing host is always recorded in @tried_hosts. Then:
# * status "end": the subsequents of the wrapped task are enqueued;
# * retries left: the suggested location is reset, a warning is written to
#   Log and $stderr, the retry budget is decremented and this wrapper is
#   put back on the application's task queue;
# * otherwise: an error is written to Log and $stderr.
def retry_or_subsequent
  @tried_hosts[@exec_host] = true
  return @task.pw_enq_subsequents if @status=="end"

  if @n_retry > 0
    @suggest_location = []
    notice = "retry task (retry_count=#{@n_retry}): #{name}"
    Log.warn(notice)
    $stderr.puts(notice)
    @n_retry -= 1
    Rake.application.task_queue.enq(self)
  else
    notice = "give up retry (retry_count=#{@n_retry}): #{name}"
    Log.error(notice)
    $stderr.puts(notice)
  end
end
set_used_cores(ncore) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 367
# Stores the number of cores used by this task (exposed through the
# n_used_cores reader and written to the task log).
#
# ncore:: the core count to record.
def set_used_cores(ncore)
  @n_used_cores = ncore
end
suggest_location() click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 237
# Hosts on which this task should preferably run, based on where its input
# files are stored.
#
# Only computed for tasks with input files and only while no value has been
# stored yet. For every prerequisite with a location list and a positive
# file size, that size is credited to each of its hosts. Hosts whose total
# exceeds half of the largest total (integer division) are suggested.
#
# Returns the stored list (nil if never computed or assigned).
def suggest_location
  if has_input_file? && @suggest_location.nil?
    cl = Pwrake.clock
    @suggest_location = []
    # host => total size of the input files stored on it
    loc_fsz = Hash.new(0)
    prerequisites.each do |preq|
      t = Rake.application[preq].wrapper
      loc = t.location
      fsz = t.file_size
      if loc && fsz > 0
        loc.each { |h| loc_fsz[h] += fsz }
      end
    end
    #Log.debug "input=#{prerequisites.join('|')}"
    unless loc_fsz.empty?
      half_max_fsz = loc_fsz.values.max / 2
      loc_fsz.each do |h,sz|
        @suggest_location << h if sz > half_max_fsz
      end
      # Fixed: the task name and inspect output used to be interpolated
      # into the string that was then used as the `%` format template, so
      # a '%' in a file or host name raised ArgumentError. Only the
      # elapsed time goes through the formatter now.
      elapsed = "%.6f" % (Pwrake.clock-cl)
      Log.debug "locate:#{elapsed} #{name} loc_fsz=#{loc_fsz.inspect} half_max_fsz=#{half_max_fsz} suggest=#{@suggest_location.inspect}"
    end
  end
  @suggest_location
end
suggest_location=(a) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 233
# Overrides the suggested host list; #suggest_location returns this value
# instead of computing one as long as it is not nil.
#
# a:: the new list of suggested hosts.
def suggest_location=(a)
  @suggest_location = a
end
tried_host?(host_info) click to toggle source
# File lib/pwrake/task/task_wrapper.rb, line 384
# Tells whether this task has already been attempted on the given host.
#
# host_info:: an object responding to +name+, or nil/false.
#
# Returns +host_info+ itself when it is nil/false, otherwise the entry of
# @tried_hosts stored under the host's name (nil when it was never tried).
def tried_host?(host_info)
  return host_info unless host_info

  @tried_hosts[host_info.name]
end