class BaseChip::Tasker
Constants
- CLEAR
Attributes
client_id[RW]
client_of[RW]
clusters[RW]
complete_tasks[RW]
foreground[RW]
pending_tasks[RW]
running_tasks[RW]
task_master[RW]
workers[RW]
Public Class Methods
new(*args)
click to toggle source
# File lib/base_chip/tasker.rb, line 41
# Builds an empty tasker: no workers, three empty task lists with their
# matching counters at zero, a fresh mutex, and @server pointing at this
# instance.
#
# args - accepted but not used.
def initialize(*args)
  @workers = {}

  # One list per task lifecycle stage.
  @pending_tasks  = []
  @running_tasks  = []
  @complete_tasks = []

  # Counters maintained alongside the lists above.
  @tasks_pending  = 0
  @tasks_running  = 0
  @tasks_complete = 0
  @tasks_passed   = 0
  @tasks_failed   = 0

  @mutex  = Mutex.new
  @server = self
end
Public Instance Methods
available_slots()
click to toggle source
# File lib/base_chip/tasker.rb, line 122
# Returns the sum of available_slots over every entry in @clusters
# (0 when the list is empty).
def available_slots
  @clusters.inject(0) { |total, cluster| total + cluster.available_slots }
end
clear_line()
click to toggle source
# File lib/base_chip/tasker.rb, line 256
# Prints Tasker::CLEAR followed by a carriage return (no newline).
def clear_line
  reset_sequence = "#{Tasker::CLEAR}\r"
  print reset_sequence
end
finish()
click to toggle source
# File lib/base_chip/tasker.rb, line 222
# Shuts the tasker down once: kills workers/tasks, notifies the task master
# through tasker_finish, then stops the DRb service. Later calls do nothing
# because @finishing is already set.
def finish
  unless @finishing
    @finishing = true
    kill
    @task_master.tasker_finish
    DRb.stop_service
  end
end
get_task(client_id)
click to toggle source
# File lib/base_chip/tasker.rb, line 183
# Hands the next runnable task to the worker identified by client_id.
#
# The whole operation runs under @mutex. The lock is taken with synchronize
# so it is released even when one of the errors below is raised; with a bare
# lock/unlock pair an exception left the mutex held and every later caller
# blocked forever.
#
# client_id - key into @workers identifying the calling worker.
#
# Returns the chosen task, or nil when no ready task matches the worker (the
# worker is then marked :stopped and its cluster slot, if any, is released).
# Raises RuntimeError when client_id is unknown, or when tasks are still
# pending but nothing is running or ready.
def get_task(client_id)
  @mutex.synchronize do
    ready  = maintain_workers
    worker = @workers[client_id]
    raise "Could not find worker based on unique id #{client_id}" unless worker

    # A task fits when it has no worker command at all, or when its command
    # (rendered for id 0) equals the one recorded for this worker.
    task = ready.find do |candidate|
      candidate.worker_command.nil? ||
        candidate.worker_command.call(0) == worker[:worker_command]
    end

    if task
      @pending_tasks.delete task
      @running_tasks << task
      @tasks_pending -= 1
      @tasks_running += 1
      worker[:tasks] << task
      worker[:state] = :running
    else
      worker[:state] = :stopped
      worker[:cluster].slots_used -= 1 if worker[:cluster]
    end

    # `ready` was computed before the task moved lists, so both checks only
    # fire when nothing was ready on entry.
    finish if @pending_tasks.empty? && @running_tasks.empty? && ready.empty?
    if @pending_tasks.size > 0 && @running_tasks.empty? && ready.empty?
      raise("Could not build tasks because their dependencies never cleared: " +
            @pending_tasks.map { |stuck| stuck.task_name }.join(" "))
    end

    status_line
    task
  end
