class Resque::Plugins::Stages::StagedGroupStage
rubocop:disable Metrics/ClassLength
Attributes
group_stage_id[R]
Public Class Methods
new(group_stage_id)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 20
# Builds a stage handle around an identifier.
#
# @param group_stage_id [Object] identifier kept on the instance; it is
#   exposed through the +group_stage_id+ reader.
def initialize(group_stage_id)
  @group_stage_id = group_stage_id
end
Public Instance Methods
<=>(other)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 24
# Orders two stages by their +group_stage_id+.
#
# @param other [Object] the object to compare against.
# @return [Integer, nil] the result of comparing the two ids, or nil when
#   +other+ is not a StagedGroupStage.
def <=>(other)
  if other.is_a?(Resque::Plugins::Stages::StagedGroupStage)
    group_stage_id <=> other.group_stage_id
  end
end
add_job(staged_group_job)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 177
# Appends the job's id to the tail of the Redis list stored at +stage_key+.
#
# @param staged_group_job [#job_id] the job to record on this stage.
# @return [Object] whatever the Redis client returns from RPUSH.
def add_job(staged_group_job)
  redis.rpush(stage_key, staged_group_job.job_id)
end
blank?()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 211
# Reports whether nothing is stored in Redis for this stage, i.e. neither
# +stage_key+ nor +staged_group_key+ exists.
#
# Depending on the client version, +redis.exists+ answers with either a
# boolean or an Integer count of matching keys. An Integer 0 is truthy in
# Ruby, so negating the raw reply would never report a blank stage; the reply
# is therefore normalised before it is tested.
#
# @return [Boolean] true when both keys are absent.
def blank?
  [stage_key, staged_group_key].none? do |key|
    reply = redis.exists(key)
    reply.is_a?(Integer) ? reply.positive? : reply
  end
end
delete()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 185
# Removes this stage: deletes every job on it, detaches it from its staged
# group (when it has one), then drops both Redis keys owned by the stage.
#
# @return [Object] the Redis client's reply to the final DEL.
def delete
  jobs.each { |job| job.delete }

  owner = staged_group
  owner.remove_stage(self) unless owner.nil?

  redis.del(stage_key)
  redis.del(staged_group_key)
