class Que::JobBuffer

Attributes

maximum_size[R]
priority_queues[R]

Public Class Methods

new( maximum_size:, priorities: ) click to toggle source

Since we use a mutex, which is not reentrant, we have to be a little careful to not call a method that locks the mutex when we’ve already locked it. So, as a general rule, public methods handle locking the mutex when necessary, while private methods handle the actual underlying data changes. This lets us reuse those private methods without running into locking issues.

# File lib/que/job_buffer.rb, line 18
# Sets up an empty buffer.
#
# maximum_size - passed through Que.assert(Integer, ...) and stored from its
#                return value (presumably the validated value itself; confirm
#                against Que.assert). Must be >= 0.
# priorities   - collection of priorities; one PriorityQueue is built per
#                entry. A nil priority is sorted as MAXIMUM_PRIORITY.
#
# The resulting priority => PriorityQueue hash is frozen, and its insertion
# order follows the sorted priorities (other methods rely on that order).
def initialize(maximum_size:, priorities:)
  @maximum_size = Que.assert(Integer, maximum_size)
  Que.assert(maximum_size >= 0) { "maximum_size for a JobBuffer must be at least zero!" }

  @stop  = false
  @array = []
  @mutex = Mutex.new

  # nil stands in for MAXIMUM_PRIORITY when ordering the queues.
  ordered = priorities.sort_by { |priority| priority || MAXIMUM_PRIORITY }

  pairs = ordered.map do |priority|
    [priority, PriorityQueue.new(priority: priority, job_buffer: self)]
  end

  @priority_queues = pairs.to_h.freeze
end

Public Instance Methods

accept?(metajobs) click to toggle source
# File lib/que/job_buffer.rb, line 74
# Works out which of the given metajobs the buffer would take.
#
# Sorts +metajobs+ in place, then, while holding the mutex, returns:
# * an empty array if the buffer is stopping;
# * +metajobs+ itself if the free space covers all of them;
# * otherwise the leading run of +metajobs+ in which every job beyond the
#   free space compares lower (via <=>) than a buffered job, walking the
#   buffer backwards from its last element, one buffered job per candidate.
def accept?(metajobs)
  metajobs.sort!

  sync do
    return [] if _stopping?

    free_slots = _buffer_space
    last_slot  = metajobs.length - 1

    # Enough room for everything, so nothing has to be displaced.
    return metajobs if free_slots > last_slot

    # Index of the buffered job the next extra candidate has to beat.
    victim = @array.length - 1

    (free_slots..last_slot).each do |position|
      beats_victim = victim >= 0 && (metajobs[position] <=> @array[victim]) < 0

      # The first candidate that cannot displace anything ends the run.
      return metajobs.slice(0...position) unless beats_victim
      return metajobs if position == last_slot

      victim -= 1
    end

    []
  end
end
available_priorities() click to toggle source
# File lib/que/job_buffer.rb, line 107
# Builds a hash mapping each priority to a count taken from its queue's
# waiting_count. Queues are visited in reverse of the hash's order, and the
# first one visited also has the buffer's free space (buffer_space) added to
# its count. A nil priority is keyed as MAXIMUM_PRIORITY. Entries whose count
# is not positive are omitted.
def available_priorities
  result = {}

  priority_queues.reverse_each.each_with_index do |(priority, queue), position|
    available = queue.waiting_count

    # Only the first queue visited gets credit for the free buffer space.
    available += buffer_space if position.zero?

    result[priority || MAXIMUM_PRIORITY] = available if available > 0
  end

  result
end
buffer_space() click to toggle source
# File lib/que/job_buffer.rb, line 125
# Public, mutex-guarded wrapper around _buffer_space.
def buffer_space
  sync do
    _buffer_space
  end
end
clear() click to toggle source
# File lib/que/job_buffer.rb, line 142
# Empties the buffer while holding the mutex and returns whatever was
# removed (the result of popping _size elements).
def clear
  sync do
    everything = _size
    pop(everything)
  end
end
job_available?(priority) click to toggle source
# File lib/que/job_buffer.rb, line 150
# Truthy when the buffer has a first element and that element reports
# priority_sufficient? for +priority+; nil when the buffer is empty.
# Reads @array directly without taking the mutex.
def job_available?(priority)
  head = @array.first
  head && head.priority_sufficient?(priority)
