class Que::Poller

Attributes

connection[R]
last_poll_satisfied[R]
last_polled_at[R]
poll_interval[R]
queue[R]

Public Class Methods

cleanup(connection)
# File lib/que/poller.rb, line 258
      # Drops the connection-local temporary objects that `setup` creates in
      # the pg_temp schema: the two helper functions and the composite type.
      #
      # connection - object responding to #execute(sql); the whole script is
      #              sent in a single call.
      #
      # Every statement uses IF EXISTS so that cleanup is safe to run when
      # setup never completed (or when cleanup has already run once) instead
      # of raising on the first missing object.
      #
      # The functions are dropped before the type because
      # lock_and_update_priorities is declared as returning that type.
      def cleanup(connection)
        connection.execute <<-SQL
          DROP FUNCTION IF EXISTS pg_temp.que_highest_remaining_priority(jsonb);
          DROP FUNCTION IF EXISTS pg_temp.lock_and_update_priorities(jsonb, que_jobs);
          DROP TYPE IF EXISTS pg_temp.que_query_result;
        SQL
      end
new( connection:, queue:, poll_interval: )
# File lib/que/poller.rb, line 122
# Builds a poller bound to a single connection and queue.
#
# connection    - stored as given; only #backend_pid is read here, for the
#                 log payload.
# queue         - stored as given.
# poll_interval - stored as given.
#
# Both "last poll" fields start out nil, which marks the poller as never
# having polled.
def initialize(connection:, queue:, poll_interval:)
  # Nothing has been polled yet.
  @last_poll_satisfied = nil
  @last_polled_at      = nil

  # Configuration supplied by the caller.
  @poll_interval = poll_interval
  @queue         = queue
  @connection    = connection

  Que.internal_log(:poller_instantiate, self) do
    { backend_pid: connection.backend_pid, queue: queue, poll_interval: poll_interval }
  end
end
setup(connection)

Creates the temporary database objects (a composite type and two helper functions, all in the connection-specific pg_temp schema) that we’ll use for polling. These objects could easily be created permanently in a migration, but that’d require another migration if we wanted to tweak them later.

# File lib/que/poller.rb, line 194
      # Creates, on the given connection, the temporary (pg_temp) objects used
      # for polling. Everything is sent in a single #execute call.
      #
      # connection - object responding to #execute(sql).
      #
      # Objects created:
      #
      # * pg_temp.que_query_result - composite type
      #   (locked boolean, remaining_priorities jsonb).
      #
      # * pg_temp.lock_and_update_priorities(priorities jsonb, job que_jobs) -
      #   tries pg_try_advisory_lock on the job's id exactly once and returns
      #   a que_query_result. When the lock was not taken, the priorities
      #   document is returned unchanged. When it was taken, the entry with
      #   the smallest key that is >= the job's priority is decremented, or
      #   removed outright when its count is 1.
      #   NOTE(review): the final SELECT reads FROM relevant, so when no key
      #   is >= the job's priority it produces no row - confirm how callers
      #   treat that result.
      #   NOTE(review): the function is declared STABLE even though it takes
      #   an advisory lock - confirm this is intentional.
      #
      # * pg_temp.que_highest_remaining_priority(priorities jsonb) - returns
      #   the largest key of the document as a smallint.
      def setup(connection)
        connection.execute <<-SQL
          -- Temporary composite type we need for our queries to work.
          CREATE TYPE pg_temp.que_query_result AS (
            locked boolean,
            remaining_priorities jsonb
          );

          CREATE FUNCTION pg_temp.lock_and_update_priorities(priorities jsonb, job que_jobs)
          RETURNS pg_temp.que_query_result
          AS $$
            WITH
              -- Take the lock in a CTE because we want to use the result
              -- multiple times while only taking the lock once.
              lock_taken AS (
                SELECT pg_try_advisory_lock((job).id) AS taken
              ),
              relevant AS (
                SELECT priority, count
                FROM (
                  SELECT
                    key::smallint AS priority,
                    value::text::integer AS count
                  FROM jsonb_each(priorities)
                  ) t1
                WHERE priority >= (job).priority
                ORDER BY priority ASC
                LIMIT 1
              )
            SELECT
              (SELECT taken FROM lock_taken), -- R
              CASE (SELECT taken FROM lock_taken)
              WHEN false THEN
                -- Simple case - we couldn't lock the job, so don't update the
                -- priorities hash.
                priorities
              WHEN true THEN
                CASE count
                WHEN 1 THEN
                  -- Remove the priority from the JSONB doc entirely, rather
                  -- than leaving a zero entry in it.
                  priorities - priority::text
                ELSE
                  -- Decrement the value in the JSONB doc.
                  jsonb_set(
                    priorities,
                    ARRAY[priority::text],
                    to_jsonb(count - 1)
                  )
                END
              END
            FROM relevant
          $$
          STABLE
          LANGUAGE SQL;

          CREATE FUNCTION pg_temp.que_highest_remaining_priority(priorities jsonb) RETURNS smallint AS $$
            SELECT max(key::smallint) FROM jsonb_each(priorities)
          $$
          STABLE
          LANGUAGE SQL;
        SQL
      end