end
enqueue(klass, *args)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 30
# Creates a staged job on this stage and, unless the stage is still
# +:pending+, hands it to +Resque.enqueue+.
#
# When the stage is not pending it is marked +:running+ (if it is not
# already) and the job is marked +:queued+ before being enqueued.
#
# @param klass [Object] job class forwarded to +create_enqueue_job+.
# @param args [Array] job arguments forwarded to +create_enqueue_job+.
# @return [Object] the job returned by +create_enqueue_job+.
def enqueue(klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue(*staged_job.enqueue_args)
  end

  staged_job
end
enqueue_at(timestamp, klass, *args)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 56
# Creates a staged job on this stage and, unless the stage is still
# +:pending+, hands it to +Resque.enqueue_at+ with the given timestamp.
#
# When the stage is not pending it is marked +:running+ (if it is not
# already) and the job is marked +:queued+ before being enqueued.
#
# @param timestamp [Object] passed through as the first argument of
#   +Resque.enqueue_at+.
# @param klass [Object] job class forwarded to +create_enqueue_job+.
# @param args [Array] job arguments forwarded to +create_enqueue_job+.
# @return [Object] the job returned by +create_enqueue_job+.
def enqueue_at(timestamp, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_at(timestamp, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_at_with_queue(queue, timestamp, klass, *args)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 69
# Creates a staged job on this stage and, unless the stage is still
# +:pending+, hands it to +Resque.enqueue_at_with_queue+.
#
# When the stage is not pending it is marked +:running+ (if it is not
# already) and the job is marked +:queued+ before being enqueued.
#
# @param queue [Object] passed through as the first argument of
#   +Resque.enqueue_at_with_queue+.
# @param timestamp [Object] passed through as the second argument.
# @param klass [Object] job class forwarded to +create_enqueue_job+.
# @param args [Array] job arguments forwarded to +create_enqueue_job+.
# @return [Object] the job returned by +create_enqueue_job+.
def enqueue_at_with_queue(queue, timestamp, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_at_with_queue(queue, timestamp, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_in(number_of_seconds_from_now, klass, *args)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 82
# Creates a staged job on this stage and, unless the stage is still
# +:pending+, hands it to +Resque.enqueue_in+ with the given delay.
#
# When the stage is not pending it is marked +:running+ (if it is not
# already) and the job is marked +:queued+ before being enqueued.
#
# @param number_of_seconds_from_now [Object] passed through as the first
#   argument of +Resque.enqueue_in+.
# @param klass [Object] job class forwarded to +create_enqueue_job+.
# @param args [Array] job arguments forwarded to +create_enqueue_job+.
# @return [Object] the job returned by +create_enqueue_job+.
def enqueue_in(number_of_seconds_from_now, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_in(number_of_seconds_from_now, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 95
# Creates a staged job on this stage and, unless the stage is still
# +:pending+, hands it to +Resque.enqueue_in_with_queue+.
#
# When the stage is not pending it is marked +:running+ (if it is not
# already) and the job is marked +:queued+ before being enqueued.
#
# @param queue [Object] passed through as the first argument of
#   +Resque.enqueue_in_with_queue+.
# @param number_of_seconds_from_now [Object] passed through as the second
#   argument.
# @param klass [Object] job class forwarded to +create_enqueue_job+.
# @param args [Array] job arguments forwarded to +create_enqueue_job+.
# @return [Object] the job returned by +create_enqueue_job+.
def enqueue_in_with_queue(queue, number_of_seconds_from_now, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_in_with_queue(queue, number_of_seconds_from_now, *staged_job.enqueue_args)
  end

  staged_job
end
enqueue_to(queue, klass, *args)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 43
# Creates a staged job on this stage and, unless the stage is still
# +:pending+, hands it to +Resque.enqueue_to+ for the given queue.
#
# When the stage is not pending it is marked +:running+ (if it is not
# already) and the job is marked +:queued+ before being enqueued.
#
# @param queue [Object] passed through as the first argument of
#   +Resque.enqueue_to+.
# @param klass [Object] job class forwarded to +create_enqueue_job+.
# @param args [Array] job arguments forwarded to +create_enqueue_job+.
# @return [Object] the job returned by +create_enqueue_job+.
def enqueue_to(queue, klass, *args)
  staged_job = create_enqueue_job(klass, args)

  unless status == :pending
    self.status = :running unless status == :running
    staged_job.status = :queued
    Resque.enqueue_to(queue, *staged_job.enqueue_args)
  end

  staged_job
end
initiate()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 194
# Starts the stage: marks it +:running+, enqueues every job that is neither
# completed nor already queued, then re-evaluates completion through
# +job_completed+.
#
# @return [Object] the return value of +job_completed+.
def initiate
  self.status = :running

  jobs.each do |job|
    job.enqueue_job unless job.completed? || job.queued?
  end

  job_completed
end
job_completed()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 207
# Marks the stage +:complete+ once every job on it reports +completed?+.
# A stage with no jobs satisfies that condition as well.
#
# @return [Symbol, nil] +:complete+ when the status was updated, otherwise nil.
def job_completed
  return unless jobs.all?(&:completed?)

  self.status = :complete
end
jobs(start = 0, stop = -1)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 140
# Loads the jobs recorded on this stage.
#
# @param start [Integer] first index passed to LRANGE on +stage_key+.
# @param stop [Integer] last index passed to LRANGE (-1 by default).
# @return [Array<Resque::Plugins::Stages::StagedJob>] one StagedJob per id
#   returned by Redis, in the order Redis returned them.
def jobs(start = 0, stop = -1)
  job_ids = redis.lrange(stage_key, start, stop)
  job_ids.map do |job_id|
    Resque::Plugins::Stages::StagedJob.new(job_id)
  end
end
jobs_by_status(status)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 144
# Filters the stage's jobs down to those whose +status+ equals the argument.
#
# @param status [Object] the status value to match with +==+.
# @return [Array] the matching jobs, in their original order.
def jobs_by_status(status)
  wanted = status
  jobs.select do |job|
    job.status == wanted
  end
end
num_jobs()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 173
# Counts the jobs on this stage by asking Redis for the length of the list
# at +stage_key+.
#
# @return [Object] the Redis client's reply to LLEN.
def num_jobs
  list_key = stage_key
  redis.llen(list_key)
end
number()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 118
# Reads the stage number from the "number" field of the hash at
# +staged_group_key+.
#
# @return [Integer] the stored value converted with +to_i+, or 1 when the
#   field is absent.
def number
  stored = redis.hget(staged_group_key, "number")
  stored.nil? ? 1 : stored.to_i
end
number=(value)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 122
# Stores the stage number in the "number" field of the hash at
# +staged_group_key+.
#
# @param value [Object] the value written to Redis unchanged.
def number=(value)
  hash_key = staged_group_key
  redis.hset(hash_key, "number", value)
end
order_param(sort_option, current_sort, current_order)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 163
# Works out the sort direction to use for a column.
#
# Picking the column that is already the current sort flips the direction;
# any other column starts at "asc". A nil +current_order+ counts as "asc".
#
# @param sort_option [Object] the column being requested.
# @param current_sort [Object] the column currently sorted on.
# @param current_order [String, nil] the current direction.
# @return [String] "asc" or "desc".
def order_param(sort_option, current_sort, current_order)
  return "asc" unless sort_option == current_sort

  (current_order || "asc") == "asc" ? "desc" : "asc"
end
paginated_jobs(sort_key = :class_name, sort_order = "asc", page_num = 1, queue_page_size = 20)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 148
# Returns one page of the stage's jobs, sorted by +sort_key+.
#
# Both numeric parameters are coerced with +to_i+, so values that arrive as
# strings (or nil) are accepted instead of raising on the arithmetic.
#
# @param sort_key [Symbol, String] sort column handed to +sorted_jobs+.
# @param sort_order [String] "desc" reverses the sorted list; any other
#   value leaves it ascending.
# @param page_num [Integer, String, nil] 1-based page number.
# @param queue_page_size [Integer, String, nil] jobs per page; values below
#   1 fall back to 20.
# @return [Array] the jobs on the requested page. A page that starts before
#   the beginning or past the end of the list falls back to the first page.
def paginated_jobs(sort_key = :class_name, sort_order = "asc", page_num = 1, queue_page_size = 20)
  queue_page_size = queue_page_size.to_i
  queue_page_size = 20 if queue_page_size < 1

  job_list = sorted_jobs(sort_key)
  job_list = job_list.reverse if sort_order == "desc"

  page_start = (page_num.to_i - 1) * queue_page_size
  # Slicing past the end of an Array yields nil, so out-of-range pages are
  # redirected to the first page.
  page_start = 0 if page_start > job_list.length || page_start.negative?

  job_list[page_start, queue_page_size]
end
remove_job(staged_group_job)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 181
# Removes every occurrence of the job's id from the Redis list at
# +stage_key+ (LREM with a count of 0).
#
# @param staged_group_job [#job_id] the job to drop from this stage.
# @return [Object] the Redis client's reply to LREM.
def remove_job(staged_group_job)
  redis.lrem(
    stage_key,
    0,
    staged_group_job.job_id
  )
end
staged_group()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 126
# Returns the staged group this stage belongs to.
#
# The group object is built from +staged_group_id+ on first use and cached
# in +@staged_group+.
#
# @return [Resque::Plugins::Stages::StagedGroup, nil] nil when
#   +staged_group_id+ is blank.
def staged_group
  if staged_group_id.blank?
    nil
  else
    @staged_group ||= Resque::Plugins::Stages::StagedGroup.new(staged_group_id)
  end
end
staged_group=(value)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 132
# Attaches this stage to a staged group.
#
# Caches the group and its id on the instance, registers the stage with the
# group through +add_stage+, and stores the group id in the
# "staged_group_id" field of the hash at +staged_group_key+.
#
# @param value [#group_id, #add_stage] the group to attach to.
def staged_group=(value)
  @staged_group = value
  @staged_group_id = value.group_id

  value.add_stage(self)

  redis.hset(
    staged_group_key,
    "staged_group_id",
    value.group_id
  )
end
status()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 108
# Reads the stage status from the "status" field of the hash at
# +staged_group_key+.
#
# @return [Symbol] the stored value converted with +to_sym+, or +:pending+
#   when the field is absent.
def status
  stored = redis.hget(staged_group_key, "status")
  stored.nil? ? :pending : stored.to_sym
end
status=(value)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 112
# Writes the stage status (as a string) to the "status" field of the hash at
# +staged_group_key+.
#
# After writing, the status is read back; if it is +:complete+ and the stage
# has a staged group, the group is notified through +stage_completed+.
#
# @param value [#to_s] the new status.
def status=(value)
  redis.hset(staged_group_key, "status", value.to_s)
  return unless status == :complete

  staged_group&.stage_completed
end
verify()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 215
# Checks the link between this stage and its staged group.
#
# With no (or a blank) staged group, a new structure is built through
# +build_new_structure+; otherwise the group is asked to +verify_stage+ this
# stage.
#
# @return [Object] the result of whichever call was made.
def verify
  if staged_group.blank?
    build_new_structure
  else
    staged_group.verify_stage(self)
  end
end
verify_job(job)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 221
# Makes sure the job's id is present in the Redis list at +stage_key+.
#
# The whole list is read; if the id is missing it is pushed onto the head of
# the list with LPUSH.
#
# @param job [#job_id] the job to check.
# @return [Object, nil] the Redis client's reply to LPUSH, or nil when the id
#   was already listed.
def verify_job(job)
  known_ids = redis.lrange(stage_key, 0, -1)
  redis.lpush(stage_key, job.job_id) unless known_ids.include?(job.job_id)
end
Private Instance Methods
build_new_structure()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 231
# Creates a StagedGroup with a freshly generated UUID as its id and attaches
# this stage to it through +staged_group=+.
#
# @return [Resque::Plugins::Stages::StagedGroup] the newly created group.
def build_new_structure
  self.staged_group = Resque::Plugins::Stages::StagedGroup.new(SecureRandom.uuid)
end
create_enqueue_job(klass, args)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 237
# Builds a staged job for this stage by delegating to
# +StagedJob.create_job+, splatting +args+ into the call.
#
# @param klass [Object] the job class.
# @param args [Array] the job arguments, as a single array.
# @return [Object] whatever +StagedJob.create_job+ returns.
def create_enqueue_job(klass, args)
  Resque::Plugins::Stages::StagedJob.create_job(self, klass, *args)
end
job_sort_value(job, sort_key)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 259
# Extracts the value a job is sorted by.
#
# Only a fixed set of keys is ever sent to the job: +class_name+, +status+
# and +status_message+ are returned as-is, +queue_time+ is converted with
# +to_s+.
#
# @param job [Object] the job to read from.
# @param sort_key [Symbol, String] the requested sort column.
# @return [Object, nil] the sort value, or nil for any other key.
def job_sort_value(job, sort_key)
  key = sort_key.to_sym

  if %i[class_name status status_message].include?(key)
    job.public_send(sort_key)
  elsif key == :queue_time
    job.public_send(sort_key).to_s
  end
end
sorted_jobs(sort_key)
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 253
# Returns the stage's jobs in ascending order of the value
# +job_sort_value+ yields for +sort_key+.
#
# @param sort_key [Symbol, String] the sort column.
# @return [Array] the sorted jobs.
def sorted_jobs(sort_key)
  jobs.sort_by { |job| job_sort_value(job, sort_key) }
end
stage_key()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 241
# Builds the Redis key for this stage from its +group_stage_id+.
#
# @return [String] "StagedGroupStage::" followed by the id.
def stage_key
  stage_id = group_stage_id
  "StagedGroupStage::#{stage_id}"
end
staged_group_id()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 249
# Returns the id of the staged group this stage belongs to.
#
# The id is read from the "staged_group_id" field of the hash at
# +staged_group_key+ and cached in +@staged_group_id+ once a truthy value
# has been seen.
#
# @return [String, nil] the group id, or nil when none is stored.
def staged_group_id
  return @staged_group_id if @staged_group_id

  @staged_group_id = redis.hget(staged_group_key, "staged_group_id")
end
staged_group_key()
click to toggle source
# File lib/resque/plugins/stages/staged_group_stage.rb, line 245
# Builds the Redis key of the hash holding this stage's attributes.
#
# @return [String] +stage_key+ followed by "::staged_group".
def staged_group_key
  base_key = stage_key
  "#{base_key}::staged_group"
end