module Postjob::Queue
The Postjob::Queue
module manages enqueueing and fetching jobs from a job queue.
Constants
- SCHEMA_NAME
- SQL
- UUID_REGEXP
- WorkerSession
starts a session
Public Instance Methods
checkout(worker_session_id, queues:)
click to toggle source
# File lib/postjob/queue.rb, line 176
# Calls the schema's `checkout` SQL function for the given worker session
# and returns whatever SQL.ask produces with `into: Job`.
#
# worker_session_id - must match UUID_REGEXP.
# queues            - nil or an Array; forwarded unchanged as the third
#                     SQL argument.
#
# Postjob.fast_mode is forwarded as the boolean second SQL argument.
def checkout(worker_session_id, queues:)
  expect! worker_session_id => UUID_REGEXP
  expect! queues => [ nil, Array ]

  query = "SELECT * FROM #{SCHEMA_NAME}.checkout($1::uuid, $2::boolean, $3)"
  SQL.ask(query, worker_session_id, Postjob.fast_mode, queues, into: Job)
end
childjobs(parent)
click to toggle source
# File lib/postjob/queue.rb, line 119
# Returns every row of the schema's `childjobs` SQL function for the given
# parent, loaded via `into: Job`.
#
# parent - must be a Job; only its id is sent to the database.
def childjobs(parent)
  expect! parent => Job

  query = "SELECT * FROM #{SCHEMA_NAME}.childjobs($1)"
  SQL.all(query, parent.id, into: Job)
end
disable_cron_jobs(workflow, args)
click to toggle source
Disable cron'ness for workflow + args combination
# File lib/postjob/queue.rb, line 79
# Clears `cron_interval` on matching jobs so they stop recurring.
#
# A job matches when it has a cron interval, has the given workflow and
# (encoded) args, has no parent, and its status is none of 'ok', 'failed'
# or 'timeout'.
#
# workflow - compared against the `workflow` column.
# args     - encoded with Encoder.encode before being compared against the
#            `args` column.
def disable_cron_jobs(workflow, args)
  update_statement = <<~SQL
    UPDATE postjob.postjobs
    SET cron_interval = NULL
    WHERE cron_interval IS NOT NULL
      AND workflow=$1
      AND args=$2
      AND parent_id IS NULL
      AND status NOT IN ('ok', 'failed', 'timeout')
  SQL

  Simple::SQL.ask(update_statement, workflow, Encoder.encode(args))
end
enqueue_job(worker_session_id, workflow, *args, options)
click to toggle source
enqueues a new job with the given arguments
Parameters:
-
queue - the name of the queue
-
workflow - the name of the workflow, optionally followed by a method name (e.g. “FooBar”, “FooBar.method_name”)
-
version - the version of the workflow, e.g. “0.2”
-
args - an array of arguments, must be encodable via Postjob::JSON.encode
-
parent_id - the id of the parent job, if any
-
tags - a Hash[String => String]
# File lib/postjob/queue.rb, line 36
# Enqueues a new job by calling the schema's `enqueue` SQL function and
# returns the result of SQL.ask with `into: Job`.
#
# worker_session_id - forwarded as the uuid first SQL argument.
# workflow          - a String; split into workflow name and method name by
#                     parse_workflow.
# args              - remaining positional arguments; encoded with
#                     Encoder.encode.
# options           - a Hash (last positional argument). Validated keys:
#                     :queue (String), :version (digit followed by one or
#                     more ".digit" groups), :parent_id (Integer),
#                     :tags (Hash), :timeout (Numeric),
#                     :max_attempts (Integer), :sticky and :greedy
#                     (true/false). Each may be nil. :cron_interval is also
#                     read and forwarded, but is not validated here.
#
# Raises ArgumentError when :greedy is set while :sticky is explicitly false.
#
# The caller's options hash is never modified: the effective sticky flag is
# derived into a local variable.
def enqueue_job(worker_session_id, workflow, *args, options)
  expect! workflow => String
  expect! options => {
    queue: [String, nil],
    version: [/\A\d(\.\d)+\z/, nil],
    parent_id: [Integer, nil],
    tags: [Hash, nil],
    timeout: [Numeric, nil],
    max_attempts: [Integer, nil],
    sticky: [true, false, nil],
    greedy: [true, false, nil]
  }

  workflow, workflow_method = parse_workflow(workflow)

  # A greedy job implies stickiness. An unset (nil) sticky flag is promoted
  # to true, while an explicit `sticky: false` is a contradiction.
  sticky = options[:sticky]
  if options[:greedy] && !sticky
    raise ArgumentError, "#{workflow}: A greedy job must also be sticky" unless sticky.nil?

    sticky = true
  end

  # The use of a `SELECT * FROM function()` here is due to
  #
  # a) a limitation in Simple::SQL which would not be able to unpack a
  #    "SELECT function()" usefully when the return value is a record;
  # b) and/or my inability to write better SQL functions;
  SQL.ask "SELECT * FROM #{SCHEMA_NAME}.enqueue($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
          worker_session_id,
          options[:queue],
          workflow,
          workflow_method,
          options[:version],
          Encoder.encode(args),
          options[:parent_id],
          Encoder.encode(options[:tags]),
          options[:max_attempts],
          options[:timeout],
          options[:cron_interval],
          sticky,
          options[:greedy],
          into: Job