end
inner_register_results(t, result, *additional)
click to toggle source
# File lib/base_chip/tasker.rb, line 89
# Records the outcome of task t and updates the bookkeeping. Performs no
# locking itself.
#
# t          - the finished task; matched against stored tasks by task_name.
# result     - 'pass', :cancel, or anything else (counted as a failure).
# additional - extra values forwarded to @task_master.tasker_handle_results.
#
# Behaviour:
# * Does nothing if a task with the same name is already complete.
# * Removes the stored task from whichever list actually holds it. The
#   previous code picked the list from `result` (:cancel => pending,
#   otherwise running), so cancelling a running task - which kill does -
#   decremented the pending counter and left the task in @running_tasks.
# * On 'pass', lowers wait_count on each follow-on task; otherwise cancels
#   the follow-on tasks recursively.
# * Calls finish once BaseChip.options.max_fails / max_passes is reached.
#
# Raises RuntimeError if the task is in neither the pending nor running list.
def inner_register_results(t, result, *additional)
  name = t.task_name
  return if @complete_tasks.any? { |done| done.task_name == name }

  pending_match = @pending_tasks.find { |candidate| candidate.task_name == name }
  original_task = pending_match ||
                  @running_tasks.find { |candidate| candidate.task_name == name }
  raise "INTERNAL ERROR: couldn't find #{name}" unless original_task

  if pending_match
    @tasks_pending -= 1
    @pending_tasks.delete original_task
  else
    @tasks_running -= 1
    @running_tasks.delete original_task
  end

  @complete_tasks << t
  @tasks_complete += 1

  if result == 'pass'
    @tasks_passed += 1
    original_task.next_tasks.each { |nt| nt.wait_count -= 1 }
  else
    original_task.next_tasks.each { |nt| inner_register_results(nt, :cancel) }
    @tasks_failed += 1 unless result == :cancel
  end

  @task_master.tasker_handle_results(t, result, *additional)
  status_line

  options = BaseChip.options
  finish if (options.max_fails  && @tasks_failed >= options.max_fails) ||
            (options.max_passes && @tasks_passed >= options.max_passes)
end
kill()
click to toggle source
# File lib/base_chip/tasker.rb, line 231
# Kills every cluster worker that has not stopped and cancels all running
# and pending tasks via inner_register_results(task, :cancel).
#
# The mutex is taken with try_lock, so a caller that already holds it is not
# blocked; it is released here only when this call acquired it. The release
# now sits in an ensure block so an exception raised while killing or
# cancelling cannot leave the mutex locked.
#
# Returns nil.
def kill
  locked = @mutex.try_lock
  begin
    @workers.each do |_id, worker|
      # NOTE(review): nothing visible here ever sets a :stopped key (the state
      # lives under :state), so this guard looks like a no-op - kept; confirm.
      next if worker[:stopped]
      next unless worker[:cluster] && worker[:state] != :stopped

      kill_message
      worker[:cluster].kill(worker[:uid])
    end

    # `+` builds a new array, so the lists can shrink safely while iterating.
    (@running_tasks + @pending_tasks).each do |task|
      inner_register_results task, :cancel
    end
  ensure
    @mutex.unlock if locked
  end
  nil
end
kill_message()
click to toggle source
# File lib/base_chip/tasker.rb, line 245
# Prints the "closing down" notice the first time it is called; @kill_message
# remembers that it has been shown so later calls print nothing.
def kill_message
  unless @kill_message
    @kill_message = true
    puts "#{Tasker::CLEAR}\rClosing down all workers. Please be patient."
  end
