class Que::Poller
Attributes
connection[R]
last_poll_satisfied[R]
last_polled_at[R]
poll_interval[R]
queue[R]
Public Class Methods
cleanup(connection)
click to toggle source
# File lib/que/poller.rb, line 258 def cleanup(connection) connection.execute <<-SQL DROP FUNCTION pg_temp.que_highest_remaining_priority(jsonb); DROP FUNCTION pg_temp.lock_and_update_priorities(jsonb, que_jobs); DROP TYPE pg_temp.que_query_result; SQL end
new( connection:, queue:, poll_interval: )
click to toggle source
# File lib/que/poller.rb, line 122 def initialize( connection:, queue:, poll_interval: ) @connection = connection @queue = queue @poll_interval = poll_interval @last_polled_at = nil @last_poll_satisfied = nil Que.internal_log :poller_instantiate, self do { backend_pid: connection.backend_pid, queue: queue, poll_interval: poll_interval, } end end
setup(connection)
click to toggle source
Manage some temporary infrastructure (specific to the connection) that we’ll use for polling. These could easily be created permanently in a migration, but that’d require another migration if we wanted to tweak them later.
# File lib/que/poller.rb, line 194 def setup(connection) connection.execute <<-SQL -- Temporary composite type we need for our queries to work. CREATE TYPE pg_temp.que_query_result AS ( locked boolean, remaining_priorities jsonb ); CREATE FUNCTION pg_temp.lock_and_update_priorities(priorities jsonb, job que_jobs) RETURNS pg_temp.que_query_result AS $$ WITH -- Take the lock in a CTE because we want to use the result -- multiple times while only taking the lock once. lock_taken AS ( SELECT pg_try_advisory_lock((job).id) AS taken ), relevant AS ( SELECT priority, count FROM ( SELECT key::smallint AS priority, value::text::integer AS count FROM jsonb_each(priorities) ) t1 WHERE priority >= (job).priority ORDER BY priority ASC LIMIT 1 ) SELECT (SELECT taken FROM lock_taken), -- R CASE (SELECT taken FROM lock_taken) WHEN false THEN -- Simple case - we couldn't lock the job, so don't update the -- priorities hash. priorities WHEN true THEN CASE count WHEN 1 THEN -- Remove the priority from the JSONB doc entirely, rather -- than leaving a zero entry in it. priorities - priority::text ELSE -- Decrement the value in the JSONB doc. jsonb_set( priorities, ARRAY[priority::text], to_jsonb(count - 1) ) END END FROM relevant $$ STABLE LANGUAGE SQL; CREATE FUNCTION pg_temp.que_highest_remaining_priority(priorities jsonb) RETURNS smallint AS $$ SELECT max(key::smallint) FROM jsonb_each(priorities) $$ STABLE LANGUAGE SQL; SQL end
Public Instance Methods
poll( priorities:, held_locks: )
click to toggle source
# File lib/que/poller.rb, line 142 def poll( priorities:, held_locks: ) return unless should_poll? jobs = connection.execute_prepared( :poll_jobs, [ @queue, "{#{held_locks.to_a.join(',')}}", JSON.dump(priorities), ] ) @last_polled_at = Time.now @last_poll_satisfied = poll_satisfied?(priorities, jobs) Que.internal_log :poller_polled, self do { queue: @queue, locked: jobs.count, priorities: priorities, held_locks: held_locks.to_a, newly_locked: jobs.map { |key| key.fetch(:id) }, } end jobs.map! { |job| Metajob.new(job) } end
poll_interval_elapsed?()
click to toggle source
# File lib/que/poller.rb, line 183 def poll_interval_elapsed? return unless interval = poll_interval (Time.now - last_polled_at) > interval end
should_poll?()
click to toggle source
# File lib/que/poller.rb, line 175 def should_poll? # Never polled before? last_poll_satisfied.nil? || # Plenty of jobs were available last time? last_poll_satisfied == true || poll_interval_elapsed? end
Private Instance Methods
poll_satisfied?(priorities, jobs)
click to toggle source
# File lib/que/poller.rb, line 269 def poll_satisfied?(priorities, jobs) lowest_priority = priorities.keys.max jobs.count >= priorities[lowest_priority] end