end
find_job_by_token(token)
click to toggle source
# File lib/postjob/queue.rb, line 188
# Queries the schema's `postjobs_by_token` SQL function with the given
# token and returns the result of SQL.ask with `into: Job`.
def find_job_by_token(token)
  query = "SELECT * FROM #{SCHEMA_NAME}.postjobs_by_token($1)"
  SQL.ask(query, token, into: Job)
end
find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil)
click to toggle source
# File lib/postjob/queue.rb, line 129
# Calls the schema's `find_or_create_childjob` SQL function for the given
# parent and returns the result of SQL.ask with `into: Job`.
#
# worker_session_id - forwarded as the uuid first SQL argument.
# parent            - must be a Job; its id is sent as the parent id.
# workflow          - a String; split by parse_workflow into name + method.
# args              - must be an Array; encoded with Encoder.encode.
# timeout:          - forwarded unchanged.
# max_attempts:     - forwarded, except that a "__manual__" workflow is
#                     always limited to a single attempt.
# queue:            - forwarded unchanged (defaults to nil).
def find_or_create_childjob(worker_session_id, parent, workflow, args, timeout:, max_attempts:, queue: nil)
  expect! parent => Job
  expect! workflow => String
  expect! args => Array

  manual_workflow = workflow == "__manual__"
  if manual_workflow && max_attempts != 1
    Postjob.logger.info "Job ##{parent.id} adjusting max_attempts of '__manual__' child job"
    max_attempts = 1
  end

  workflow_name, workflow_method = parse_workflow(workflow)

  # `SELECT * FROM function()` is used instead of `SELECT function()`
  # because Simple::SQL could not usefully unpack a record return value
  # otherwise (and/or the SQL functions could be written better).
  query = "SELECT * FROM #{SCHEMA_NAME}.find_or_create_childjob($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10)"

  version = nil # no version is passed for child jobs
  tags = nil    # tags will be read from parent

  SQL.ask(
    query,
    worker_session_id,
    queue,
    workflow_name,
    workflow_method,
    version,
    Encoder.encode(args),
    parent.id,
    tags,
    max_attempts,
    timeout,
    into: Job
  )
end
find_or_create_token(job)
click to toggle source
# File lib/postjob/queue.rb, line 184
# Calls the schema's `find_or_create_token` SQL function with the job's id
# and returns the result of SQL.ask.
def find_or_create_token(job)
  query = "SELECT #{SCHEMA_NAME}.find_or_create_token($1)"
  SQL.ask(query, job.id)
