module Postjob::Runner

Base implementations for Runners

This module contains methods for runners.

Constants

Job

A job

STATUSES

Public Class Methods

async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil) click to toggle source

returns a subjob within the current job, for a runner description and args.

# File lib/postjob/runner.rb, line 39
# Finds or creates a child job of the currently running job.
#
# workflow     - what the child job should run:
#                * :manual is mapped to "__manual__" (there should never be
#                  a real workflow with that name),
#                * any other Symbol names a method of the current job's
#                  workflow and is mapped to "<current workflow>.<symbol>",
#                * a Module is replaced by its name,
#                * a String is used as is.
# args         - positional arguments for the child job.
# timeout      - passed through to the queue unchanged.
# max_attempts - passed through to the queue unchanged.
# queue        - queue for the child job; nil means "same as the current job".
#
# Returns the result of Postjob::Queue.find_or_create_childjob.
# Raises ArgumentError when +workflow+ is none of the supported kinds.
def async(workflow, *args, timeout: nil, max_attempts: nil, queue: nil)
  session_id = Postjob.current_session_id
  target_queue = queue.nil? ? current_job.queue : queue

  workflow_name =
    case workflow
    when :manual then "__manual__"
    when Symbol  then "#{current_job.workflow}.#{workflow}"
    when Module  then workflow.name
    when String  then workflow
    else
      raise ArgumentError, "Unsupported workflow spec #{workflow.inspect}. Did you run await(fun(a, b)) instead of await(:fun, a, b)"
    end

  ::Postjob::Queue.find_or_create_childjob(
    session_id, current_job, workflow_name, args,
    queue: target_queue, timeout: timeout, max_attempts: max_attempts
  )
end
await(job, *args, timeout: nil, max_attempts: nil, queue: nil) click to toggle source

tries to resolve a job.

# File lib/postjob/runner.rb, line 64
# Tries to resolve a job, throwing :pending when the result is not there yet.
#
# job - one of:
#       * :all    - resolve every child job of the current job; returns the
#                   number of child jobs,
#       * a Job   - resolve that job and return its result,
#       * anything else is treated as a workflow spec: the child job is set
#         up via async (receiving args and the keyword options) and then
#         awaited.
#
# For :all and for a Job, no extra args or keyword options may be given;
# this is checked via expect!.
#
# Whenever something is still unresolved this method does not return but
# runs `throw :pending, :pending`.
def await(job, *args, timeout: nil, max_attempts: nil, queue: nil)
  case job
  when :all
    expect! args == []
    expect! timeout => nil, max_attempts => nil, queue => nil

    open_count = Postjob::Queue.unresolved_childjobs(current_job)
    if open_count > 0
      Postjob.logger.debug "await :all: Found #{open_count} unresolved childjobs"
      throw :pending, :pending
    end

    # Nothing is reported as unresolved; still resolve every child so that a
    # child answering :pending suspends the current job.
    children = Postjob::Queue.childjobs(current_job)
    children.each do |child|
      throw :pending, :pending if child.resolve == :pending
    end
    children.count
  when Job
    expect! args == []
    expect! timeout => nil, max_attempts => nil, queue => nil

    result = job.resolve
    throw :pending, :pending if result == :pending
    result
  else
    await async(job, *args, timeout: timeout, max_attempts: max_attempts, queue: queue)
  end
end
current_job() click to toggle source

returns the job that is currently running.

This value is set by process_job (via with_current_job), and currently only used from Postjob::Runner.async

# File lib/postjob/runner.rb, line 21
# Returns the job that is currently running, as stored in the :current_job
# slot of Thread.current; nil when the slot holds nothing.
def current_job
  thread = Thread.current
  thread[:current_job]
end
process_job(job) click to toggle source

runs a specific job

