class BaseChip::Tasker

Constants

CLEAR

Attributes

client_id[RW]
client_of[RW]
clusters[RW]
complete_tasks[RW]
foreground[RW]
pending_tasks[RW]
running_tasks[RW]
task_master[RW]
workers[RW]

Public Class Methods

new(*args) click to toggle source
# File lib/base_chip/tasker.rb, line 41
def initialize(*args)
  @workers = {}

  @pending_tasks  = []; @tasks_pending  = 0
  @running_tasks  = []; @tasks_running  = 0
  @complete_tasks = []; @tasks_complete = 0
  @tasks_passed = 0
  @tasks_failed = 0

  @mutex = Mutex.new

  @server = self
end

Public Instance Methods

available_slots() click to toggle source
# File lib/base_chip/tasker.rb, line 122
def available_slots
  count = 0
  @clusters.each do |c|
    count += c.available_slots
  end
  count
end
clear_line() click to toggle source
# File lib/base_chip/tasker.rb, line 256
def clear_line
  print "#{Tasker::CLEAR}\r"
end
finish() click to toggle source
# File lib/base_chip/tasker.rb, line 222
def finish
  return if @finishing
            @finishing = true

  kill
  @task_master.tasker_finish
  DRb.stop_service
end
get_task(client_id) click to toggle source
# File lib/base_chip/tasker.rb, line 183
def get_task(client_id)
  @mutex.lock
  ready = maintain_workers

  w = @workers[client_id]
  raise "Could not find worker based on unique id #{client_id}" unless w

  t = nil
  ready.each do |rt|
    if rt.worker_command.nil? || rt.worker_command.call(0) == w[:worker_command]
      t = rt
      break
    end
  end

  if t
    @pending_tasks.delete t
    @running_tasks <<     t
    @tasks_pending -= 1
    @tasks_running += 1

    w[:tasks] << t
    w[:state] = :running
  else
    w[:state] = :stopped
    w[:cluster].slots_used -= 1 if w[:cluster]
  end

  finish if @pending_tasks.empty? && @running_tasks.empty? && ready.empty?

  if @pending_tasks.size > 0 && @running_tasks.empty? && ready.empty?
    raise("Could not build tasks because their dependencies never cleared: " + (@pending_tasks.map{|t|t.task_name}.join(" ")))
  end

  status_line
  @mutex.unlock
  t
end
inner_register_results(t, result, *additional) click to toggle source
# File lib/base_chip/tasker.rb, line 89
def inner_register_results(t, result, *additional)
  return         if @complete_tasks.select {|t2| t2.task_name == t.task_name}[0]
  original_task   = (@pending_tasks.select {|t2| t2.task_name == t.task_name}[0])
  original_task ||= (@running_tasks.select {|t2| t2.task_name == t.task_name}[0])
  raise "INTERNAL ERROR: couldn't find #{t.task_name}" unless original_task

  if result == :cancel
       @tasks_pending -= 1; @pending_tasks.delete original_task
  else @tasks_running -= 1; @running_tasks.delete original_task
  end
  @complete_tasks << t
  @tasks_complete += 1

  if result == 'pass'
       @tasks_passed += 1
       original_task.next_tasks.each { |nt| nt.wait_count -= 1                 }
  else original_task.next_tasks.each { |nt| inner_register_results(nt,:cancel) }
       @tasks_failed += 1 unless result == :cancel
  end
  @task_master.tasker_handle_results(t, result, *additional)
  status_line

  finish if (BaseChip.options.max_fails  && @tasks_failed >= BaseChip.options.max_fails ) ||
            (BaseChip.options.max_passes && @tasks_passed >= BaseChip.options.max_passes)
end
kill() click to toggle source
# File lib/base_chip/tasker.rb, line 231
def kill
  locked = @mutex.try_lock
  @workers.each do |id,w|
    next if w[:stopped]
    if w[:cluster] && (w[:state] != :stopped)
      kill_message
      w[:cluster].kill(w[:uid])
    end
  end
  (@running_tasks + @pending_tasks).each do |t|
    inner_register_results t, :cancel
  end
  @mutex.unlock if locked
end
kill_message() click to toggle source
# File lib/base_chip/tasker.rb, line 245
def kill_message
  return if @kill_message
            @kill_message = true
  puts "#{Tasker::CLEAR}\rClosing down all workers.  Please be patient."
