module Postjob

rubocop:disable Security/Eval

Constants

CRON_INTERVAL_MIN
DEFAULT_QUEUE
VERSION

Attributes

logger[RW]

Public Instance Methods

current_session?() click to toggle source
# File lib/postjob.rb, line 263
# Returns true when a worker session is currently set (@worker_session is
# not nil), false otherwise.
def current_session?
  # Compare against nil itself; the previous code compared against the
  # never-assigned instance variable @nil, which only worked by accident.
  !@worker_session.nil?
end
current_session_id() click to toggle source

Returns the id of the current worker session. Raises an error if no worker session has been started yet.

# File lib/postjob.rb, line 269
# Returns the id of the current worker session.
#
# Raises a RuntimeError when no worker session is set.
def current_session_id
  session = @worker_session
  return session.id if session

  raise("worker_session hasn't been started yet.")
end
enqueue!(workflow, *args, queue: nil, parent_id: nil, max_attempts: nil, timeout: nil, version: nil, tags: nil, cron_interval: nil, sticky: nil) click to toggle source

Enqueues a workflow.

Options include

  • version

  • max_attempts

  • timeout

  • sticky

  • cron_interval

  • queue

Returns a job id

# File lib/postjob.rb, line 62
# Enqueues a job for +workflow+ with the positional +args+ and returns the
# id of the job created by Queue.enqueue_job.
#
# Steps, in order:
# 1. argument types are checked via expect!;
# 2. a "__manual__" workflow is forced to max_attempts = 1 (and this is logged);
# 3. options that were left nil are filled from the workflow's registry
#    entry, if there is one (greedy is always taken from the registry);
# 4. a cron_interval below CRON_INTERVAL_MIN raises a RuntimeError, otherwise
#    existing cron jobs for this workflow/args are disabled;
# 5. the job is enqueued under the current worker session id, or under the
#    all-zero UUID when there is no current session.
def enqueue!(workflow, *args, queue: nil,
             parent_id: nil,
             max_attempts: nil,
             timeout: nil,
             version: nil,
             tags: nil,
             cron_interval: nil,
             sticky: nil)
  # Each value is checked in its own call: values may be equal (e.g. several
  # nils), so they cannot share a single hash as keys.
  expect! queue => [nil, String]
  expect! workflow => String
  expect! parent_id => [nil, Integer]
  expect! max_attempts => [nil, Integer]
  expect! timeout => [nil, Numeric]
  expect! tags => [nil, Hash]
  expect! sticky => [nil, false, true]

  # -- '__manual__' jobs always get exactly one attempt ----------------------

  if workflow == "__manual__" && max_attempts != 1
    notice = if parent_id
               "Job ##{parent_id} adjusting max_attempts of '__manual__' child job"
             else
               "Adjusting max_attempts of '__manual__' root job"
             end
    Postjob.logger.info notice

    max_attempts = 1
  end

  # -- fill unset options from the registry ----------------------------------

  greedy = nil
  registered = Postjob::Registry[workflow]
  if registered
    defaults = registered.options

    # nil? (not ||=) so that an explicit false, e.g. sticky: false, is kept.
    max_attempts  = defaults.max_attempts if max_attempts.nil?
    timeout       = defaults.timeout if timeout.nil?
    sticky        = defaults.sticky if sticky.nil?
    cron_interval = defaults.cron_interval if cron_interval.nil?
    queue         = defaults.queue if queue.nil?
    greedy        = defaults.greedy
  end

  # -- cron jobs: validate the interval, then disable existing ones ----------

  if cron_interval
    raise "cron interval must be at least #{CRON_INTERVAL_MIN} seconds" if cron_interval < CRON_INTERVAL_MIN

    Queue.disable_cron_jobs(workflow, args)
  end

  # -- enqueue ---------------------------------------------------------------

  tags &&= stringify_hash(tags)

  session_id = current_session? ? current_session_id : nil
  session_id ||= "00000000-0000-0000-0000-000000000000"

  job = Queue.enqueue_job(session_id, workflow, *args,
                          queue: queue,
                          parent_id: parent_id,
                          max_attempts: max_attempts,
                          timeout: timeout,
                          tags: tags,
                          version: version,
                          cron_interval: cron_interval,
                          sticky: sticky,
                          greedy: greedy)

  logger.info "Generated process #{job}"
  job.id