end
host_heartbeat(host_id, measurement)
click to toggle source
sends in a heartbeat
# File lib/postjob/queue.rb, line 203
# Sends in a heartbeat by calling `postjob.host_heartbeat`.
#
# host_id     - forwarded as the uuid first SQL argument.
# measurement - serialized with JSON.generate and sent as jsonb.
#
# ::Postjob.fast_mode is forwarded as the third SQL argument.
def host_heartbeat(host_id, measurement)
  payload = JSON.generate(measurement)
  fast_mode = ::Postjob.fast_mode

  Simple::SQL.ask("SELECT postjob.host_heartbeat($1::uuid, $2::jsonb, $3)", host_id, payload, fast_mode)
end
host_register(attributes, host_id:)
click to toggle source
returns the host id
# File lib/postjob/queue.rb, line 195
# Registers a host by calling `postjob.host_register` and returns the
# result of Simple::SQL.ask (documented on this page as the host id).
#
# attributes - nil or a Hash; serialized with JSON.generate.
# host_id:   - nil or a String matching UUID_REGEXP.
def host_register(attributes, host_id:)
  expect! attributes => [ nil, Hash ]
  expect! host_id => [ nil, UUID_REGEXP ]

  serialized_attributes = JSON.generate(attributes)
  Simple::SQL.ask("SELECT postjob.host_register($1, $2::uuid)", serialized_attributes, host_id)
end
set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:)
click to toggle source
# File lib/postjob/queue.rb, line 111
# Records an error on a job by calling the schema's `set_job_error` SQL
# function.
#
# worker_session_id - must match UUID_REGEXP.
# job               - only its id is sent to the database.
# error             - forwarded unchanged.
# error_message     - forwarded unchanged.
# error_backtrace   - optional; encoded with Encoder.encode.
# status:           - one of :failed, :err, :timeout.
# version:          - forwarded unchanged.
#
# Postjob.fast_mode is forwarded as the last SQL argument.
def set_job_error(worker_session_id, job, error, error_message, error_backtrace = nil, status:, version:)
  expect! status => [ :failed, :err, :timeout ]
  expect! worker_session_id => UUID_REGEXP

  query = "SELECT #{SCHEMA_NAME}.set_job_error($1::uuid, $2, $3, $4, $5, $6, $7, $8)"

  SQL.ask(
    query,
    worker_session_id,
    job.id,
    error,
    error_message,
    Encoder.encode(error_backtrace),
    status,
    version,
    Postjob.fast_mode
  )
end
set_job_pending(worker_session_id, job, version:)
click to toggle source
# File lib/postjob/queue.rb, line 105
# Calls the schema's `set_job_pending` SQL function for the given job.
#
# worker_session_id - must match UUID_REGEXP.
# job               - only its id is sent to the database.
# version:          - forwarded unchanged.
def set_job_pending(worker_session_id, job, version:)
  expect! worker_session_id => UUID_REGEXP

  query = "SELECT #{SCHEMA_NAME}.set_job_pending($1::uuid, $2, $3)"
  SQL.ask(query, worker_session_id, job.id, version)
end
set_job_result(worker_session_id, job, value, version:)
click to toggle source
# File lib/postjob/queue.rb, line 98
# Stores a job's result by calling the schema's `set_job_result` SQL
# function.
#
# worker_session_id - must match UUID_REGEXP.
# job               - only its id is sent to the database.
# value             - a nil value is sent as nil; anything else is wrapped
#                     in a one-element Array and encoded with
#                     Encoder.encode.
# version:          - forwarded unchanged.
def set_job_result(worker_session_id, job, value, version:)
  expect! worker_session_id => UUID_REGEXP

  encoded_value = value.nil? ? nil : Encoder.encode([value])

  query = "SELECT #{SCHEMA_NAME}.set_job_result($1::uuid, $2, $3, $4)"
  SQL.ask(query, worker_session_id, job.id, encoded_value, version)
end
settings?()
click to toggle source
# File lib/postjob/queue/settings.rb, line 2
# Returns whether the table list reported by SQL::Reflection for the
# "postjob" schema includes "postjob.settings".
def settings?
  SQL::Reflection.tables(schema: "postjob").include?("postjob.settings")