end
maintain_workers()
click to toggle source
# File lib/base_chip/tasker.rb, line 141
# Makes sure there are workers for the tasks that are ready to run.
#
# A task is ready when its wait_count is 0.
#
# With clusters: spawns min(ready tasks - workers still in :spawned state,
# available slots) workers, one per ready task, and records each in @workers
# under a fresh client id.
# Without clusters: (re)registers the single local worker under id 0 when at
# least one task is ready.
#
# Fixes relative to the previous version:
# * The spawn command is now built from the same ready task (ready[i]) whose
#   command is recorded for the worker; previously ready[0] was used for the
#   spawn while ready[i] was recorded.
# * The uniqueness check compares the returned uid with the :uid of workers
#   that have not stopped. It used to look the uid up as a key of @workers,
#   which is keyed by client id, so duplicates were never detected.
#
# Returns the array of ready tasks.
# Raises RuntimeError if spawn_worker hands back a uid already in use.
def maintain_workers
  ready = @pending_tasks.select { |task| task.wait_count == 0 }

  if @clusters
    to_spawn = [ready.size - workers_spawned, available_slots].min
    # Integer#times is a no-op for zero or negative counts.
    to_spawn.times do |i|
      task      = ready[i]
      client_id = new_client_id
      cluster, uid = spawn_worker(task.worker_command.call(client_id))

      duplicate = !uid.nil? && @workers.each_value.any? do |worker|
        worker[:state] != :stopped && worker[:uid] == uid
      end
      raise "spawn_worker response wasn't unique" if duplicate

      @workers[client_id] = {
        :uid            => uid,
        :cluster        => cluster,
        :started        => Time.now,
        :tasks          => [],
        :state          => :spawned,
        # Rendered with id 0 so it can be compared with task commands later.
        :worker_command => task.worker_command.call(0)
      }
      status_line
    end
  elsif ready[0]
    @workers[0] = {
      :started        => Time.now,
      :tasks          => [],
      :state          => :running,
      :worker_command => ready[0].worker_command
    }
  end

  # puts "ready=#{(ready.*.task_name).inspect}"
  ready
end
new_client_id()
click to toggle source
# File lib/base_chip/tasker.rb, line 138
# Bumps the class-level counter @@last_client_id by one and returns the new
# value. The counter has to be initialised elsewhere; this method only
# increments it.
def new_client_id
  @@last_client_id = @@last_client_id + 1
end
ready()
click to toggle source
# File lib/base_chip/tasker.rb, line 58
# Calls init_test_sockets with the :server role and returns its result.
def ready
  init_test_sockets :server
end
register_results(t, result, *additional)
click to toggle source
# File lib/base_chip/tasker.rb, line 80
# Reports the result of task t.
#
# When this tasker is a client (@client_of is set) the call is forwarded to
# @server.register_results. Otherwise inner_register_results runs under
# @mutex; synchronize is used so the lock is released even if it raises
# (a bare lock/unlock pair left the mutex held after an exception).
#
# t          - the task being reported.
# result     - outcome value passed through unchanged.
# additional - extra values passed through unchanged.
#
# Returns nil on both paths.
def register_results(t, result, *additional)
  if @client_of
    @server.register_results(t, result, *additional)
    return
  end

  @mutex.synchronize { inner_register_results(t, result, *additional) }
  nil
end
run()
click to toggle source
# File lib/base_chip/tasker.rb, line 62
# Main loop.
#
# When clusters are configured and this tasker is not a client, it only
# waits on the DRb thread. Otherwise it pulls tasks from @server (after
# init_test_sockets(:client) when it is a client) and passes each to
# @task_master.tasker_run_task until get_task returns nil/false. A
# DRb::DRbConnError raised in that loop ends it silently.
def run
  STDOUT.sync = true
  return DRb.thread.join unless @client_of || @clusters.nil?

  begin
    init_test_sockets(:client) if @client_of
    while (task = @server.get_task(@client_id))
      @task_master.tasker_run_task task
    end
  rescue DRb::DRbConnError
    # raise if @options.debug
  end
end
spawn_worker(command)
click to toggle source
# File lib/base_chip/tasker.rb, line 129
# Submits command to the first cluster that reports a free slot.
#
# command - value passed to the cluster's submit method.
#
# Returns a two-element array: [cluster, whatever cluster.submit returned].
# Raises RuntimeError when no cluster has a free slot. Previously the method
# fell through and returned @clusters itself, which callers then destructured
# as if it were a [cluster, uid] pair.
#
# The slot is reserved (slots_used += 1) before submitting, as before; if
# submit raises, the reservation is undone so the slot is not lost.
def spawn_worker(command)
  cluster = @clusters.find { |candidate| candidate.available_slots > 0 }
  raise "No cluster has an available slot to spawn a worker" unless cluster

  cluster.slots_used += 1
  begin
    uid = cluster.submit(command)
  rescue StandardError
    cluster.slots_used -= 1
    raise
  end
  return cluster, uid
