module Postjob::Queue

The Postjob::Queue module manages enqueueing and fetching jobs from a job queue.

Constants

SCHEMA_NAME
SQL
UUID_REGEXP
WorkerSession

starts a session

Public Instance Methods

checkout(worker_session_id, queues:) click to toggle source
# File lib/postjob/queue.rb, line 176
def checkout(worker_session_id, queues:)
  expect! worker_session_id => UUID_REGEXP
  expect! queues => [ nil, Array ]

  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.checkout($1::uuid, $2::boolean, $3)",
    worker_session_id, Postjob.fast_mode, queues, into: Job
end
childjobs(parent) click to toggle source
# File lib/postjob/queue.rb, line 119
def childjobs(parent)
  expect! parent => Job
  SQL.all "SELECT * FROM #{SCHEMA_NAME}.childjobs($1)", parent.id, into: Job
end
disable_cron_jobs(workflow, args) click to toggle source

Disable cron'ness for workflow + args combination

# File lib/postjob/queue.rb, line 79
  def disable_cron_jobs(workflow, args)
    sql = <<~SQL
      UPDATE postjob.postjobs
      SET cron_interval = NULL
      WHERE cron_interval IS NOT NULL
        AND workflow=$1
        AND args=$2
        AND parent_id IS NULL
        AND status NOT IN ('ok', 'failed', 'timeout')
      SQL

    Simple::SQL.ask sql, workflow, Encoder.encode(args)
  end
enqueue_job(worker_session_id, workflow, *args, options) click to toggle source

enqueues a new job with the given arguments

Parameters:

  • queue - the name of the queue

  • workflow - the name of the workflow (e.g. “FooBar”, “FooBar#method_name”)

  • version - the version of the workflow, e.g. “0.2”

  • args - an array of arguments, must be encodable via Postjob::JSON.encode

  • parent_id - the id of the parent job, if any

  • tags - # a Hash[String => String]

# File lib/postjob/queue.rb, line 36
def enqueue_job(worker_session_id, workflow, *args, options)
  expect! workflow => String
  expect! options => {
    queue: [String, nil],
    version: [/\A\d(\.\d)+\z/, nil],
    parent_id: [Integer, nil],
    tags: [Hash, nil],
    timeout: [Numeric, nil],
    max_attempts: [Integer, nil],
    sticky: [true, false, nil],
    greedy: [true, false, nil]
  }

  workflow, workflow_method = parse_workflow(workflow)

  if options[:greedy] && !options[:sticky]
    raise ArgumentError, "#{workflow}: A greedy job must also be sticky" unless options[:sticky].nil?
    options[:sticky] = true if options[:greedy]
  end

  # The use of a `SELECT * FROM function()` here is due to
  #
  # a) a limitation in Simple::SQL which would not be able to unpack a
  #    "SELECT function()" usefully when the return value is a record;
  # b) and/or my inability to write better SQL functions;
  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.enqueue($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
                worker_session_id,
                options[:queue],
                workflow,
                workflow_method,
                options[:version],
                Encoder.encode(args),
                options[:parent_id],
                Encoder.encode(options[:tags]),
                options[:max_attempts],
                options[:timeout],
                options[:cron_interval],
                options[:sticky],
                options[:greedy],
                into: Job
end
find_job_by_token(token) click to toggle source
# File lib/postjob/queue.rb, line 188
def find_job_by_token(token)
  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.postjobs_by_token($1)", token, into: Job
end
find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil) click to toggle source
# File lib/postjob/queue.rb, line 129
def find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil)
  expect! parent => Job
  expect! workflow => String
  expect! args => Array

  if workflow == "__manual__" && max_attempts != 1
    Postjob.logger.info "Job ##{parent.id} adjusting max_attempts of '__manual__' child job"
    max_attempts = 1
  end

  workflow, workflow_method = parse_workflow(workflow)

  # The use of a `SELECT * FROM function()` here is due to
  #
  # a) a limitation in Simple::SQL which would not be able to unpack a
  #    "SELECT function()" usefully when the return value is a record;
  # b) and/or my inability to write better SQL functions;
  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.find_or_create_childjob($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
         worker_session_id,
         queue,
         workflow,
         workflow_method,
         nil,                      # version
         Encoder.encode(args),
         parent.id,
         nil,                      # tags will be read from parent
         max_attempts,
         timeout,
         into: Job
end
find_or_create_token(job) click to toggle source
# File lib/postjob/queue.rb, line 184
def find_or_create_token(job)
  SQL.ask "SELECT #{SCHEMA_NAME}.find_or_create_token($1)", job.id
end
host_heartbeat(host_id, measurement) click to toggle source

sends in a heartbeat

# File lib/postjob/queue.rb, line 203
def host_heartbeat(host_id, measurement)
  Simple::SQL.ask "SELECT postjob.host_heartbeat($1::uuid, $2::jsonb, $3)",
    host_id, JSON.generate(measurement), ::Postjob.fast_mode
end
host_register(attributes, host_id:) click to toggle source

returns the host id

# File lib/postjob/queue.rb, line 195
def host_register(attributes, host_id:)
  expect! attributes => [ nil, Hash ]
  expect! host_id => [ nil, UUID_REGEXP ]

  Simple::SQL.ask "SELECT postjob.host_register($1, $2::uuid)", JSON.generate(attributes), host_id
end
set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:) click to toggle source
# File lib/postjob/queue.rb, line 111
def set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:)
  expect! status => [ :failed, :err, :timeout ]
  expect! worker_session_id => UUID_REGEXP

  SQL.ask "SELECT #{SCHEMA_NAME}.set_job_error($1::uuid, $2, $3, $4, $5, $6, $7, $8)",
    worker_session_id, job.id, error, error_message, Encoder.encode(error_backtrace), status, version, Postjob.fast_mode
