module Postjob::Queue::Notifications
The Postjob::Queue
manages enqueueing and fetching jobs from a job queue.
Constants
- CHANNEL
- MAX_WAIT_TIME
- SCHEMA_NAME
- SQL
Public Instance Methods
wait_for_new_job(worker_session_id, queues:)
click to toggle source
# File lib/postjob/queue/notifications.rb, line 12
#
# Blocks until a job is expected to be available, a notification arrives,
# or the wait times out.
#
# worker_session_id - passed through to time_to_next_job and to
#                     ::Postjob::Queue.should_shutdown?
# queues            - passed through to time_to_next_job.
#
# Returns nil right away when time_to_next_job reports a wait time <= 0.
# Returns :shutdown when there is no upcoming job and
# ::Postjob::Queue.should_shutdown? is truthy, or when an Interrupt is raised
# while waiting. Otherwise returns whatever the final logger.debug call
# returns.
def wait_for_new_job(worker_session_id, queues:)
  # Use the monotonic clock for the elapsed-time measurement: unlike Time.now
  # it is not affected by wall-clock adjustments happening during the wait.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  start_listening

  # Determine when the next job is up. If we don't have a next job within
  # MAX_WAIT_TIME we wake up regardless.
  wait_time = time_to_next_job(worker_session_id, queues: queues)
  return if wait_time && wait_time <= 0

  # The shutdown flag is only consulted when no job is upcoming at all.
  if !wait_time && ::Postjob::Queue.should_shutdown?(worker_session_id)
    Postjob.logger.debug "Shutting down runner: host is set to 'shutdown'"
    return :shutdown
  end

  wait_time = MAX_WAIT_TIME if !wait_time || wait_time > MAX_WAIT_TIME
  Postjob.logger.debug "postjob: waiting for notification for up to #{wait_time} seconds"
  Simple::SQL.wait_for_notify(wait_time)

  # Flush notifications. It is possible that a huge number of notifications
  # piled up while we have been waiting. The following loop takes care of
  # those: it polls with a tiny timeout until wait_for_notify returns a
  # falsy value.
  while Simple::SQL.wait_for_notify(0.000001)
    :nop
  end

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  Postjob.logger.debug "postjob: awoke after #{format('%.03f secs', elapsed)}"
rescue Interrupt
  Postjob.logger.info "postjob: shutdown after receiving Interrupt"
  :shutdown
end
Private Instance Methods
start_listening()
click to toggle source
# File lib/postjob/queue/notifications.rb, line 46
#
# Issues a LISTEN on CHANNEL through Simple::SQL the first time it is called
# and remembers that in @is_listening; later calls do nothing.
#
# Returns true when the LISTEN was just issued, nil when already listening.
def start_listening
  unless @is_listening
    Simple::SQL.ask "LISTEN #{CHANNEL}"
    @is_listening = true
  end
end
time_to_next_job(worker_session_id, queues:)
click to toggle source
Returns the maximum number of seconds to wait until the next job becomes runnable or is due to time out.
# File lib/postjob/queue/notifications.rb, line 55
#
# Queries the time_to_next_job SQL function in SCHEMA_NAME for the given
# worker session and queues.
#
# worker_session_id - must match the UUID pattern below.
# queues            - must be an Array.
#
# Returns whatever Simple::SQL.ask returns for that query.
def time_to_next_job(worker_session_id, queues:)
  # NOTE(review): the pattern is not anchored, so any string that merely
  # contains a UUID passes this check; the ::uuid cast in the query is
  # presumably what rejects such values. Confirm before tightening.
  uuid_pattern = /[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}/i
  expect! worker_session_id => uuid_pattern
  expect! queues => Array

  sql = "SELECT * FROM #{SCHEMA_NAME}.time_to_next_job($1::uuid, $2)"
  Simple::SQL.ask(sql, worker_session_id, queues)
end