returns a tuple [ <runner-version>, status, value, shutdown ], which follows one of these patterns:

  • [ <runner-version>, :ok, value, nil ]: job completed successfully

  • [ <runner-version>, :pending, nil, nil ]: job has to wait on a child job

  • [ <runner-version>, :err, <err>, nil ]: job errored with a recoverable error

  • [ <runner-version>, :failed, <err>, <shutdown> ]: job failed with a non-recoverable error

<err> is a tuple [ error-class-name, error-message, stacktrace ]. <shutdown> is either nil or :shutdown

# File lib/postjob/runner.rb, line 110
# Runs a specific job.
#
# Looks up the job's workflow (by name and version) in Postjob::Registry,
# invokes it with +job+ installed as the current job, and logs the outcome.
#
# job - the Job to process.
#
# Returns [ version, status, value, shutdown ], where version is taken from
# the registered spec's options and the remaining three entries are what
# invoke_workflow returned.
def process_job(job)
  expect! job => Job

  registration = Postjob::Registry.lookup!(name: job.workflow, version: job.workflow_version)
  expect! registration.runnable?

  with_current_job(job) do
    outcome = invoke_workflow(registration.workflow, job)
    status, value, shutdown = outcome
    log_result!(job, status, value)

    # NOTE: a finished (:ok) job does not wait for its child jobs here; an
    # `await :all` at this point is currently disabled.
    [registration.options.version, status, value, shutdown]
  end
end

Private Class Methods

error_message(job, status, value) click to toggle source
# File lib/postjob/runner.rb, line 206
# Builds the one-line log message for a job that ended in an error state.
#
# job    - the job; must respond to created_at and is rendered via to_s.
# status - the status to report (interpolated as is).
# value  - the error tuple [ error-class-name, error-message, backtrace ];
#          only the first two entries are used.
#
# Returns a String of the form
# "<job> <status> <error class> <inspected message>: <seconds> secs", where
# the seconds are the time elapsed since job.created_at.
def error_message(job, status, value)
  error_class, err_message = value
  elapsed = format("%.03f secs", Time.now.utc - job.created_at)

  # The backtrace (third tuple entry) is intentionally left out of the message.
  "#{job} #{status} #{error_class} #{err_message.inspect}: #{elapsed}"
end
invoke_workflow(workflow, job) click to toggle source

runs a job. Returns a [ status, value, shutdown ] tuple.

The shutdown value is used by a worker in run mode (i.e. processing indefinitely) to determine whether or not it should cancel processing. It is usually nil; but if the worker received a SIGINT it will be :shutdown instead.

We are catching SIGINT to allow the job status to be updated.

# File lib/postjob/runner.rb, line 138
# Runs a job by calling the job's workflow method on +workflow+.
#
# workflow - the object that receives the workflow method (via public_send).
# job      - the job; provides id, workflow, workflow_method and args.
#            args may be an Array or nil; nil is treated as "no arguments".
#
# Returns a [ status, value, shutdown ] tuple:
#   [ :ok, value, nil ]  - the workflow method returned an encodable value
#   [ :pending, nil, nil ] - the workflow threw :pending (or returned :pending)
#   whatever return_exception builds, with state :failed or :err, when an
#   exception was raised.
def invoke_workflow(workflow, job)
  value = catch(:pending) {
    expect! job.args => [Array, nil]

    workflow_method = job.workflow_method
    # nil args are explicitly allowed by the expectation above, so they must
    # not blow up in #map below; run the workflow without arguments instead.
    args = job.args || []

    insp_args = args.map(&:inspect).join(", ")
    logger.info "Running Postjob##{job.id}: #{job.workflow}.#{workflow_method}(#{insp_args})"

    workflow.public_send workflow_method, *args
  }

  if value == :pending
    [ :pending, nil, nil ]
  else
    # Check that we can encode the value. If we can't the job returned something invalid
    # i.e. something we cannot encode as JSON. This will raise a Postjob::Queue::Encoder::Error.
    #
    # Usually this points to a non-UTF8 string.
    # This is a fix for https://github.com/mediapeers/postjob/issues/35
    Postjob::Queue::Encoder.check_encodable!([value])
    [ :ok, value, nil ]
  end
