module Postjob::Runner
Base implementations for Runners
This module contains methods for runners.
Constants
- Job
A job
- STATUSES
Public Class Methods
returns a subjob within the current job, for a runner
description and args
.
# File lib/postjob/runner.rb, line 39 def async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil) worker_session_id = Postjob.current_session_id queue = current_job.queue if queue.nil? # if the workflow is a symbol, then we change it into "__manual__" # - there should never be a workflow with that name - or into # "CurrentWorkshop.#{workflow}", denoting the \a workflow method of the # current workflow. case workflow when :manual then workflow = "__manual__" when Symbol then workflow = "#{current_job.workflow}.#{workflow}" when Module then workflow = workflow.name when String then :nop else raise ArgumentError, "Unsupported workflow spec #{workflow.inspect}. Did you run await(fun(a, b)) instead of await(:fun, a, b)" end ::Postjob::Queue.find_or_create_childjob(worker_session_id, self.current_job, workflow, args, queue: queue, timeout: timeout, max_attempts: max_attempts) end
tries to resolve a job.
# File lib/postjob/runner.rb, line 64 def await(job, *args, timeout: nil, max_attempts: nil, queue: nil) case job when :all expect! args == [] expect! timeout => nil, max_attempts => nil, queue => nil unresolved_childjobs = Postjob::Queue.unresolved_childjobs(current_job) if unresolved_childjobs > 0 Postjob.logger.debug "await :all: Found #{unresolved_childjobs} unresolved childjobs" throw :pending, :pending else childjobs = Postjob::Queue.childjobs(current_job) childjobs.each do |childjob| r = childjob.resolve throw :pending, :pending if r == :pending end childjobs.count end when Job expect! args == [] expect! timeout => nil, max_attempts => nil, queue => nil r = job.resolve throw :pending, :pending if r == :pending r else job = async(job, *args, timeout: timeout, max_attempts: max_attempts, queue: queue) await(job) end end
returns the job that is currently running.
This value is set by process_job
(via with_current_job
), and currently only used from Postjob::Runner.async
# File lib/postjob/runner.rb, line 21 def current_job Thread.current[:current_job] end
runs a specific job
returns a tuple [status, value], which follows the following pattern:
-
[ <runner-version>, :ok, value, nil ]
: job completed successfully -
[ <runner-version>, :sleep, nil, nil ]
: job has to wait on a child job -
[ <runner-version>, :err, <err>, nil ]
: job errored with a recoverable error -
[ <runner-version>, :failed, <err>, <shutdown> ]
: job failed with a non-recoverable error
<err> is a tuple [ error-class-name, error-message, stacktrace ]. <shutdown> is either nil or :shutdown
# File lib/postjob/runner.rb, line 110 def process_job(job) expect! job => Job spec = Postjob::Registry.lookup!(name: job.workflow, version: job.workflow_version) expect! spec.runnable? with_current_job(job) do status, value, shutdown = invoke_workflow spec.workflow, job log_result! job, status, value # If the status is ok the job finished processing. In that case # we'll wait for all child jobs to finish. # if status == :ok # await :all # end [ spec.options.version, status, value, shutdown ] end end
Private Class Methods
# File lib/postjob/runner.rb, line 206 def error_message(job, status, value) runtime = Time.now.utc - job.created_at runtime = "%.03f secs" % runtime error_class, err_message, _error_backtrace = value "#{job} #{status} #{error_class} #{err_message.inspect}: #{runtime}" # + "\n backtrace information:\n #{error_backtrace.join("\n ")}" end
runs a job. Returns a [ status, value, shutdown ] tuple.
The shutdown value is used by a worker in run mode (i.e. process indefinetively) to determine whether or not it should cancel processing. It is usually nil; but if the worker received a SIGINT it will be :shutdown instead.
We are catching SIGINT to allow the job status to be updated.
# File lib/postjob/runner.rb, line 138 def invoke_workflow(workflow, job) value = catch(:pending) { expect! job.args => [Array, nil] workflow_method = job.workflow_method args = job.args insp_args = args.map(&:inspect).join(", ") logger.info "Running Postjob##{job.id}: #{job.workflow}.#{workflow_method}(#{insp_args})" workflow.public_send workflow_method, *args } if value == :pending [ :pending, nil, nil ] else # Check that we can encode the value. If we can't the job returned something invalid # i.e. something we cannot encode as JSON. This will raise a Postjob::Queue::Encoder::Error. # # Usually this points to a non-UTF8 string. # This is a fix for https://github.com/mediapeers/postjob/issues/35 Postjob::Queue::Encoder.check_encodable!([value]) [ :ok, value, nil ] end rescue ArgumentError, LocalJumpError, NameError, RegexpError, ScriptError, TypeError Postjob.logger.error "#{$!}, from\n\t#{$!.backtrace[0, 10].join("\n\t")}" return_exception :failed, $! rescue Postjob::Error::Nonrecoverable return_exception :failed, $! rescue PG::Error Postjob.logger.error "#{$!}, from\n\t#{$!.backtrace[0, 10].join("\n\t")}" STDERR.puts "#{$!}, from\n\t#{$!.backtrace[0, 10].join("\n\t")}" return_exception :failed, $! rescue Exception return_exception :err, $! end
# File lib/postjob/runner.rb, line 190 def log_result!(job, status, value) case status when :err severity = job.parent_id ? :warn : :error logger.send severity, error_message(job, status, value) when :failed logger.error error_message(job, status, value) when :ok runtime = Time.now.utc - job.created_at runtime = "%.03f secs" % runtime severity = job.parent_id ? :info : :warn msg = "#{job} successful w/result #{value.inspect}: #{runtime}" logger.send severity, msg end end
# File lib/postjob/runner.rb, line 179 def return_exception(state, exception) # get and shorten backtrace. error_backtrace = exception.backtrace[0, 10] curdir = "#{Dir.getwd}/" error_backtrace = error_backtrace.map { |path| path.start_with?(curdir) ? path[curdir.length..-1] : path } shutdown = should_shutdown?(exception) ? :shutdown : nil [ state, [exception.class.name, exception.message, error_backtrace], shutdown ] end
# File lib/postjob/runner.rb, line 175 def should_shutdown?(exception) exception.is_a?(Interrupt) end
# File lib/postjob/runner.rb, line 27 def with_current_job(job) expect! current_job => nil Thread.current[:current_job] = job yield ensure Thread.current[:current_job] = nil end