class Resque::Plugins::Stages::StagedJob
rubocop:disable Metrics/ClassLength
Attributes
class_name[W]
job_id[R]
Public Class Methods
create_job(staged_group_stage, klass, *args)
click to toggle source
Creates a job to be queued to Resque
that has an ID that we can track its status with.
# File lib/resque/plugins/stages/staged_job.rb, line 26 def create_job(staged_group_stage, klass, *args) job = Resque::Plugins::Stages::StagedJob.new(SecureRandom.uuid) job.staged_group_stage = staged_group_stage job.class_name = klass.name job.args = args job.save! job end
new(job_id)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 39 def initialize(job_id) @job_id = job_id end
Public Instance Methods
<=>(other)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 43 def <=>(other) return nil unless other.is_a?(Resque::Plugins::Stages::StagedJob) job_id <=> other.job_id end
args()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 144 def args @args = if defined?(@args) @args else decompress_args(Array.wrap(decode_args(stored_values[:args]))) end end
args=(value)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 152 def args=(value) @args = value.nil? ? [] : Array.wrap(value).dup end
blank?()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 168 def blank? !redis.exists(job_key) end
class_name()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 49 def class_name @class_name ||= stored_values[:class_name] end
completed?()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 156 def completed? %i[failed successful].include? status end
compressed?()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 179 def compressed? compressable? && described_class.compressed?(args) end
delete()
click to toggle source
rubocop:enable Metrics/AbcSize
# File lib/resque/plugins/stages/staged_job.rb, line 104 def delete # Make sure the job is loaded into memory so we can use it even though we are going to delete it. stored_values redis.del(job_key) staged_group_stage.remove_job(self) end
enqueue_args()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 128 def enqueue_args [klass, *enqueue_compressed_args] end
enqueue_compressed_args()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 132 def enqueue_compressed_args new_args = compressed_args([{ staged_job_id: job_id }, *args]) new_args[0][:staged_job_id] = job_id new_args end
enqueue_job()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 113 def enqueue_job case status when :pending self.status = :queued Resque.enqueue(*enqueue_args) when :pending_re_run Resque.enqueue_delayed_selection do |args| # :nocov: klass.perform_job(*Array.wrap(args)).job_id == job_id # :nocov: end end end
pending?()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 164 def pending? %i[pending pending_re_run].include? status end
queue_time()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 53 def queue_time @queue_time ||= stored_values[:queue_time].to_time end
queued?()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 160 def queued? %i[queued running pending_re_run].include? status end
save!()
click to toggle source
rubocop:disable Metrics/AbcSize
# File lib/resque/plugins/stages/staged_job.rb, line 93 def save! redis.hsetnx(job_key, "queue_time", Time.now) redis.hset(job_key, "class_name", class_name) redis.hset(job_key, "args", encode_args(*compressed_args(args))) redis.hset(job_key, "staged_group_stage_id", staged_group_stage_id) redis.hset(job_key, "status", status) redis.hset(job_key, "status_message", status_message) end
staged_group_stage()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 77 def staged_group_stage return nil if staged_group_stage_id.blank? @staged_group_stage ||= Resque::Plugins::Stages::StagedGroupStage.new(staged_group_stage_id) end
staged_group_stage=(value)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 83 def staged_group_stage=(value) @staged_group_stage = value @staged_group_stage_id = value.group_stage_id redis.hset(job_key, "staged_group_stage_id", staged_group_stage_id) value.add_job(self) end
status()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 57 def status @status ||= stored_values[:status]&.to_sym || :pending end
status=(value)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 61 def status=(value) @status = value redis.hset(job_key, "status", status) notify_stage end
status_message()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 68 def status_message @status_message ||= stored_values[:status_message] end
status_message=(value)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 72 def status_message=(value) @status_message = value redis.hset(job_key, "status_message", status_message) end
uncompressed_args()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 140 def uncompressed_args decompress_args(args) end
verify()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 172 def verify return build_new_structure if staged_group_stage.blank? staged_group_stage.verify staged_group_stage.verify_job(self) end
Private Instance Methods
build_new_structure()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 185 def build_new_structure group = Resque::Plugins::Stages::StagedGroup.new(SecureRandom.uuid) stage = group.current_stage self.staged_group_stage = stage end
compressable?()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 252 def compressable? !described_class.blank? && described_class.singleton_class.included_modules.map(&:name).include?("Resque::Plugins::Compressible") end
compressed_args(compress_args)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 257 def compressed_args(compress_args) return compress_args unless compressable? return compress_args if described_class.compressed?(compress_args) [{ resque_compressed: true, payload: described_class.compressed_args(compress_args) }] end
decode_args(args_string)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 204 def decode_args(args_string) return if args_string.blank? Resque.decode(args_string) end
decompress_args(basic_args)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 264 def decompress_args(basic_args) return basic_args unless compressable? return basic_args unless described_class.compressed?(basic_args) described_class.uncompressed_args(basic_args.first[:payload] || basic_args.first["payload"]) end
described_class()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 242 def described_class return if class_name.blank? class_name.constantize rescue StandardError # :nocov: nil # :nocov: end
encode_args(*args)
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 200 def encode_args(*args) Resque.encode(args) end
job_key()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 210 def job_key "StagedJob::#{job_id}" end
klass()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 196 def klass @klass ||= class_name.constantize end
mark_stage_pending()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 230 def mark_stage_pending return if %i[running pending].include? staged_group_stage.status staged_group_stage.status = :pending end
mark_stage_running()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 236 def mark_stage_running return if staged_group_stage.status == :running staged_group_stage.status = :running end
notify_stage()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 218 def notify_stage return if staged_group_stage.blank? if status == :pending mark_stage_pending elsif queued? mark_stage_running else staged_group_stage.job_completed end end
staged_group_stage_id()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 214 def staged_group_stage_id @staged_group_stage_id ||= stored_values[:staged_group_stage_id] end
stored_values()
click to toggle source
# File lib/resque/plugins/stages/staged_job.rb, line 192 def stored_values @stored_values ||= (redis.hgetall(job_key) || {}).with_indifferent_access end