end
maintain_workers() click to toggle source
# File lib/base_chip/tasker.rb, line 141
def maintain_workers
  ready = @pending_tasks.select {|t| t.wait_count == 0}
  if @clusters
    slots = available_slots
    foo = ready.size - workers_spawned
    foo = slots if slots < foo
    if foo > 0
      foo.times do |i|
        client_id = new_client_id
        cluster, uid = spawn_worker(ready[0].worker_command.call(client_id))
        raise "spawn_worker response wasn't unique" if @workers[uid]
        @workers[client_id] = {:uid => uid, :cluster=> cluster, :started=>Time.now, :tasks=>[], :state=>:spawned, :worker_command=>ready[i].worker_command.call(0)}
        status_line
      end
    end
  else
    @workers[0] = {:started=>Time.now, :tasks=>[], :state=>:running, :worker_command=>ready[0].worker_command} if ready[0]
  end

  # puts "ready=#{(ready.*.task_name).inspect}"
  ready
end
new_client_id() click to toggle source
# File lib/base_chip/tasker.rb, line 138
def new_client_id
  @@last_client_id += 1
end
ready() click to toggle source
# File lib/base_chip/tasker.rb, line 58
def ready
  init_test_sockets(:server)
end
register_results(t, result, *additional) click to toggle source
# File lib/base_chip/tasker.rb, line 80
def register_results(t, result, *additional)
  if @client_of
    @server.register_results(t, result, *additional)
    return
  end
  @mutex.lock
  inner_register_results(t, result, *additional)
  @mutex.unlock
end
run() click to toggle source
# File lib/base_chip/tasker.rb, line 62
def run
  STDOUT.sync = true 
  if @client_of || @clusters.nil?
    begin
      if @client_of
        init_test_sockets(:client)
      end
      while(t = @server.get_task(@client_id))
        @task_master.tasker_run_task t
      end
    rescue DRb::DRbConnError
      # raise if @options.debug
    end
  else
    DRb.thread.join
  end
end
spawn_worker(command) click to toggle source
# File lib/base_chip/tasker.rb, line 129
def spawn_worker(command)
  @clusters.each do |c|
    if c.available_slots > 0
      c.slots_used += 1
      return c, c.submit(command)
    end
  end
end
status_line() click to toggle source
# File lib/base_chip/tasker.rb, line 114
def status_line
  return if @foreground
  wr = workers_running
  wa = workers_alive
  wp = wa > 0 ? (100*wr/wa).round : 0
  raise "Workload is empty" if tasks_total == 0
  print "#{Tasker::CLEAR}complete:#{@tasks_complete}/#{tasks_total}(#{(100*@tasks_complete/tasks_total).round}%) workers:#{wr}/#{wa}(#{wp}%) pass:#{@tasks_passed} fail:#{@tasks_failed} run:#{@tasks_running} pend:#{@tasks_pending}\r"
end
submit(task) click to toggle source
# File lib/base_chip/tasker.rb, line 170
def submit(task)
  task.foreground = self.foreground
  @pending_tasks << task
  @tasks_pending += 1
  maintain_workers
  status_line
end
submit_name(t_name,worker_command=nil) click to toggle source
# File lib/base_chip/tasker.rb, line 164
def submit_name(t_name,worker_command=nil)
  task = Task.new
  task.task_name      = t_name
  task.worker_command = worker_command if @clusters
  submit(task)
end
submit_workload_chain(chain) click to toggle source
# File lib/base_chip/tasker.rb, line 177
def submit_workload_chain(chain)
  @pending_tasks = chain
  @tasks_pending = chain.size
  maintain_workers
  status_line
end
tasks_total() click to toggle source
# File lib/base_chip/tasker.rb, line 54
def tasks_total
  @tasks_pending + @tasks_running + @tasks_complete
end
workers_alive() click to toggle source
# File lib/base_chip/tasker.rb, line 251
def workers_alive;   c=0; @workers.each_value{|w| c+=1 if w[:state] != :stopped}; c end
workers_running() click to toggle source
# File lib/base_chip/tasker.rb, line 252
def workers_running; c=0; @workers.each_value{|w| c+=1 if w[:state] == :running}; c end
workers_spawned() click to toggle source
# File lib/base_chip/tasker.rb, line 253
def workers_spawned; c=0; @workers.each_value{|w| c+=1 if w[:state] == :spawned}; c end
workers_stopped() click to toggle source
# File lib/base_chip/tasker.rb, line 254
def workers_stopped; c=0; @workers.each_value{|w| c+=1 if w[:state] == :stopped}; c end