class Pwrake::LocalityAwareQueue
Attributes
turns[R]
Public Class Methods
new(hostinfo_by_id, array_class, median_core, group_map=nil)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 5 def initialize(hostinfo_by_id, array_class, median_core, group_map=nil) @hostinfo_by_id = hostinfo_by_id @array_class = array_class @median_core = median_core # group_map = {gid1=>[hid1,hid2,...], ...} @total_core = 0 @q = {} @hostinfo_by_id.each do |id,host_info| @total_core += c = host_info.ncore @q[id] = @array_class.new(c) end @q_group = {} group_map ||= {1=>@hostinfo_by_id.map{|id,h| id}} group_map.each do |gid,ary| q1 = {} # same group q2 = @q.dup # other groups ary.each{|hid| q1[hid] = q2.delete(hid)} a = [q1,q2] ary.each{|hid| @q_group[hid] = a} end @q_remote = @array_class.new(@total_core) @q_all = @array_class.new(@total_core) @disable_steal = Rake.application.pwrake_options['DISABLE_STEAL'] Log.debug "#{self.class}: @disable_steal=#{@disable_steal.inspect}" @disable_rank = Rake.application.pwrake_options['DISABLE_RANK_PRIORITY'] Log.debug "#{self.class}: @disable_rank=#{@disable_rank.inspect}" @turns = @disable_steal ? [0] : [0,1] @last_enq_time = Time.now end
Public Instance Methods
clear()
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 165 def clear @q.each{|h,q| q.clear} @q_remote.clear @q_all.clear end
deq_impl(host_info, turn)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 85 def deq_impl(host_info, turn) case turn when 0 deq_local(host_info) || deq_remote(host_info) when 1 deq_steal(host_info) end end
deq_local(run_host)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 95 def deq_local(run_host) q = @q[run_host.id] if q && !q.empty? t = q.shift(run_host,@rank) if t q_delete_assigned_to(t) @q_all.delete(t) Log.debug "deq_local task=#{t&&t.name} host=#{run_host.name} req_rank=#{@rank}" return t end end nil end
deq_remote(host_info)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 109 def deq_remote(host_info) if t = @q_remote.shift(host_info,@rank) @q_all.delete(t) Log.debug "deq_remote task=#{t&&t.name} host=#{host_info.name} req_rank=#{@rank}" return t end nil end
deq_start()
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 81 def deq_start @rank = @disable_rank ? 0 : @q_all.find_rank(@median_core) end
deq_steal(run_host)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 118 def deq_steal(run_host) if t = @q_all.shift(run_host,@rank) q_delete_assigned_to(t) @q_remote.delete(t) Log.debug "deq_steal task=#{t&&t.name} host=#{run_host.name} req_rank=#{@rank}" return t end nil end
drop_host(host_info)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 175 def drop_host(host_info) hid = host_info.id if q_drop = @q.delete(hid) n_move = 0 q_size = q_drop.size while t = q_drop.shift(host_info,@rank) assigned_other = false t.assigned.each do |h| if h != hid && @q[h] assigned_other = true break end end if !assigned_other @q_remote.push(t) n_move += 1 end end Log.debug "LAQ#drop_host: host=#{host_info.name} q.size=#{q_size} n_move=#{n_move}" end end
empty?()
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 171 def empty? @q_all.empty? end
enq_impl(t)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 41 def enq_impl(t) hints = t && t.suggest_location Log.debug "enq #{t.name} hints=#{hints.inspect}" @q_all.push(t) if hints.nil? || hints.empty? @q_remote.push(t) else kv = {} hints.each do |h| HostMap.ipmatch_for_name(h).each{|id| kv[id] = true} end q_success = false if !kv.empty? kv.each_key do |id| q = @q[id] if q q.push(t) q_success = true t.assigned.push(id) else Log.warn("lost queue for host id=#{id.inspect}: @q.keys=#{@q.keys.inspect}") end end end unless q_success @q_remote.push(t) end end @last_enq_time = Time.now end
inspect_q()
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 136 def inspect_q s = "" if @q_all.size == @q_remote.size n = @q.size else n = 0 @q.each do |h,q| if q.size > 0 hinfo = @hostinfo_by_id[h] if hinfo s << TaskQueue._qstr(hinfo.name,q) else s << TaskQueue._qstr("(#{hinfo.inspect})",q) end else n += 1 end end end s << TaskQueue._qstr("local*#{n}",[]) if n > 0 s << TaskQueue._qstr("remote",@q_remote) s << TaskQueue._qstr("all",@q_all) s end
q_delete_assigned_to(t)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 128 def q_delete_assigned_to(t) t.assigned.each do |h| if q_h = @q[h] q_h.delete(t) end end end
size()
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 161 def size @q_all.size end
turn_empty?(turn)
click to toggle source
# File lib/pwrake/queue/locality_aware_queue.rb, line 72 def turn_empty?(turn) case turn when 0,2 empty? when 1,3 @q_all.size == @q_remote.size end end