class Que::JobBuffer::PriorityQueue

A queue object dedicated to a specific worker priority. It’s basically a Queue object from the standard library, but it’s able to reach into the JobBuffer’s buffer in order to satisfy a pop.

Attributes

job_buffer[R]
mutex[R]
priority[R]

Public Class Methods

new( job_buffer:, priority: ) click to toggle source
# File lib/que/job_buffer.rb, line 190
def initialize(
  job_buffer:,
  priority:
)
  @job_buffer = job_buffer
  @priority   = priority
  @waiting    = 0
  @stopping   = false
  @items      = [] # Items pending distribution to waiting threads.
  @mutex      = Mutex.new
  @cv         = ConditionVariable.new
end

Public Instance Methods

pop() click to toggle source
# File lib/que/job_buffer.rb, line 203
def pop
  sync do
    loop do
      if @stopping
        return false
      elsif item = @items.pop
        return item
      elsif job_buffer.job_available?(priority)
        return false
      end

      @waiting += 1
      @cv.wait(mutex)
      @waiting -= 1
    end
  end
end
populate() { || ... } click to toggle source
# File lib/que/job_buffer.rb, line 228
def populate
  sync do
    waiting_count.times do
      job = yield
      break if job.nil? # False would mean we're stopping.
      _push(job)
    end
  end
end
stop() click to toggle source
# File lib/que/job_buffer.rb, line 221
def stop
  sync do
    @stopping = true
    @cv.broadcast
  end
end
waiting_count() click to toggle source
# File lib/que/job_buffer.rb, line 238
def waiting_count
  @waiting
end

Private Instance Methods

_push(item) click to toggle source
# File lib/que/job_buffer.rb, line 248
def _push(item)
  Que.assert(waiting_count > 0)
  @items << item
  @cv.signal
end
sync(&block) click to toggle source
# File lib/que/job_buffer.rb, line 244
def sync(&block)
  mutex.synchronize(&block)
end