class Dwf::Callback

Constants

DEFAULT_KEY

Public Instance Methods

process_next_step(status, options) click to toggle source
# File lib/dwf/callback.rb, line 8
# Batch callback that schedules the jobs following a finished batch.
#
# Looks up each job named in options['names'], gathers the distinct names
# found in their +outgoing+ lists and passes them to +setup_batches+ from
# inside the batch identified by +status.parent_bid+.
#
# @param status [#parent_bid] object exposing the parent batch id
# @param options [Hash] string-keyed hash holding 'names' (job names) and
#   'workflow_id'
# @return [nil] when there is no outgoing job name to process; otherwise the
#   value returned by Sidekiq::Batch#jobs
def process_next_step(status, options)
  workflow_id = options['workflow_id']

  # fetch_jobs compacts the results of find_job, so a lookup can apparently
  # come back nil. Skip such names here as well instead of failing with
  # NoMethodError on nil.
  finished_jobs = options['names'].map do |job_name|
    client.find_job(workflow_id, job_name)
  end.compact

  processing_job_names = finished_jobs.flat_map(&:outgoing).uniq
  return if processing_job_names.empty?

  overall = Sidekiq::Batch.new(status.parent_bid)
  overall.jobs { setup_batches(processing_job_names, workflow_id) }
end
start(job) click to toggle source
# File lib/dwf/callback.rb, line 21
# Starts +job+.
#
# A job with no outgoing entries is enqueued directly through
# +perform_async+; any other job goes through +start_with_batch+.
#
# @param job [#outgoing, #perform_async] the job to start
# @return the value of whichever call was made
def start(job)
  return job.perform_async unless job.outgoing.any?

  start_with_batch(job)
end

Private Instance Methods

classify_jobs(jobs) click to toggle source
# File lib/dwf/callback.rb, line 51
# Groups +jobs+ by their outgoing job names.
#
# The group key is the job's +outgoing+ list joined into one string, or
# DEFAULT_KEY when that list is empty. Groups keep the order in which their
# first job appears, and jobs keep their input order within a group.
#
# @param jobs [Array<#outgoing>] jobs to classify
# @return [Hash{String => Array}] jobs keyed by their group key
def classify_jobs(jobs)
  jobs.group_by do |job|
    successors = job.outgoing
    successors.empty? ? DEFAULT_KEY : successors.join
  end
end
client() click to toggle source
# File lib/dwf/callback.rb, line 85
# Returns the Dwf::Client used by this object, building it on first use and
# reusing the same instance afterwards.
#
# @return [Dwf::Client]
def client
  @client = Dwf::Client.new unless @client
  @client
end
fetch_jobs(processing_job_names, workflow_id) click to toggle source
# File lib/dwf/callback.rb, line 62
# Resolves job names to job objects through +client.find_job+.
#
# Names for which the lookup returns nil are left out of the result; the
# remaining jobs keep the order of +processing_job_names+.
#
# @param processing_job_names [Array] names to look up
# @param workflow_id the workflow the jobs belong to
# @return [Array] the jobs that were found
def fetch_jobs(processing_job_names, workflow_id)
  processing_job_names.each_with_object([]) do |job_name, found|
    job = client.find_job(workflow_id, job_name)
    found << job unless job.nil?
  end
end
setup_batch(jobs, workflow_id) click to toggle source
# File lib/dwf/callback.rb, line 38
# Creates a new Sidekiq::Batch for +jobs+ and enqueues the ready ones in it.
#
# 'Dwf::Callback#process_next_step' is registered as the batch's :success
# callback, receiving the +klass+ of every job under :names together with
# the workflow id. Inside the batch, each job is asked +ready_to_start?+
# and, if so, started with +persist_and_perform_async!+.
#
# NOTE(review): start_with_batch passes job.name under :names while this
# method passes job.klass -- confirm the difference is intended.
#
# @param jobs [Array] jobs that belong to the same batch
# @param workflow_id the workflow the jobs belong to
# @return the value returned by Sidekiq::Batch#jobs
def setup_batch(jobs, workflow_id)
  batch = Sidekiq::Batch.new
  callback_options = { names: jobs.map(&:klass), workflow_id: workflow_id }
  batch.on(:success, 'Dwf::Callback#process_next_step', **callback_options)

  batch.jobs do
    jobs.each do |job|
      next unless job.ready_to_start?

      job.persist_and_perform_async!
    end
  end
end
setup_batches(processing_job_names, workflow_id) click to toggle source
# File lib/dwf/callback.rb, line 27
# Builds one batch per group of jobs.
#
# The named jobs are fetched with +fetch_jobs+ and grouped with
# +classify_jobs+; each group is then passed to +setup_batch+ inside
# +with_lock+, using the group key as the lock name.
#
# @param processing_job_names [Array] names of the jobs to set up
# @param workflow_id the workflow the jobs belong to
# @return [Hash] the grouping produced by +classify_jobs+
def setup_batches(processing_job_names, workflow_id)
  grouped = classify_jobs(fetch_jobs(processing_job_names, workflow_id))

  grouped.each do |lock_key, group|
    with_lock(workflow_id, lock_key) { setup_batch(group, workflow_id) }
  end
end
start_with_batch(job) click to toggle source
# File lib/dwf/callback.rb, line 74
# Enqueues +job+ inside a new Sidekiq::Batch.
#
# 'Dwf::Callback#process_next_step' is registered as the batch's :success
# callback, receiving the job's name (wrapped in an array) under :names and
# the job's workflow id.
#
# @param job [#name, #workflow_id, #perform_async] the job to start
# @return the value returned by Sidekiq::Batch#jobs
def start_with_batch(job)
  batch = Sidekiq::Batch.new
  callback_options = { names: [job.name], workflow_id: job.workflow_id }
  batch.on(:success, 'Dwf::Callback#process_next_step', **callback_options)

  batch.jobs do
    job.perform_async
  end
end
with_lock(workflow_id, job_name) { || ... } click to toggle source
# File lib/dwf/callback.rb, line 68
# Runs the given block between +client.check_or_lock+ and
# +client.release_lock+ for the given workflow id and job name.
#
# +release_lock+ is called even when the block raises, so a failing block
# does not leave the lock entry behind; the exception still propagates to
# the caller. If +check_or_lock+ itself raises, nothing is released.
#
# @param workflow_id the workflow the lock belongs to
# @param job_name the name identifying the lock within the workflow
# @yield the work to run while the lock is held
# @return the value of the block
def with_lock(workflow_id, job_name)
  client.check_or_lock(workflow_id, job_name)
  begin
    yield
  ensure
    client.release_lock(workflow_id, job_name)
  end
end