end
env() click to toggle source
# File lib/postjob.rb, line 31
# Returns the name of the current environment: the first of the environment
# variables POSTJOB_ENV, RAILS_ENV, RACK_ENV that is set, or "development"
# when none of them is.
def env
  configured = ENV.values_at("POSTJOB_ENV", "RAILS_ENV", "RACK_ENV").compact
  configured.first || "development"
end
host_id() click to toggle source
# File lib/postjob.rb, line 35
# Returns the host id, as reported by Host.host_id.
def host_id
  Host.host_id
end
process_all() click to toggle source

Processes all waiting jobs.

This method starts processing jobs, as long as there are some. It returns once no runnable jobs can be found anymore.

Note that this method is not limited to the set of runnable jobs present when calling it; if running a job results in newly created runnable jobs these jobs will be processed as well.

This method returns the number of processed jobs.

# File lib/postjob.rb, line 154
# Processes jobs via #run and returns whatever #run returns.
#
# The block passed to #run evaluates to false as soon as nil is yielded
# instead of a job.
def process_all
  run { |processed_job| !processed_job.nil? }
end
register_workflow(workflow, options = {}) click to toggle source

Registers a workflow.

This call registers a workflow with a set of options.

This is usually used via

module X
  Postjob.register self, max_attempts: 2, timeout: 86400
end

The workflow parameter is either a module which implements a workflow, or a String with the name of a workflow, and a set of options.

Options include

  • version

  • max_attempts

  • timeout

  • sticky

  • cron_interval

  • queue

# File lib/postjob.rb, line 307
# Registers +workflow+ (a Module or a String) with +options+ in the Registry
# and returns the result of Registry.register.
#
# A Module is extended with Postjob::Workflow before it is registered.
def register_workflow(workflow, options = {})
  expect! workflow => [ Module, String ]

  if workflow.is_a?(Module)
    workflow.extend(Postjob::Workflow)
  end

  Registry.register(workflow, options)
end
resolve(token:, result:) click to toggle source

Explicitly resolve a workflow.

# File lib/postjob.rb, line 276
# Looks up the job belonging to +token+ and sets +result+ as its result,
# using the current worker session id and version: nil.
#
# Raises a RuntimeError when no job is found for the token.
def resolve(token:, result:)
  target = Queue.find_job_by_token(token) || raise("No job with token #{token}")

  Queue.set_job_result(current_session_id, target, result, version: nil)
end
run(count: nil, queues: nil, heartbeat: true) { |processed_job_id| ... } click to toggle source

Processes many jobs.

This method starts processing jobs, as long as there are some. If no jobs can be found this method waits until a job becomes available.

After each processing step, the id of the processed job is yielded into the passed-in block.

This method continues until:

a) the requested number of jobs (via the count: argument) was processed (note: repeated job executions due to rerunning jobs that slept or errored count multiple times), or

b) the block yielded into returns false.

This method returns the number of processed jobs.

