class Resque::Plugins::Stages::StagedJob

rubocop:disable Metrics/ClassLength

Attributes

class_name[W]
job_id[R]

Public Class Methods

create_job(staged_group_stage, klass, *args) click to toggle source

Creates a job to be queued to Resque that has an ID that we can track its status with.

# File lib/resque/plugins/stages/staged_job.rb, line 26
def create_job(staged_group_stage, klass, *args)
  job = Resque::Plugins::Stages::StagedJob.new(SecureRandom.uuid)

  job.staged_group_stage = staged_group_stage
  job.class_name         = klass.name
  job.args               = args

  job.save!

  job
end
new(job_id) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 39
def initialize(job_id)
  @job_id = job_id
end

Public Instance Methods

<=>(other) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 43
def <=>(other)
  return nil unless other.is_a?(Resque::Plugins::Stages::StagedJob)

  job_id <=> other.job_id
end
args() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 144
def args
  @args = if defined?(@args)
            @args
          else
            decompress_args(Array.wrap(decode_args(stored_values[:args])))
          end
end
args=(value) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 152
def args=(value)
  @args = value.nil? ? [] : Array.wrap(value).dup
end
blank?() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 168
def blank?
  !redis.exists(job_key)
end
class_name() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 49
def class_name
  @class_name ||= stored_values[:class_name]
end
completed?() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 156
def completed?
  %i[failed successful].include? status
end
compressed?() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 179
def compressed?
  compressable? && described_class.compressed?(args)
end
delete() click to toggle source

rubocop:enable Metrics/AbcSize

# File lib/resque/plugins/stages/staged_job.rb, line 104
def delete
  # Make sure the job is loaded into memory so we can use it even though we are going to delete it.
  stored_values

  redis.del(job_key)

  staged_group_stage.remove_job(self)
end
enqueue_args() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 128
def enqueue_args
  [klass, *enqueue_compressed_args]
end
enqueue_compressed_args() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 132
def enqueue_compressed_args
  new_args = compressed_args([{ staged_job_id: job_id }, *args])

  new_args[0][:staged_job_id] = job_id

  new_args
end
enqueue_job() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 113
def enqueue_job
  case status
    when :pending
      self.status = :queued
      Resque.enqueue(*enqueue_args)

    when :pending_re_run
      Resque.enqueue_delayed_selection do |args|
        # :nocov:
        klass.perform_job(*Array.wrap(args)).job_id == job_id
        # :nocov:
      end
  end
end
pending?() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 164
def pending?
  %i[pending pending_re_run].include? status
end
queue_time() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 53
def queue_time
  @queue_time ||= stored_values[:queue_time].to_time
end
queued?() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 160
def queued?
  %i[queued running pending_re_run].include? status
end
save!() click to toggle source

rubocop:disable Metrics/AbcSize

# File lib/resque/plugins/stages/staged_job.rb, line 93
def save!
  redis.hsetnx(job_key, "queue_time", Time.now)
  redis.hset(job_key, "class_name", class_name)
  redis.hset(job_key, "args", encode_args(*compressed_args(args)))
  redis.hset(job_key, "staged_group_stage_id", staged_group_stage_id)
  redis.hset(job_key, "status", status)
  redis.hset(job_key, "status_message", status_message)
end
staged_group_stage() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 77
def staged_group_stage
  return nil if staged_group_stage_id.blank?

  @staged_group_stage ||= Resque::Plugins::Stages::StagedGroupStage.new(staged_group_stage_id)
end
staged_group_stage=(value) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 83
def staged_group_stage=(value)
  @staged_group_stage    = value
  @staged_group_stage_id = value.group_stage_id

  redis.hset(job_key, "staged_group_stage_id", staged_group_stage_id)

  value.add_job(self)
end
status() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 57
def status
  @status ||= stored_values[:status]&.to_sym || :pending
end
status=(value) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 61
def status=(value)
  @status = value
  redis.hset(job_key, "status", status)

  notify_stage
end
status_message() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 68
def status_message
  @status_message ||= stored_values[:status_message]
end
status_message=(value) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 72
def status_message=(value)
  @status_message = value
  redis.hset(job_key, "status_message", status_message)
end
uncompressed_args() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 140
def uncompressed_args
  decompress_args(args)
end
verify() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 172
def verify
  return build_new_structure if staged_group_stage.blank?

  staged_group_stage.verify
  staged_group_stage.verify_job(self)
end

Private Instance Methods

build_new_structure() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 185
def build_new_structure
  group = Resque::Plugins::Stages::StagedGroup.new(SecureRandom.uuid)
  stage = group.current_stage

  self.staged_group_stage = stage
end
compressable?() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 252
def compressable?
  !described_class.blank? &&
      described_class.singleton_class.included_modules.map(&:name).include?("Resque::Plugins::Compressible")
end
compressed_args(compress_args) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 257
def compressed_args(compress_args)
  return compress_args unless compressable?
  return compress_args if described_class.compressed?(compress_args)

  [{ resque_compressed: true, payload: described_class.compressed_args(compress_args) }]
end
decode_args(args_string) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 204
def decode_args(args_string)
  return if args_string.blank?

  Resque.decode(args_string)
end
decompress_args(basic_args) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 264
def decompress_args(basic_args)
  return basic_args unless compressable?
  return basic_args unless described_class.compressed?(basic_args)

  described_class.uncompressed_args(basic_args.first[:payload] || basic_args.first["payload"])
end
described_class() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 242
def described_class
  return if class_name.blank?

  class_name.constantize
rescue StandardError
  # :nocov:
  nil
  # :nocov:
end
encode_args(*args) click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 200
def encode_args(*args)
  Resque.encode(args)
end
job_key() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 210
def job_key
  "StagedJob::#{job_id}"
end
klass() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 196
def klass
  @klass ||= class_name.constantize
end
mark_stage_pending() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 230
def mark_stage_pending
  return if %i[running pending].include? staged_group_stage.status

  staged_group_stage.status = :pending
end
mark_stage_running() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 236
def mark_stage_running
  return if staged_group_stage.status == :running

  staged_group_stage.status = :running
end
notify_stage() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 218
def notify_stage
  return if staged_group_stage.blank?

  if status == :pending
    mark_stage_pending
  elsif queued?
    mark_stage_running
  else
    staged_group_stage.job_completed
  end
end
staged_group_stage_id() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 214
def staged_group_stage_id
  @staged_group_stage_id ||= stored_values[:staged_group_stage_id]
end
stored_values() click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 192
def stored_values
  @stored_values ||= (redis.hgetall(job_key) || {}).with_indifferent_access
end