module Postjob::Queue::Notifications

The Postjob::Queue manages enqueueing and fetching jobs from a job queue.

Constants

CHANNEL
MAX_WAIT_TIME
SCHEMA_NAME
SQL

Public Instance Methods

wait_for_new_job(worker_session_id, queues:) click to toggle source
# File lib/postjob/queue/notifications.rb, line 12
# Waits until it is worth looking for a job again on behalf of a worker session.
#
# Ensures the LISTEN is set up (start_listening), asks time_to_next_job how
# long to wait, and then waits via Simple::SQL.wait_for_notify for at most
# MAX_WAIT_TIME seconds.
#
# worker_session_id - handed to time_to_next_job and
#                     ::Postjob::Queue.should_shutdown?
# queues            - handed to time_to_next_job
#
# Returns :shutdown when time_to_next_job reports no wait time and
# ::Postjob::Queue.should_shutdown? is truthy, or when an Interrupt is raised
# while waiting. Returns nil right away when the reported wait time is zero or
# negative. Otherwise returns whatever the final logger call returns.
def wait_for_new_job(worker_session_id, queues:)
  # Measure elapsed time on the monotonic clock: wall-clock adjustments while
  # we are waiting must not distort (or turn negative) the reported duration.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  start_listening

  # Determine when the next job is up. If we don't have a next job within MAX_WAIT_TIME
  # we wake up regardless.
  wait_time = time_to_next_job(worker_session_id, queues: queues)
  return if wait_time && wait_time <= 0

  # The shutdown flag is only consulted when no wait time was reported.
  if !wait_time && ::Postjob::Queue.should_shutdown?(worker_session_id)
    Postjob.logger.debug "Shutting down runner: host is set to 'shutdown'"
    return :shutdown
  end

  wait_time = MAX_WAIT_TIME if !wait_time || wait_time > MAX_WAIT_TIME
  Postjob.logger.debug "postjob: waiting for notification for up to #{wait_time} seconds"
  Simple::SQL.wait_for_notify(wait_time)

  # flush notifications. It is possible that a huge number of notifications
  # piled up while we have been waiting. The following line takes care of
  # those.
  while Simple::SQL.wait_for_notify(0.000001)
    :nop
  end

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  Postjob.logger.debug "postjob: awoke after #{format('%.03f secs', elapsed)}"
rescue Interrupt
  Postjob.logger.info "postjob: shutdown after receiving Interrupt"
  :shutdown
end

Private Instance Methods

start_listening() click to toggle source
# File lib/postjob/queue/notifications.rb, line 46
# Issues a LISTEN for CHANNEL via Simple::SQL.ask the first time it is called
# and records that in @is_listening; subsequent calls do nothing.
#
# Returns true when the LISTEN was just issued, nil when it had been issued
# before.
def start_listening
  unless @is_listening
    Simple::SQL.ask("LISTEN #{CHANNEL}")
    @is_listening = true
  end
end
time_to_next_job(worker_session_id, queues:) click to toggle source

Returns the maximum number of seconds to wait until the next runnable or timeoutable job comes up.

# File lib/postjob/queue/notifications.rb, line 55
# Asks the database's time_to_next_job function (in SCHEMA_NAME) for the wait
# time and returns whatever Simple::SQL.ask yields for that query.
#
# worker_session_id - must be a UUID string (hex digits, any case, in the
#                     8-4-4-4-12 layout); passed as $1 and cast to uuid
# queues            - must be an Array; passed as $2
#
# Fails via expect! when an argument does not meet these expectations.
def time_to_next_job(worker_session_id, queues:)
  # Anchored with \A and \z so the entire value has to be a UUID; without
  # anchors any string merely containing a UUID would pass validation.
  expect! worker_session_id => /\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\z/i
  expect! queues => Array

  Simple::SQL.ask "SELECT * FROM #{SCHEMA_NAME}.time_to_next_job($1::uuid, $2)", worker_session_id, queues
end