# File lib/postjob.rb, line 174
# Processes jobs in a loop inside a worker session and returns the number of
# jobs processed.
#
# count::     maximum number of jobs to process (default: 10^12).
# queues::    queues to process; defaults to Postjob::Registry.queues.
# heartbeat:: passed through to with_worker_session.
#
# After each step the processed job id (nil when no job was processed) is
# yielded to the block, if one was given. The loop ends when +count+ jobs
# were processed, when the block returns false, or when either the step or
# the wait for a new job signals :shutdown.
def run(count: nil, queues: nil, heartbeat: true, &block)
  queues ||= Postjob::Registry.queues

  # Practically unlimited: at 1 msec per job, 10^12 jobs would take far
  # longer than any worker process is expected to live.
  count ||= 1_000_000_000_000

  with_worker_session(heartbeat: heartbeat, queues: queues) do
    done = 0

    loop do
      job_id, signal = Postjob.step(queues: queues)
      done += 1 if job_id

      # The order of these checks matters: count limit, then block, then shutdown.
      break if done >= count
      break if block && yield(job_id) == false
      break if signal == :shutdown

      # A job was processed: immediately try the next one.
      next if job_id

      # Nothing to do: block until a new job arrives or shutdown is requested.
      break if Queue::Notifications.wait_for_new_job(current_session_id, queues: queues) == :shutdown
    end

    done
  end
end
step(queues: nil) click to toggle source

Runs a single job

This method tries to check out a runnable job. If it finds one the job is processed (via Postjob.process_job).

This method returns a tuple [ <job-id>, <shutdown> ], or nil when no job could be checked out, where

  • <job-id> is the id of the job which has been processed;

  • <shutdown> is a flag, either :shutdown or nil. :shutdown notifies self.run to terminate the run loop.

# File lib/postjob.rb, line 254
# Checks out a single job from +queues+ (default: Postjob::Registry.queues)
# for the current worker session and processes it.
#
# Returns [ job id, result of process_job ], or nil when no job could be
# checked out.
def step(queues: nil)
  expect! queues => [Array, nil]
  queues ||= Postjob::Registry.queues

  checked_out = Postjob::Queue.checkout(current_session_id, queues: queues)
  return unless checked_out

  [ checked_out.id, process_job(checked_out) ]
end
stop_worker_session!() click to toggle source
# File lib/postjob.rb, line 235
# Stops the current worker session via WorkerSession.stop! and clears it.
# Does nothing when no worker session is set. Always returns nil.
def stop_worker_session!
  if @worker_session
    WorkerSession.stop!(@worker_session)
    @worker_session = nil
  end
end

Private Instance Methods

enqueue_cleanup(job) click to toggle source
# File lib/postjob.rb, line 346
# Enqueues the "<workflow>.cleanup" job for +job+, provided the registered
# spec for the job's workflow and version supports cleanup; otherwise
# returns nil without doing anything.
#
# The cleanup job gets the original job's args, queue, workflow version,
# tags and stickiness. For a sticky job the cleanup job's sticky_host_id is
# set to the original job's sticky_host_id. Enqueueing and the update run
# inside a single Simple::SQL transaction.
def enqueue_cleanup(job)
  spec = Postjob::Registry.lookup!(name: job.workflow, version: job.workflow_version)
  return unless spec.supports_cleanup?

  inspected_args = job.args.map(&:inspect).join(", ")
  Postjob.logger.info "Enqueuing cleanup job #{spec.name}.cleanup(#{inspected_args})"

  Simple::SQL.transaction do
    cleanup_job_id = enqueue!("#{job.workflow}.cleanup", *job.args,
                              queue: job.queue,
                              parent_id: nil,
                              max_attempts: nil,
                              timeout: nil,
                              version: job.workflow_version,
                              tags: job.tags,
                              cron_interval: nil,
                              sticky: job.is_sticky)

    # Give the cleanup job the same sticky_host_id as the sticky original job.
    if job.is_sticky
      Simple::SQL.ask("UPDATE postjob.postjobs SET sticky_host_id=$1 WHERE id=$2", job.sticky_host_id, cleanup_job_id)
    end
  end
end
stringify_hash(hsh) click to toggle source
# File lib/postjob.rb, line 135
# Returns a new Hash with the entries of +hsh+, where every Symbol key is
# replaced by its String form. Other keys and all values are kept as they
# are; nested hashes are not converted. +hsh+ itself is not modified.
def stringify_hash(hsh)
  hsh.each_with_object({}) do |(key, value), stringified|
    stringified[key.is_a?(Symbol) ? key.to_s : key] = value
  end
end