module Cadence
This class is responsible for matching an executable (activity or workflow) name to a class implementing it.
TODO: This class should be responsible for handling executable versions
once they are implemented.
Provides context for Cadence::Activity::WorkflowConvenienceMethods
This class implements a very simple ThreadPool
with the ability to block until at least one thread becomes available. This allows Pollers to only poll when there's an available thread in the pool.
NOTE: There's a minor race condition that can occur between calling
#wait_for_available_threads and #schedule, but it should be rare.
This context class is available in the workflow implementation and provides context and methods for interacting with Cadence
Constants
- VERSION
Public Class Methods
complete_activity(async_token, result = nil)
click to toggle source
# File lib/cadence.rb, line 110
# Reports the activity identified by +async_token+ as completed.
#
# The token is decoded with Activity::AsyncToken.decode; the domain,
# activity_id, workflow_id and run_id it carries are forwarded, together with
# +result+, to client.respond_activity_task_completed_by_id.
#
# async_token - value accepted by Activity::AsyncToken.decode
# result      - passed through unchanged as +result:+ (defaults to nil)
#
# Returns whatever the client call returns.
def complete_activity(async_token, result = nil)
  token = Activity::AsyncToken.decode(async_token)

  client.respond_activity_task_completed_by_id(
    domain: token.domain,
    activity_id: token.activity_id,
    workflow_id: token.workflow_id,
    run_id: token.run_id,
    result: result
  )
end
configuration()
click to toggle source
# File lib/cadence.rb, line 139
# Returns the memoized Configuration instance, building it with
# Configuration.new the first time it is requested.
def configuration
  return @configuration if @configuration

  @configuration = Configuration.new
end
configure() { |configuration| ... }
click to toggle source
# File lib/cadence.rb, line 135
# Yields the object returned by #configuration to the caller's block so that
# settings can be changed in place.
#
# The block is invoked through +yield+, so no explicit +&block+ parameter is
# declared (the original captured one but never used it).
#
# Returns the value of the block.
# Raises LocalJumpError when called without a block.
def configure
  yield configuration
end
fail_activity(async_token, error)
click to toggle source
# File lib/cadence.rb, line 122
# Reports the activity identified by +async_token+ as failed.
#
# The token is decoded with Activity::AsyncToken.decode and its domain,
# activity_id, workflow_id and run_id are forwarded to
# client.respond_activity_task_failed_by_id.
#
# async_token - value accepted by Activity::AsyncToken.decode
# error       - object whose class name is sent as +reason:+ and whose
#               #message is sent as +details:+
#
# Returns whatever the client call returns.
def fail_activity(async_token, error)
  token = Activity::AsyncToken.decode(async_token)

  client.respond_activity_task_failed_by_id(
    domain: token.domain,
    activity_id: token.activity_id,
    workflow_id: token.workflow_id,
    run_id: token.run_id,
    reason: error.class.name,
    details: error.message
  )
end
fetch_workflow_execution_info(domain, workflow_id, run_id)
click to toggle source
# File lib/cadence.rb, line 100
# Looks up a workflow execution via client.describe_workflow_execution and
# converts the response's +workflowExecutionInfo+ with
# Workflow::ExecutionInfo.generate_from.
#
# domain, workflow_id, run_id - forwarded as keyword arguments of the same name
#
# Returns the result of Workflow::ExecutionInfo.generate_from.
def fetch_workflow_execution_info(domain, workflow_id, run_id)
  description = client.describe_workflow_execution(
    domain: domain,
    workflow_id: workflow_id,
    run_id: run_id
  )

  Workflow::ExecutionInfo.generate_from(description.workflowExecutionInfo)
end
get_workflow_history(domain:, workflow_id:, run_id:)
click to toggle source
# File lib/cadence.rb, line 151
# Fetches the history of a workflow execution through
# client.get_workflow_execution_history and wraps the returned
# +history.events+ in a Workflow::History.
#
# domain:, workflow_id:, run_id: - forwarded unchanged to the client call
#
# Returns a new Workflow::History.
def get_workflow_history(domain:, workflow_id:, run_id:)
  events = client.get_workflow_execution_history(
    domain: domain,
    workflow_id: workflow_id,
    run_id: run_id
  ).history.events

  Workflow::History.new(events)
end
logger()
click to toggle source
# File lib/cadence.rb, line 143
# Convenience accessor: returns the +logger+ held by #configuration.
def logger
  current = configuration
  current.logger
end
metrics()
click to toggle source
# File lib/cadence.rb, line 147
# Returns the memoized Metrics instance. On first use it is built from the
# +metrics_adapter+ exposed by #configuration.
def metrics
  return @metrics if @metrics

  @metrics = Metrics.new(configuration.metrics_adapter)
end
register_domain(name, description = nil)
click to toggle source
# File lib/cadence.rb, line 59
# Registers a domain through client.register_domain.
#
# name        - forwarded as +name:+
# description - forwarded as +description:+ (defaults to nil)
#
# Returns the client's response, or nil when the client raises
# CadenceThrift::DomainAlreadyExistsError (that error is deliberately
# swallowed here).
def register_domain(name, description = nil)
  client.register_domain(
    name: name,
    description: description
  )
rescue CadenceThrift::DomainAlreadyExistsError
  nil