end
push(*metajobs) click to toggle source
# File lib/que/job_buffer.rb, line 37
# Adds metajobs to the buffer.
#
# Logs the push first (outside the mutex, since to_a takes the mutex itself).
# Then, while holding the mutex:
# * if the buffer is stopping, returns +metajobs+ untouched;
# * otherwise appends them, re-sorts the buffer, lets each priority queue
#   (visited in reverse hash order) populate itself from the front of the
#   buffer, and finally trims anything beyond maximum_size off the end.
#
# Returns the trimmed entries as an array, or nil when nothing was trimmed.
def push(*metajobs)
  Que.internal_log(:job_buffer_push, self) do
    { maximum_size: maximum_size, ids: metajobs.map(&:id), current_queue: to_a }
  end

  sync do
    return metajobs if _stopping?

    @array.concat(metajobs)
    @array.sort!

    # Depends on the priority_queues hash having been built in sorted order.
    priority_queues.values.reverse_each do |queue|
      queue.populate { _shift_job(queue.priority) }
    end

    # Anything past the maximum size is dropped from the end of the sorted
    # buffer and handed back to the caller.
    surplus = -_buffer_space
    surplus > 0 ? pop(surplus) : nil
  end
end
shift(priority = nil) click to toggle source
# File lib/que/job_buffer.rb, line 65
# Takes a job for the given priority: first from that priority's queue
# (via its pop), falling back to shift_job on the buffer when the queue
# yields nothing truthy.
#
# Raises Error if +priority+ is not a key of priority_queues.
def shift(priority = nil)
  queue = priority_queues.fetch(priority) do
    raise Error, "not a permitted priority! #{priority}"
  end

  handed_over = queue.pop
  handed_over || shift_job(priority)
end
shift_job(priority = nil) click to toggle source
# File lib/que/job_buffer.rb, line 70
# Public, mutex-guarded wrapper around _shift_job.
def shift_job(priority = nil)
  sync do
    _shift_job(priority)
  end
end
size() click to toggle source
# File lib/que/job_buffer.rb, line 129
# Public, mutex-guarded wrapper around _size.
def size
  sync do
    _size
  end
end
stop() click to toggle source
# File lib/que/job_buffer.rb, line 137
# Marks the buffer as stopping (under the mutex), then calls stop on every
# priority queue after the mutex has been released.
def stop
  sync do
    @stop = true
  end

  priority_queues.each_value { |queue| queue.stop }
end
stopping?() click to toggle source
# File lib/que/job_buffer.rb, line 146
# Public, mutex-guarded wrapper around _stopping?.
def stopping?
  sync do
    _stopping?
  end
end
to_a() click to toggle source
# File lib/que/job_buffer.rb, line 133
# Returns a copy of the buffer's array, taken while holding the mutex, so
# callers can inspect it without touching the live array.
def to_a
  sync do
    @array.dup
  end
end
waiting_count() click to toggle source
# File lib/que/job_buffer.rb, line 99
# Sum of waiting_count over every priority queue; 0 when there are no
# queues. Does not take the buffer's mutex.
def waiting_count
  priority_queues.each_value.sum(&:waiting_count)
end

Private Instance Methods

_buffer_space() click to toggle source
# File lib/que/job_buffer.rb, line 156
# Free capacity: maximum_size minus the current element count. Does no
# locking of its own. The result is negative while the buffer holds more
# than maximum_size elements.
def _buffer_space
  capacity = maximum_size
  capacity - _size
end
_shift_job(priority) click to toggle source
# File lib/que/job_buffer.rb, line 164
# Removes and returns the first buffered job if it reports
# priority_sufficient? for +priority+. Does no locking of its own.
#
# Returns false when the buffer is stopping, and nil when the buffer is
# empty or its first job is not sufficient for +priority+.
def _shift_job(priority)
  return false if _stopping?

  head = @array.first
  @array.shift if head && head.priority_sufficient?(priority)
end
_size() click to toggle source
# File lib/que/job_buffer.rb, line 172
# Number of elements currently in the buffer's array. Does no locking of
# its own.
def _size
  @array.length
end
_stopping?() click to toggle source
# File lib/que/job_buffer.rb, line 176
# Coerces @stop to a strict true/false. Does no locking of its own.
def _stopping?
  @stop ? true : false
end
pop(count) click to toggle source
# File lib/que/job_buffer.rb, line 160
# Removes and returns up to +count+ elements from the end of @array.
# Does not take the mutex itself; the callers visible in this file (clear
# and push) invoke it from inside a sync block.
def pop(count)
  @array.pop(count)
end
sync(&block) click to toggle source
# File lib/que/job_buffer.rb, line 180
# Runs the given block while holding @mutex and returns the block's value.
# The mutex is not reentrant, so this must not be called from code that is
# already running inside another sync block.
def sync(&block)
  @mutex.synchronize(&block)
end