end
should_shutdown?(worker_session_id)
click to toggle source
Asks the database whether this session should be shut down.
# File lib/postjob/queue.rb, line 94 def should_shutdown?(worker_session_id) SQL.ask "SELECT #{SCHEMA_NAME}.session_should_shutdown($1::uuid)", worker_session_id end
unresolved_childjobs(parent)
click to toggle source
# File lib/postjob/queue.rb, line 124
# Returns the COUNT(*) over the schema's `unresolved_childjobs` SQL
# function for the given parent.
#
# parent - must be a Job; only its id is sent to the database.
def unresolved_childjobs(parent)
  expect! parent => Job

  query = "SELECT COUNT(*) FROM #{SCHEMA_NAME}.unresolved_childjobs($1)"
  SQL.ask(query, parent.id)
end
version()
click to toggle source
returns the version of the Postjob
queue.
# File lib/postjob/queue/settings.rb, line 8
# Returns the version of the Postjob queue.
#
# Resolution order:
# 1. "0.3.*" when there is no settings table (see settings?).
# 2. When a `postjob.settings_get` function exists: the 'schema_version'
#    setting, falling back to the 'version' setting.
# 3. Otherwise the 'version' row read directly from postjob.settings.
# 4. "unknown" when none of the above yields a value.
def version
  return "0.3.*" unless settings?

  # Probe pg_proc for a settings_get function in the postjob namespace.
  settings_get_probe = <<~SQL
    SELECT 1 from pg_proc
    left join pg_namespace on pg_proc.pronamespace=pg_namespace.oid
    where pg_namespace.nspname='postjob' AND pg_proc.proname='settings_get'
  SQL

  detected =
    if Simple::SQL.ask(settings_get_probe)
      Simple::SQL.ask("SELECT postjob.settings_get('schema_version')") ||
        Simple::SQL.ask("SELECT postjob.settings_get('version')")
    else
      Simple::SQL.ask("SELECT value FROM postjob.settings WHERE name=$1", "version")
    end

  detected || "unknown"
end
worker_session_start(workflows_with_versions, host_id:, queues:)
click to toggle source
# File lib/postjob/queue.rb, line 211
# Starts a worker session by calling `postjob.worker_session_start` and
# returns the result loaded via `into: ::Postjob::WorkerSession`.
#
# workflows_with_versions - forwarded unchanged as the second SQL argument.
# host_id:                - must match UUID_REGEXP.
# queues:                 - must be an Array whose first element is a
#                           String (only the first element is checked).
def worker_session_start(workflows_with_versions, host_id:, queues:)
  expect! host_id => UUID_REGEXP
  expect! queues => Array
  expect! queues.first => String

  query = "SELECT * FROM postjob.worker_session_start($1::uuid, $2, $3)"
  Simple::SQL.ask(query, host_id, workflows_with_versions, queues, into: ::Postjob::WorkerSession)
end
worker_session_stop(worker_session)
click to toggle source
stop a worker session
# File lib/postjob/queue.rb, line 220
# Stops a worker session by calling `postjob.worker_session_stop`.
#
# worker_session - the session's id; must match UUID_REGEXP.
def worker_session_stop(worker_session)
  expect! worker_session => UUID_REGEXP

  query = "SELECT * FROM postjob.worker_session_stop($1::uuid)"
  Simple::SQL.ask(query, worker_session)
end
Private Instance Methods
parse_workflow(workflow)
click to toggle source
# File lib/postjob/queue.rb, line 162
# Splits a workflow specification into its workflow name and method name.
#
# workflow - a String such as "Foo" or "Foo.bar". Everything after the
#            first "." is the method name; without a "." the method name
#            defaults to "run".
#
# Returns a two-element Array: [workflow_name, workflow_method].
#
# Both parts are validated with expect!: the name must be non-empty, and
# the method must be a lowercase identifier. The identifier pattern is
# anchored with \A and \z so that the *entire* string must match; ^ and $
# would only anchor a single line and let multi-line input slip through.
def parse_workflow(workflow)
  workflow, workflow_method = workflow.split(".", 2)
  workflow_method ||= "run"

  expect! workflow => /./
  expect! workflow_method => /\A[_a-z][_a-z0-9]*\z/

  [workflow, workflow_method]
end