class Que::JobBuffer
Attributes
maximum_size[R]
priority_queues[R]
Public Class Methods
new( maximum_size:, priorities: )
click to toggle source
Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it. So, as a general rule, public methods handle locking the mutex when necessary, while private methods handle the actual underlying data changes. This lets us reuse those private methods without running into locking issues.
# File lib/que/job_buffer.rb, line 18 def initialize( maximum_size:, priorities: ) @maximum_size = Que.assert(Integer, maximum_size) Que.assert(maximum_size >= 0) { "maximum_size for a JobBuffer must be at least zero!" } @stop = false @array = [] @mutex = Mutex.new @priority_queues = Hash[ # Make sure that priority = nil sorts highest. priorities.sort_by{|p| p || MAXIMUM_PRIORITY}.map do |p| [p, PriorityQueue.new(priority: p, job_buffer: self)] end ].freeze end
Public Instance Methods
accept?(metajobs)
click to toggle source
# File lib/que/job_buffer.rb, line 74 def accept?(metajobs) metajobs.sort! sync do return [] if _stopping? start_index = _buffer_space final_index = metajobs.length - 1 return metajobs if start_index > final_index index_to_lose = @array.length - 1 start_index.upto(final_index) do |index| if index_to_lose >= 0 && (metajobs[index] <=> @array[index_to_lose]) < 0 return metajobs if index == final_index index_to_lose -= 1 else return metajobs.slice(0...index) end end [] end end
available_priorities()
click to toggle source
# File lib/que/job_buffer.rb, line 107 def available_priorities hash = {} lowest_priority = true priority_queues.reverse_each do |priority, pq| count = pq.waiting_count if lowest_priority count += buffer_space lowest_priority = false end hash[priority || MAXIMUM_PRIORITY] = count if count > 0 end hash end
buffer_space()
click to toggle source
# File lib/que/job_buffer.rb, line 125 def buffer_space sync { _buffer_space } end
clear()
click to toggle source
# File lib/que/job_buffer.rb, line 142 def clear sync { pop(_size) } end
job_available?(priority)
click to toggle source
# File lib/que/job_buffer.rb, line 150 def job_available?(priority) (job = @array.first) && job.priority_sufficient?(priority) end
push(*metajobs)
click to toggle source
# File lib/que/job_buffer.rb, line 37 def push(*metajobs) Que.internal_log(:job_buffer_push, self) do { maximum_size: maximum_size, ids: metajobs.map(&:id), current_queue: to_a, } end sync do return metajobs if _stopping? @array.concat(metajobs).sort! # Relying on the hash's contents being sorted, here. priority_queues.reverse_each do |_, pq| pq.populate do _shift_job(pq.priority) end end # If we passed the maximum buffer size, drop the lowest sort keys and # return their ids to be unlocked. overage = -_buffer_space pop(overage) if overage > 0 end end
shift(priority = nil)
click to toggle source
# File lib/que/job_buffer.rb, line 65 def shift(priority = nil) queue = priority_queues.fetch(priority) { raise Error, "not a permitted priority! #{priority}" } queue.pop || shift_job(priority) end
shift_job(priority = nil)
click to toggle source
# File lib/que/job_buffer.rb, line 70 def shift_job(priority = nil) sync { _shift_job(priority) } end
size()
click to toggle source
# File lib/que/job_buffer.rb, line 129 def size sync { _size } end
stop()
click to toggle source
# File lib/que/job_buffer.rb, line 137 def stop sync { @stop = true } priority_queues.each_value(&:stop) end
stopping?()
click to toggle source
# File lib/que/job_buffer.rb, line 146 def stopping? sync { _stopping? } end
to_a()
click to toggle source
# File lib/que/job_buffer.rb, line 133 def to_a sync { @array.dup } end
waiting_count()
click to toggle source
# File lib/que/job_buffer.rb, line 99 def waiting_count count = 0 priority_queues.each_value do |pq| count += pq.waiting_count end count end
Private Instance Methods
_buffer_space()
click to toggle source
# File lib/que/job_buffer.rb, line 156 def _buffer_space maximum_size - _size end
_shift_job(priority)
click to toggle source
# File lib/que/job_buffer.rb, line 164 def _shift_job(priority) if _stopping? false elsif (job = @array.first) && job.priority_sufficient?(priority) @array.shift end end
_size()
click to toggle source
# File lib/que/job_buffer.rb, line 172 def _size @array.size end
_stopping?()
click to toggle source
# File lib/que/job_buffer.rb, line 176 def _stopping? !!@stop end
pop(count)
click to toggle source
# File lib/que/job_buffer.rb, line 160 def pop(count) @array.pop(count) end
sync(&block)
click to toggle source
# File lib/que/job_buffer.rb, line 180 def sync(&block) @mutex.synchronize(&block) end