class ChainedJob::Process

Attributes

args[R]
job_arguments_key[R]
job_instance[R]
job_tag[R]
worker_id[R]

Public Class Methods

new(args, job_instance, job_arguments_key, worker_id, job_tag) click to toggle source
# File lib/chained_job/process.rb, line 13
# Stores the five constructor arguments in same-named instance variables.
#
# @param args              value later handed to +perform_later+ by #run
# @param job_instance      object whose +process+ method receives one argument
# @param job_arguments_key value passed to +Helpers.job_key+
# @param worker_id         identifier used in log output and hook options
# @param job_tag           tag combined with the job key to build the Redis key
def initialize(args, job_instance, job_arguments_key, worker_id, job_tag)
  @job_instance, @job_arguments_key = job_instance, job_arguments_key
  @args, @worker_id = args, worker_id
  @job_tag = job_tag
end
run(args, job_instance, job_arguments_key, worker_id, job_tag) click to toggle source
# File lib/chained_job/process.rb, line 7
# Convenience entry point: builds an instance from the given arguments and
# immediately invokes its #run method.
#
# Parameters are forwarded to +new+ unchanged and in the same order.
#
# @return whatever the instance-level #run returns
def self.run(args, job_instance, job_arguments_key, worker_id, job_tag)
  process = new(args, job_instance, job_arguments_key, worker_id, job_tag)
  process.run
end

Public Instance Methods

run() click to toggle source
# File lib/chained_job/process.rb, line 21
# Handles at most one queued argument inside the configured around-hook.
#
# * No argument available: reports the worker as finished and returns that
#   result.
# * Argument available: passes it to +job_instance.process+, then enqueues
#   the next step via +perform_later+ on the job's class.
#
# If +process+ raises a StandardError the argument is pushed back first
# (only when #handle_retry? is truthy) and the same error is re-raised.
def run
  with_hooks do
    current = argument
    # NOTE: `return` inside this block exits #run itself, unwinding through
    # the around-hook instead of handing a value back to it.
    return finished_worker unless current

    begin
      job_instance.process(current)
    rescue StandardError => error
      push_job_arguments_back if handle_retry?
      raise error
    end

    job_instance.class.perform_later(args, worker_id, job_tag)
  end
end

Private Instance Methods

argument() click to toggle source
# File lib/chained_job/process.rb, line 61
# Returns the deserialized argument for this run, caching it in @argument.
#
# Only truthy results are cached: while the cached value is nil or false,
# each call asks #deserialized_argument again.
def argument
  return @argument if @argument

  @argument = deserialized_argument
end
deserialized_argument() click to toggle source
# File lib/chained_job/process.rb, line 65
# Turns the raw value from #serialized_argument back into a Ruby object.
#
# NOTE(review): Marshal.load is unsafe on data from untrusted writers;
# presumably only this application pushes to the list -- confirm.
#
# @return the object produced by Marshal.load, or nil when there is no
#   serialized value
def deserialized_argument
  raw = serialized_argument
  Marshal.load(raw) if raw
end
finished_worker() click to toggle source
# File lib/chained_job/process.rb, line 49
# Logs that this worker is done and then invokes the optional
# +after_worker_finished+ callback from ChainedJob.config with #options.
#
# @return the callback's result, or nil when no callback is configured
def finished_worker
  log_finished_worker

  callback = ChainedJob.config.after_worker_finished
  callback.call(options) unless callback.nil?
end
handle_retry?() click to toggle source
# File lib/chained_job/process.rb, line 37
# Asks the job whether a failed argument should be retried.
#
# A job opts in by exposing a public +handle_retry?+ method. Jobs that do
# not respond to it (including a nil job) produce nil. The check uses core
# +respond_to?+ instead of ActiveSupport's +Object#try+, so it also works
# when ActiveSupport is not loaded.
#
# @return the result of +job_instance.handle_retry?+, or nil when the job
#   does not respond to it
def handle_retry?
  job = job_instance
  job.handle_retry? if job.respond_to?(:handle_retry?)
end
job_key() click to toggle source
# File lib/chained_job/process.rb, line 81
# Derives the job key by passing +job_arguments_key+ to +Helpers.job_key+.
#
# @return whatever +Helpers.job_key+ returns for that key
def job_key
  arguments_key = job_arguments_key
  Helpers.job_key(arguments_key)
end
log_finished_worker() click to toggle source
# File lib/chained_job/process.rb, line 55
# Writes an info-level line to ChainedJob.logger naming the job class, the
# job tag and the worker id that has finished.
#
# @return the result of the logger's +info+ call
def log_finished_worker
  message = "#{job_instance.class}:#{job_tag} worker #{worker_id} finished"
  ChainedJob.logger.info(message)
end
options() click to toggle source
# File lib/chained_job/process.rb, line 45
# Builds the hash handed to the configured hooks and caches it in @options.
#
# Keys: +:job_class+ (class of the job instance), +:worker_id+ and +:args+.
#
# @return [Hash] the same hash object on every call after the first
def options
  return @options if @options

  @options = {
    job_class: job_instance.class,
    worker_id: worker_id,
    args: args
  }
end
push_job_arguments_back() click to toggle source
# File lib/chained_job/process.rb, line 85
# Returns the current argument to the list stored under #redis_key.
#
# The argument is wrapped in a one-element array, run through
# +Helpers.serialize+, and handed to +rpush+ on ChainedJob.redis.
#
# @return the result of the +rpush+ call
def push_job_arguments_back
  store = ChainedJob.redis
  list_key = redis_key
  entries = Helpers.serialize([argument])
  store.rpush(list_key, entries)
end
redis_key() click to toggle source
# File lib/chained_job/process.rb, line 77
# Builds the Redis key for this chain by passing #job_key and +job_tag+ to
# +Helpers.redis_key+.
#
# @return whatever +Helpers.redis_key+ returns for that pair
def redis_key
  base_key = job_key
  Helpers.redis_key(base_key, job_tag)
end
serialized_argument() click to toggle source
# File lib/chained_job/process.rb, line 71
# Fetches the next raw entry by calling +lpop+ on ChainedJob.redis with
# #redis_key, at most once per instance.
#
# The result is remembered even when it is nil (checked with +defined?+),
# so repeated calls never trigger a second +lpop+.
def serialized_argument
  unless defined?(@serialized_argument)
    @serialized_argument = ChainedJob.redis.lpop(redis_key)
  end

  @serialized_argument
end
with_hooks() { || ... } click to toggle source
# File lib/chained_job/process.rb, line 41
# Runs the caller's block inside the +around_chain_process+ hook taken from
# ChainedJob.config.
#
# The hook is called with #options and a block that yields to the caller's
# block; it is up to the hook to invoke that block.
#
# @return whatever the hook's +call+ returns
def with_hooks
  around_hook = ChainedJob.config.around_chain_process
  around_hook.call(options) do
    yield
  end
end