end
status_line()
click to toggle source
# File lib/base_chip/tasker.rb, line 114
# Prints a one-line progress summary ending in a carriage return: completed
# tasks out of the total, running workers out of live workers, and the
# pass/fail/run/pend counters. Percentages use integer arithmetic.
#
# Prints nothing when @foreground is set.
# Raises RuntimeError ("Workload is empty") when tasks_total is 0.
def status_line
  return if @foreground

  running = workers_running
  alive   = workers_alive
  if alive > 0
    worker_pct = (100 * running / alive).round
  else
    worker_pct = 0
  end

  total = tasks_total
  raise "Workload is empty" if total == 0
  done_pct = (100 * @tasks_complete / total).round

  print "#{Tasker::CLEAR}complete:#{@tasks_complete}/#{total}(#{done_pct}%) workers:#{running}/#{alive}(#{worker_pct}%) pass:#{@tasks_passed} fail:#{@tasks_failed} run:#{@tasks_running} pend:#{@tasks_pending}\r"
end
submit(task)
click to toggle source
# File lib/base_chip/tasker.rb, line 170
# Queues a single task: copies this tasker's foreground setting onto it,
# appends it to the pending list, bumps the pending counter, then calls
# maintain_workers and status_line.
#
# task - object responding to foreground=.
#
# Returns whatever status_line returns.
def submit(task)
  task.foreground = foreground
  @pending_tasks.push(task)
  @tasks_pending = @tasks_pending + 1
  maintain_workers
  status_line
end
submit_name(t_name,worker_command=nil)
click to toggle source
# File lib/base_chip/tasker.rb, line 164
# Creates a Task named t_name and submits it.
#
# t_name         - value stored as the task's task_name.
# worker_command - stored on the task only when @clusters is set
#                  (default: nil).
#
# Returns whatever submit returns.
def submit_name(t_name,worker_command=nil)
  new_task = Task.new
  new_task.task_name = t_name
  if @clusters
    new_task.worker_command = worker_command
  end
  submit(new_task)
end
submit_workload_chain(chain)
click to toggle source
# File lib/base_chip/tasker.rb, line 177
# Installs chain as the pending task list (the object itself, not a copy),
# sets the pending counter to its size, then calls maintain_workers and
# status_line.
#
# Returns whatever status_line returns.
def submit_workload_chain(chain)
  @tasks_pending = chain.size
  @pending_tasks = chain
  maintain_workers
  status_line
end
tasks_total()
click to toggle source
# File lib/base_chip/tasker.rb, line 54
# Returns the sum of the pending, running and complete task counters.
def tasks_total
  [@tasks_pending, @tasks_running, @tasks_complete].inject { |sum, count| sum + count }
end
workers_alive()
click to toggle source
# File lib/base_chip/tasker.rb, line 251
# Returns how many entries in @workers have a :state other than :stopped.
def workers_alive
  @workers.each_value.count { |worker| worker[:state] != :stopped }
end
workers_running()
click to toggle source
# File lib/base_chip/tasker.rb, line 252
# Returns how many entries in @workers have :state == :running.
def workers_running
  @workers.each_value.count { |worker| worker[:state] == :running }
end
workers_spawned()
click to toggle source
# File lib/base_chip/tasker.rb, line 253
# Returns how many entries in @workers have :state == :spawned.
def workers_spawned
  @workers.each_value.count { |worker| worker[:state] == :spawned }
end
workers_stopped()
click to toggle source
# File lib/base_chip/tasker.rb, line 254
# Returns how many entries in @workers have :state == :stopped.
def workers_stopped
  @workers.each_value.count { |worker| worker[:state] == :stopped }
end