end
set_job_pending(worker_session_id, job, version:) click to toggle source
# File lib/postjob/queue.rb, line 105
def set_job_pending(worker_session_id, job, version:)
  expect! worker_session_id => UUID_REGEXP

  SQL.ask "SELECT #{SCHEMA_NAME}.set_job_pending($1::uuid, $2, $3)", worker_session_id, job.id, version
end
set_job_result(worker_session_id, job, value, version:) click to toggle source
# File lib/postjob/queue.rb, line 98
def set_job_result(worker_session_id, job, value, version:)
  expect! worker_session_id => UUID_REGEXP

  value = Encoder.encode([value]) unless value.nil?
  SQL.ask "SELECT #{SCHEMA_NAME}.set_job_result($1::uuid, $2, $3, $4)", worker_session_id, job.id, value, version
end
settings?() click to toggle source
# File lib/postjob/queue/settings.rb, line 2
def settings?
  tables = SQL::Reflection.tables(schema: "postjob")
  tables.include?("postjob.settings")
end
should_shutdown?(worker_session_id) click to toggle source

Asks the database whether this session should be shut down.

# File lib/postjob/queue.rb, line 94
def should_shutdown?(worker_session_id)
  SQL.ask "SELECT #{SCHEMA_NAME}.session_should_shutdown($1::uuid)", worker_session_id
end
unresolved_childjobs(parent) click to toggle source
# File lib/postjob/queue.rb, line 124
def unresolved_childjobs(parent)
  expect! parent => Job
  SQL.ask "SELECT COUNT(*) FROM #{SCHEMA_NAME}.unresolved_childjobs($1)", parent.id
end
version() click to toggle source

returns the version of the Postjob queue.

# File lib/postjob/queue/settings.rb, line 8
  def version
    return "0.3.*" unless settings?

    sql = <<~SQL
      SELECT 1 from pg_proc
      left join pg_namespace on pg_proc.pronamespace=pg_namespace.oid
      where pg_namespace.nspname='postjob' AND pg_proc.proname='settings_get'
    SQL

    if Simple::SQL.ask(sql)
      version = Simple::SQL.ask "SELECT postjob.settings_get('schema_version')"
      version ||= Simple::SQL.ask "SELECT postjob.settings_get('version')"
    else
      version = Simple::SQL.ask("SELECT value FROM postjob.settings WHERE name=$1", "version")
    end

    version || "unknown"
  end
worker_session_start(workflows_with_versions, host_id:, queues:) click to toggle source
# File lib/postjob/queue.rb, line 211
def worker_session_start(workflows_with_versions, host_id:, queues:)
  expect! host_id => UUID_REGEXP
  expect! queues => Array
  expect! queues.first => String

  Simple::SQL.ask "SELECT * FROM postjob.worker_session_start($1::uuid, $2, $3)", host_id, workflows_with_versions, queues, into: ::Postjob::WorkerSession
end
worker_session_stop(worker_session) click to toggle source

stop a worker session

# File lib/postjob/queue.rb, line 220
def worker_session_stop(worker_session)
  expect! worker_session => UUID_REGEXP

  Simple::SQL.ask "SELECT * FROM postjob.worker_session_stop($1::uuid)", worker_session
end

Private Instance Methods

parse_workflow(workflow) click to toggle source
# File lib/postjob/queue.rb, line 162
def parse_workflow(workflow)
  workflow, workflow_method = workflow.split(".", 2)
  workflow_method ||= "run"

  expect! workflow => /./
  expect! workflow_method => /^[_a-z][_a-z0-9]*$/

  [workflow, workflow_method]
end