class SidekiqRobustJob::SidekiqJobManager
Attributes
clock[R]
digest_generator[R]
jobs_repository[R]
memory_monitor[R]
Public Class Methods
new(jobs_repository:, clock:, digest_generator:, memory_monitor:)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 6 def initialize(jobs_repository:, clock:, digest_generator:, memory_monitor:) @jobs_repository = jobs_repository @clock = clock @digest_generator = digest_generator @memory_monitor = memory_monitor end
Public Instance Methods
perform(job_id)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 44 def perform(job_id) job = jobs_repository.find(job_id) return if job.unprocessable? job.started(memory_monitor: memory_monitor) jobs_repository.save(job) job.execute end
perform_async(job_class, *arguments)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 13 def perform_async(job_class, *arguments) job = create_job(job_class, *arguments) return if job.unprocessable? job_class.original_perform_async(job.id).tap do |sidekiq_jid| job.assign_sidekiq_data(execute_at: clock.now, sidekiq_jid: sidekiq_jid) jobs_repository.save(job) end end
perform_at(job_class, time, *arguments)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 31 def perform_at(job_class, time, *arguments) job = create_job(job_class, *arguments) return if job.unprocessable? job_class.original_perform_at(time, job.id).tap do |sidekiq_jid| job.assign_sidekiq_data(execute_at: time, sidekiq_jid: sidekiq_jid) jobs_repository.save(job) end end
perform_in(job_class, interval, *arguments)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 22 def perform_in(job_class, interval, *arguments) job = create_job(job_class, *arguments) return if job.unprocessable? job_class.original_perform_in(interval, job.id).tap do |sidekiq_jid| job.assign_sidekiq_data(execute_at: clock.now + interval, sidekiq_jid: sidekiq_jid) jobs_repository.save(job) end end
set(job_class, options = {})
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 40 def set(job_class, options = {}) SidekiqRobustJob::DependenciesContainer["setter_proxy_job"].build(job_class, options) end
Private Instance Methods
create_job(job_class, *arguments)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 55 def create_job(job_class, *arguments) jobs_repository.build( job_class: job_class, arguments: Array.wrap(arguments), enqueued_at: clock.now, digest: digest_generator.generate(job_class, *arguments), queue: job_class.get_sidekiq_options.fetch("queue", "default"), uniqueness_strategy: job_class.get_sidekiq_options.fetch("uniqueness_strategy", SidekiqRobustJob::UniquenessStrategy.no_uniqueness), enqueue_conflict_resolution_strategy: job_class.get_sidekiq_options.fetch("enqueue_conflict_resolution_strategy", SidekiqRobustJob::EnqueueConflictResolutionStrategy.do_nothing) ).tap do |job| jobs_repository.save(job) if persist_job_immediately?(job_class) jobs_repository.transaction do resolve_potential_conflict_for_enqueueing(job) jobs_repository.save(job) if persist_after_resolving_conflict_for_enqueueing(job, job_class) end end end
persist_after_resolving_conflict_for_enqueueing(job, job_class)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 85 def persist_after_resolving_conflict_for_enqueueing(job, job_class) return true if persist_self_dropped_jobs?(job_class) !job.dropped? end
persist_job_immediately?(job_class)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 81 def persist_job_immediately?(job_class) persist_self_dropped_jobs?(job_class) end
persist_self_dropped_jobs?(job_class)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 91 def persist_self_dropped_jobs?(job_class) job_class.get_sidekiq_options.fetch("persist_self_dropped_jobs", true) end
resolve_potential_conflict_for_enqueueing(job)
click to toggle source
# File lib/sidekiq_robust_job/sidekiq_job_manager.rb, line 75 def resolve_potential_conflict_for_enqueueing(job) SidekiqRobustJob::DependenciesContainer["enqueue_conflict_resolution_resolver"] .resolve(job.enqueue_conflict_resolution_strategy) .execute(job) end