end
reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: 'manual reset')
click to toggle source
# File lib/cadence.rb, line 75
# Resets a workflow execution via client.reset_workflow_execution.
#
# domain, workflow_id, run_id - identify the execution to reset
# decision_task_id:           - event id sent as +decision_task_event_id:+;
#                               when not given it is looked up with
#                               #get_last_completed_decision_task
# reason:                     - forwarded as +reason:+ (default 'manual reset')
#
# Returns the +runId+ of the client's response.
# Raises Error when no decision task id was given and none could be found.
def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: 'manual reset')
  event_id = decision_task_id || get_last_completed_decision_task(domain, workflow_id, run_id)
  raise Error, 'Could not find a completed decision task event' unless event_id

  client.reset_workflow_execution(
    domain: domain,
    workflow_id: workflow_id,
    run_id: run_id,
    reason: reason,
    decision_task_event_id: event_id
  ).runId
end
schedule_workflow(workflow, cron_schedule, *input, **args)
click to toggle source
# File lib/cadence.rb, line 36
# Starts a workflow execution with a cron schedule through
# client.start_workflow_execution.
#
# workflow      - passed to ExecutionOptions.new together with the options
# cron_schedule - forwarded as +cron_schedule:+
# *input        - positional workflow input
# **args        - +:options+ is extracted as the options hash; any remaining
#                 keyword arguments are appended to +input+ as a single hash
#
# The workflow id is options[:workflow_id], or a fresh SecureRandom.uuid
# when that is not set.
#
# Returns the +runId+ of the client's response.
def schedule_workflow(workflow, cron_schedule, *input, **args)
  options = args.delete(:options) || {}
  # Whatever keywords remain after removing :options belong to the input.
  input.push(args) unless args.empty?

  resolved = ExecutionOptions.new(workflow, options)
  workflow_id = options[:workflow_id] || SecureRandom.uuid
  timeouts = resolved.timeouts

  client.start_workflow_execution(
    domain: resolved.domain,
    workflow_id: workflow_id,
    workflow_name: resolved.name,
    task_list: resolved.task_list,
    input: input,
    execution_timeout: timeouts[:execution],
    task_timeout: timeouts[:task],
    workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
    headers: resolved.headers,
    cron_schedule: cron_schedule
  ).runId
end
signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
click to toggle source
# File lib/cadence.rb, line 65
# Sends a signal to a workflow execution via client.signal_workflow_execution.
#
# workflow    - object whose +domain+ is used as the target domain
# signal      - forwarded as +signal:+
# workflow_id - forwarded as +workflow_id:+
# run_id      - forwarded as +run_id:+
# input       - forwarded as +input:+ (defaults to nil)
#
# Returns whatever the client call returns.
def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
  # TODO: allow passing domain instead
  target_domain = workflow.domain

  client.signal_workflow_execution(
    domain: target_domain,
    workflow_id: workflow_id,
    run_id: run_id,
    signal: signal,
    input: input
  )
end
start_workflow(workflow, *input, **args)
click to toggle source
# File lib/cadence.rb, line 14
# Starts a workflow execution through client.start_workflow_execution.
#
# workflow - passed to ExecutionOptions.new together with the options
# *input   - positional workflow input
# **args   - +:options+ is extracted as the options hash; any remaining
#            keyword arguments are appended to +input+ as a single hash
#
# The workflow id is options[:workflow_id], or a fresh SecureRandom.uuid
# when that is not set.
#
# Returns the +runId+ of the client's response.
def start_workflow(workflow, *input, **args)
  options = args.delete(:options) || {}
  # Whatever keywords remain after removing :options belong to the input.
  input.push(args) unless args.empty?

  resolved = ExecutionOptions.new(workflow, options)
  workflow_id = options[:workflow_id] || SecureRandom.uuid
  timeouts = resolved.timeouts

  client.start_workflow_execution(
    domain: resolved.domain,
    workflow_id: workflow_id,
    workflow_name: resolved.name,
    task_list: resolved.task_list,
    input: input,
    execution_timeout: timeouts[:execution],
    task_timeout: timeouts[:task],
    workflow_id_reuse_policy: options[:workflow_id_reuse_policy],
    headers: resolved.headers
  ).runId
end
terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination', details: nil)
click to toggle source
# File lib/cadence.rb, line 90
# Terminates a workflow execution via client.terminate_workflow_execution.
#
# domain, workflow_id, run_id - identify the execution to terminate
# reason:                     - forwarded as +reason:+
#                               (default 'manual termination')
# details:                    - forwarded as +details:+ (default nil)
#
# Returns whatever the client call returns.
def terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination', details: nil)
  termination = {
    domain: domain,
    workflow_id: workflow_id,
    run_id: run_id,
    reason: reason,
    details: details
  }

  client.terminate_workflow_execution(**termination)
end
Private Class Methods
client()
click to toggle source
# File lib/cadence.rb, line 162
# Returns the memoized client, obtaining it from Cadence::Client.generate on
# first use.
def client
  return @client if @client

  @client = Cadence::Client.generate
end
get_last_completed_decision_task(domain, workflow_id, run_id)
click to toggle source
# File lib/cadence.rb, line 166
# Finds the id of the last completed decision task event of a workflow
# execution.
#
# The history is loaded with #get_workflow_history and asked for its
# +last_completed_decision_task+.
#
# Returns that event's +id+, or nil when the history reports no such event.
def get_last_completed_decision_task(domain, workflow_id, run_id)
  get_workflow_history(
    domain: domain,
    workflow_id: workflow_id,
    run_id: run_id
  ).last_completed_decision_task&.id
end