Public Instance Methods

poll( priorities:, held_locks: )
# File lib/que/poller.rb, line 142
# Runs the prepared :poll_jobs statement and wraps each returned row in a
# Metajob.
#
# priorities - passed to the query as a JSON document and to
#              poll_satisfied? to judge the result.
# held_locks - collection converted with #to_a and sent as a "{a,b,c}"
#              array literal.
#
# Returns nil without touching the database when should_poll? is falsy;
# otherwise returns the rows array, mutated in place to hold Metajob
# instances. Also records when the poll happened and whether it was satisfied.
def poll(priorities:, held_locks:)
  return unless should_poll?

  query_params = [
    @queue,
    "{#{held_locks.to_a.join(',')}}",
    JSON.dump(priorities),
  ]
  jobs = connection.execute_prepared(:poll_jobs, query_params)

  # Remember the outcome so should_poll? can decide about the next call.
  @last_polled_at      = Time.now
  @last_poll_satisfied = poll_satisfied?(priorities, jobs)

  Que.internal_log(:poller_polled, self) do
    {
      queue:        @queue,
      locked:       jobs.count,
      priorities:   priorities,
      held_locks:   held_locks.to_a,
      newly_locked: jobs.map { |row| row.fetch(:id) },
    }
  end

  # Wrap the raw rows last, after the log payload has read them.
  jobs.map! { |row| Metajob.new(row) }
end
poll_interval_elapsed?()
# File lib/que/poller.rb, line 183
# Reports whether more than poll_interval seconds have passed since the
# last poll.
#
# Returns nil when no poll_interval is configured, true when an interval is
# configured but nothing has been polled yet (there is no previous poll to
# wait out), and otherwise the boolean result of the elapsed-time comparison.
def poll_interval_elapsed?
  return unless (interval = poll_interval)

  polled_at = last_polled_at
  # Without this guard `Time.now - nil` raises TypeError when the predicate
  # is called before the first poll.
  return true if polled_at.nil?

  (Time.now - polled_at) > interval
end
should_poll?()
# File lib/que/poller.rb, line 175
# Decides whether a poll should run now.
#
# Returns true when nothing has been polled yet (last_poll_satisfied is nil)
# or when the previous poll was satisfied; otherwise returns whatever
# poll_interval_elapsed? reports.
def should_poll?
  previous = last_poll_satisfied

  # Either we have never polled, or plenty of jobs were available last time.
  return true if previous.nil? || previous == true

  poll_interval_elapsed?
end

Private Instance Methods

poll_satisfied?(priorities, jobs)
# File lib/que/poller.rb, line 269
# Reports whether a poll returned at least as many jobs as the count stored
# under the largest key of `priorities`.
#
# priorities - Hash; its largest key selects the count to compare against.
# jobs       - collection responding to #count.
def poll_satisfied?(priorities, jobs)
  wanted = priorities[priorities.keys.max]
  jobs.count >= wanted
end