rescue ArgumentError, LocalJumpError, NameError, RegexpError, ScriptError, TypeError => e
  # Errors of these classes are reported as :failed rather than :err.
  Postjob.logger.error "#{e}, from\n\t#{e.backtrace[0, 10].join("\n\t")}"
  return_exception :failed, e
rescue Postjob::Error::Nonrecoverable => e
  return_exception :failed, e
rescue PG::Error => e
  Postjob.logger.error "#{e}, from\n\t#{e.backtrace[0, 10].join("\n\t")}"
  STDERR.puts "#{e}, from\n\t#{e.backtrace[0, 10].join("\n\t")}"
  return_exception :failed, e
rescue Exception => e
  # Deliberately rescuing Exception (not just StandardError): Interrupt
  # (SIGINT) is not a StandardError, and it has to end up in the returned
  # tuple as well.
  return_exception :err, e
end
log_result!(job, status, value) click to toggle source
# File lib/postjob/runner.rb, line 190
# Writes one log line describing how a job ended.
#
# job    - the job; parent_id tells child jobs from root jobs.
# status - :ok, :failed or :err; any other status logs nothing.
# value  - the job result (for :ok) or the error tuple (for :err/:failed).
#
# Log levels used:
#   :ok     - info for child jobs, warn for root jobs
#   :failed - error
#   :err    - warn for child jobs, error for root jobs
def log_result!(job, status, value)
  case status
  when :ok
    elapsed = format("%.03f secs", Time.now.utc - job.created_at)
    level = job.parent_id ? :info : :warn
    logger.send level, "#{job} successful w/result #{value.inspect}: #{elapsed}"
  when :failed
    logger.error error_message(job, status, value)
  when :err
    level = job.parent_id ? :warn : :error
    logger.send level, error_message(job, status, value)
  end
end
return_exception(state, exception) click to toggle source
# File lib/postjob/runner.rb, line 179
# Converts an exception into the [ state, error, shutdown ] result tuple.
#
# state     - the status to report (passed through unchanged).
# exception - the exception to describe.
#
# Returns [ state, [ class name, message, backtrace ], shutdown ], where
# backtrace holds at most the first 10 entries, with the current working
# directory stripped from the front of each entry, and shutdown is :shutdown
# when should_shutdown?(exception) holds, nil otherwise.
def return_exception(state, exception)
  # get and shorten backtrace. An exception that was built but never raised
  # has no backtrace at all (nil); report an empty one in that case.
  error_backtrace = (exception.backtrace || [])[0, 10]

  curdir = "#{Dir.getwd}/"
  error_backtrace = error_backtrace.map { |path| path.start_with?(curdir) ? path[curdir.length..-1] : path }

  shutdown = should_shutdown?(exception) ? :shutdown : nil
  [ state, [exception.class.name, exception.message, error_backtrace], shutdown ]
end
should_shutdown?(exception) click to toggle source
# File lib/postjob/runner.rb, line 175
# Tells whether +exception+ should be reported as a shutdown request:
# true for an Interrupt (or a subclass of it), false for anything else.
def should_shutdown?(exception)
  exception.kind_of?(Interrupt)
end
with_current_job(job) { || ... } click to toggle source
# File lib/postjob/runner.rb, line 27
# Runs the given block with +job+ installed as the current job.
#
# job - the job to expose via current_job while the block runs.
#
# Requires that no job is current yet (checked via expect!). The current job
# is reset to nil when the block finishes, whether normally or by raising or
# throwing.
#
# Returns the value of the block.
def with_current_job(job)
  # The precondition is checked *before* entering the ensure-protected
  # region: if another job is already current we must fail without wiping it.
  expect! current_job => nil

  begin
    Thread.current[:current_job] = job
    yield
  ensure
    Thread.current[:current_job] = nil
  end
end