class SidekiqWorkflows::Worker
Public Class Methods
perform_async(workflow, *args)
click to toggle source
# File lib/sidekiq_workflows/worker.rb, line 45 def self.perform_async(workflow, *args) set(queue: worker_queue).send(:perform_async, workflow.serialize, *args) end
perform_workflow(workflow, on_complete: nil, on_complete_options: {}) { |batch| ... }
click to toggle source
# File lib/sidekiq_workflows/worker.rb, line 49 def self.perform_workflow(workflow, on_complete: nil, on_complete_options: {}) batch = Sidekiq::Batch.new batch.callback_queue = SidekiqWorkflows.callback_queue unless SidekiqWorkflows.callback_queue.nil? batch.description = "Workflow #{workflow.workflow_uuid || '-'} root batch" batch.on(:complete, on_complete, on_complete_options.merge(workflow_uuid: workflow.workflow_uuid)) if on_complete yield batch if block_given? batch.jobs do perform_async(workflow) end batch.bid end
Private Class Methods
worker_queue()
click to toggle source
# File lib/sidekiq_workflows/worker.rb, line 65 def self.worker_queue SidekiqWorkflows.worker_queue || Sidekiq.default_worker_options['queue'] end
Public Instance Methods
on_complete(status, options)
click to toggle source
# File lib/sidekiq_workflows/worker.rb, line 34 def on_complete(status, options) workflow = ensure_deserialized(options['workflow']) if workflow.on_partial_complete klass, method = workflow.on_partial_complete.split('#') ActiveSupport::Inflector.constantize(klass).new.send(method, status, options) end perform_children(status.parent_batch, workflow) unless status.failures > 0 end
perform(workflow)
click to toggle source
# File lib/sidekiq_workflows/worker.rb, line 9 def perform(workflow) workflow = ensure_deserialized(workflow) case workflow.class.name when 'SidekiqWorkflows::RootNode' perform_children(batch, workflow) when 'SidekiqWorkflows::WorkerNode' batch.jobs do child_batch = Sidekiq::Batch.new child_batch.callback_queue = SidekiqWorkflows.callback_queue unless SidekiqWorkflows.callback_queue.nil? child_batch.description = "Workflow #{workflow.workflow_uuid || '-'}" child_batch.on(:complete, 'SidekiqWorkflows::Worker#on_complete', workflow: workflow.serialize, workflow_uuid: workflow.workflow_uuid) child_batch.jobs do workflow.workers.each do |entry| if entry[:delay] entry[:worker].perform_in(entry[:delay], *entry[:payload]) else entry[:worker].perform_async(*entry[:payload]) end end end end end end
Private Instance Methods
ensure_deserialized(workflow)
click to toggle source
# File lib/sidekiq_workflows/worker.rb, line 77 def ensure_deserialized(workflow) workflow.is_a?(String) ? SidekiqWorkflows.deserialize(workflow) : workflow end
perform_children(batch, workflow)
click to toggle source
# File lib/sidekiq_workflows/worker.rb, line 69 def perform_children(batch, workflow) batch.jobs do workflow.children.each do |child| self.class.perform_async(child) end end end