class Pwrake::TaskQueue
Public Class Methods
_qstr(h,q)
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 130 def self._qstr(h,q) s = " #{h}: size=#{q.size} " case q.size when 0 s << "[]\n" when 1 s << "[#{q.first.name}]\n" when 2 s << "[#{q.first.name}, #{q.last.name}]\n" else s << "[#{q.first.name}, .., #{q.last.name}]\n" end s end
new(queue_class, hostinfo_by_id, group_map=nil)
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 9 def initialize(queue_class, hostinfo_by_id, group_map=nil) @queue_class = Pwrake.const_get(queue_class) @hostinfo_by_id = hostinfo_by_id @lock = Monitor.new @q_no_action = NoActionQueue.new @q_reserved = Hash.new @nenq = 0 @ndeq = 0 def @q_reserved.first super.last end def @q_reserved.last self[keys.last] end pri = Rake.application.pwrake_options['QUEUE_PRIORITY'] || "LIFO" case pri when /^fifo$/i @array_class = FifoQueueArray when /^lifo$/i @array_class = LifoQueueArray when /^lihr$/i @array_class = LifoHrfQueueArray else raise RuntimeError,"unknown option for QUEUE_PRIORITY: "+pri end Log.debug "@array_class=#{@array_class.inspect}" # median number of cores a = @hostinfo_by_id.map{|id,host_info| host_info.ncore}.sort n = a.size @median_core = (n%2==0) ? (a[n/2-1]+a[n/2])/2 : a[(n-1)/2] @q = @queue_class.new(hostinfo_by_id,@array_class,@median_core,group_map) end
Public Instance Methods
clear()
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 118 def clear @q_no_action.clear @q_reserved.clear @q.clear end
deq_noaction_task() { |tw| ... }
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 75 def deq_noaction_task(&block) while tw = @q_no_action.shift yield(tw) @ndeq += 1 end end
deq_reserve() { |tw,host_info,n_core| ... }
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 82 def deq_reserve(&block) @q_reserved.each do |host_info,tw| n_idle = host_info.idle_cores || 0 n_core = tw.use_cores(host_info) if n_idle >= n_core @q_reserved.delete(host_info) Log.debug "deq_reserve: #{tw.name} n_use_cores=#{n_core}" yield(tw,host_info,n_core) @ndeq += 1 end end end
deq_task(&block)
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 56 def deq_task(&block) @lock.synchronize do if @nenq > 0 Log.debug "deq_task nenq=#{@nenq}:"+(empty? ? " (empty)" : "\n"+inspect_q) @nenq = 0 end deq_noaction_task(&block) deq_reserve(&block) @q.deq_start unless @q.empty? @q.turns.each{|turn| deq_turn(turn,&block) } end if @ndeq > 0 Log.debug "deq_task ndeq=#{@ndeq}:"+(empty? ? " (empty)" : "\n"+inspect_q) @ndeq = 0 end end end
deq_turn(turn) { |tw,host_info,n_core| ... }
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 95 def deq_turn(turn,&block) begin count = 0 @hostinfo_by_id.each_value do |host_info| return if @q.turn_empty?(turn) n_idle = host_info.idle_cores || 0 next if n_idle == 0 || @q_reserved[host_info] if tw = @q.deq_impl(host_info,turn) n_core = tw.use_cores(host_info) if n_idle >= n_core Log.debug "deq: #{tw.name} n_use_cores=#{n_core}" yield(tw,host_info,n_core) count += 1 @ndeq += 1 else @q_reserved[host_info] = tw Log.debug "reserve host: #{host_info.name} for #{tw.name} (#{n_core} cores)" end end end end while count > 0 end
drop_host(host_info)
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 151 def drop_host(host_info) @q.drop_host(host_info) end
empty?()
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 124 def empty? @q_no_action.empty? && @q_reserved.empty? && @q.empty? end
enq(tw)
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 45 def enq(tw) @lock.synchronize do if tw.nil? || tw.actions.empty? @q_no_action.push(tw) else @q.enq_impl(tw) end @nenq += 1 end end
inspect_q()
click to toggle source
# File lib/pwrake/queue/task_queue.rb, line 145 def inspect_q TaskQueue._qstr("noaction",@q_no_action) + @q.inspect_q + TaskQueue._qstr("reserved",@q_reserved) end