class Que::JobBuffer::PriorityQueue
A queue object dedicated to a specific worker priority. It’s basically a Queue object from the standard library, but it’s able to reach into the JobBuffer’s buffer in order to satisfy a pop.
Attributes
job_buffer[R]
mutex[R]
priority[R]
Public Class Methods
new( job_buffer:, priority: )
click to toggle source
# File lib/que/job_buffer.rb, line 190
# Builds an empty queue dedicated to a single worker priority.
#
# job_buffer - stored as-is and exposed through the job_buffer reader;
#              #pop asks it whether a job is available.
# priority   - stored as-is and exposed through the priority reader.
def initialize(job_buffer:, priority:)
  # Lock and condition variable used by the blocking methods.
  @mutex = Mutex.new
  @cv = ConditionVariable.new

  @job_buffer = job_buffer
  @priority = priority

  # Items pending distribution to waiting threads.
  @items = []

  # Number of threads currently parked inside #pop.
  @waiting = 0

  # Set to true by #stop.
  @stopping = false
end
Public Instance Methods
pop()
click to toggle source
# File lib/que/job_buffer.rb, line 203
# Blocks the calling thread until there is something to hand back.
#
# While holding the mutex, each pass of the loop checks, in order:
#   1. the queue is stopping                      -> returns false
#   2. an item is pending in @items               -> returns that item
#   3. job_buffer.job_available?(priority) is true -> returns false
#
# If none apply, the thread counts itself as waiting, sleeps on the
# condition variable, and re-checks once it is woken.
def pop
  sync do
    loop do
      if @stopping
        return false
      elsif (item = @items.pop)
        return item
      elsif job_buffer.job_available?(priority)
        return false
      end

      @waiting += 1
      begin
        @cv.wait(mutex)
      ensure
        # Undo the registration even when the wait is interrupted by an
        # exception, so waiting_count never counts a thread that is no
        # longer waiting.
        @waiting -= 1
      end
    end
  end
end
populate() { || ... }
click to toggle source
# File lib/que/job_buffer.rb, line 228
# Hands jobs to the threads currently waiting in #pop.
#
# While holding the mutex, yields up to waiting_count times (the count is
# read once, before the first yield). Each value returned by the block is
# pushed for a waiter, except nil, which ends the hand-out early.
#
# Yields no arguments; the block returns the next job, or nil when it has
# nothing more to give.
def populate
  mutex.synchronize do
    waiting_count.times do
      supplied = yield

      # Only nil stops the hand-out. A false value is still pushed
      # through; per the original note, false would mean we're stopping.
      break if supplied.nil?

      _push(supplied)
    end
  end
end
stop()
click to toggle source
# File lib/que/job_buffer.rb, line 221
# Marks the queue as stopping and wakes every thread sleeping in #pop.
#
# Both steps happen under the mutex, so a woken thread sees @stopping
# already set when it re-checks its conditions.
def stop
  mutex.synchronize do
    @stopping = true
    @cv.broadcast
  end
end
waiting_count()
click to toggle source
# File lib/que/job_buffer.rb, line 238
# Returns the current value of the @waiting counter, which #pop
# increments before sleeping on the condition variable.
#
# Takes no lock itself; #populate calls it while already holding the mutex.
def waiting_count
  @waiting
end
Private Instance Methods
_push(item)
click to toggle source
# File lib/que/job_buffer.rb, line 248
# Queues one item for a waiting thread and wakes a single waiter.
#
# Passes Que.assert the result of checking that at least one thread is
# waiting, appends the item to @items, then signals the condition variable.
# Does no locking of its own; #populate calls it from inside the mutex.
#
# item - the value to hand to a thread sleeping in #pop.
def _push(item)
  Que.assert(waiting_count.positive?)

  @items.push(item)
  @cv.signal
end
sync(&block)
click to toggle source
# File lib/que/job_buffer.rb, line 244
# Runs the given block while holding this queue's mutex and returns the
# block's result.
#
# The block is forwarded untouched to Mutex#synchronize.
def sync(&block)
  lock = mutex
  lock.synchronize(&block)
end