class Dwf::Callback
Constants
- DEFAULT_KEY
Public Instance Methods
process_next_step(status, options)
click to toggle source
# File lib/dwf/callback.rb, line 8
# Batch success callback: enqueues the jobs that follow the ones that just
# finished.
#
# @param status [Object] batch status object; only +parent_bid+ is read here
# @param options [Hash] callback payload; reads the string keys 'names'
#   (names of the jobs that just ran) and 'workflow_id'
# @return [nil, Object] nil when no outgoing jobs exist, otherwise whatever
#   the parent batch's +jobs+ call returns
def process_next_step(status, options)
  workflow_id = options['workflow_id']

  # Gather the outgoing job names of every finished job, de-duplicated so a
  # job shared by several predecessors is only considered once.
  next_job_names = options['names'].map do |finished_name|
    client.find_job(workflow_id, finished_name).outgoing
  end
  next_job_names = next_job_names.flatten.uniq
  return if next_job_names.empty?

  # Re-open the parent batch so the follow-up batches are created inside it.
  parent_batch = Sidekiq::Batch.new(status.parent_bid)
  parent_batch.jobs do
    setup_batches(next_job_names, workflow_id)
  end
end
start(job)
click to toggle source
# File lib/dwf/callback.rb, line 21
# Starts a job. A job with outgoing jobs is wrapped in a batch (see
# start_with_batch); a job without any is enqueued directly.
#
# @param job [Object] must respond to +outgoing+ and +perform_async+
def start(job)
  if job.outgoing.any?
    start_with_batch(job)
  else
    job.perform_async
  end
end
Private Instance Methods
classify_jobs(jobs)
click to toggle source
# File lib/dwf/callback.rb, line 51
# Groups jobs by their list of outgoing jobs.
#
# Jobs with no outgoing jobs are grouped under DEFAULT_KEY; every other job
# is grouped under the concatenation of its outgoing entries.
#
# NOTE(review): the key is built with a separator-less +join+, so distinct
# outgoing lists such as ["ab", "c"] and ["a", "bc"] would share one key —
# confirm job names make that impossible before relying on it.
#
# @param jobs [Enumerable] objects responding to +outgoing+
# @return [Hash{String => Array}] jobs per key, in first-seen key order
def classify_jobs(jobs)
  jobs.group_by do |job|
    outgoing_jobs = job.outgoing
    outgoing_jobs.empty? ? DEFAULT_KEY : outgoing_jobs.join
  end
end
client()
click to toggle source
# File lib/dwf/callback.rb, line 85
# Lazily builds the Dwf::Client and reuses it for later calls on this
# instance.
#
# @return [Dwf::Client]
def client
  return @client if @client

  @client = Dwf::Client.new
end
fetch_jobs(processing_job_names, workflow_id)
click to toggle source
# File lib/dwf/callback.rb, line 62
# Looks up each named job of the workflow through the client.
#
# @param processing_job_names [Array] names passed to +client.find_job+
# @param workflow_id [Object] workflow identifier passed to +client.find_job+
# @return [Array] the lookup results with nil entries removed
def fetch_jobs(processing_job_names, workflow_id)
  looked_up = processing_job_names.map { |name| client.find_job(workflow_id, name) }
  looked_up.compact
end
setup_batch(jobs, workflow_id)
click to toggle source
# File lib/dwf/callback.rb, line 38
# Creates one batch for the given jobs, registers process_next_step as its
# success callback, and enqueues every job that reports it is ready.
#
# NOTE(review): the callback receives +klass+ values as 'names', whereas
# start_with_batch passes +job.name+ — confirm both are valid lookups for
# +client.find_job+.
#
# @param jobs [Array] objects responding to +klass+, +ready_to_start?+ and
#   +persist_and_perform_async!+
# @param workflow_id [Object] forwarded to the callback options
def setup_batch(jobs, workflow_id)
  batch = Sidekiq::Batch.new
  callback_options = { names: jobs.map(&:klass), workflow_id: workflow_id }
  batch.on(:success, 'Dwf::Callback#process_next_step', **callback_options)

  batch.jobs do
    jobs.each do |job|
      # Jobs that are not ready yet are left untouched.
      next unless job.ready_to_start?

      job.persist_and_perform_async!
    end
  end
end
setup_batches(processing_job_names, workflow_id)
click to toggle source
# File lib/dwf/callback.rb, line 27
# Fetches the named jobs, groups them with classify_jobs, and sets up one
# batch per group while holding the lock for that group's key.
#
# @param processing_job_names [Array] names of the jobs to enqueue next
# @param workflow_id [Object] workflow the jobs belong to
def setup_batches(processing_job_names, workflow_id)
  grouped_jobs = classify_jobs(fetch_jobs(processing_job_names, workflow_id))

  grouped_jobs.each_pair do |group_key, batch_jobs|
    with_lock(workflow_id, group_key) { setup_batch(batch_jobs, workflow_id) }
  end
end
start_with_batch(job)
click to toggle source
# File lib/dwf/callback.rb, line 74
# Enqueues a single job inside a new batch whose success callback is
# process_next_step, carrying the job's name and workflow id.
#
# @param job [Object] must respond to +name+, +workflow_id+ and
#   +perform_async+
def start_with_batch(job)
  batch = Sidekiq::Batch.new
  callback_options = { names: [job.name], workflow_id: job.workflow_id }
  batch.on(:success, 'Dwf::Callback#process_next_step', **callback_options)

  batch.jobs do
    job.perform_async
  end
end
with_lock(workflow_id, job_name) { || ... }
click to toggle source
# File lib/dwf/callback.rb, line 68
# Runs the given block between +client.check_or_lock+ and
# +client.release_lock+ for the (workflow_id, job_name) pair.
#
# The release happens in an +ensure+ clause so the lock is not left held
# when the block raises; the exception still propagates to the caller.
# Acquisition stays outside the +begin+ so a failed +check_or_lock+ never
# triggers a release.
#
# @param workflow_id [Object] forwarded to the client lock calls
# @param job_name [Object] forwarded to the client lock calls
# @yield the work to perform while the lock is held
# @return [Object] the block's result
def with_lock(workflow_id, job_name)
  client.check_or_lock(workflow_id, job_name)
  begin
    yield
  ensure
    client.release_lock(workflow_id, job_name)
  end
end