module Cadence

This class is responsible for matching an executable (activity or workflow) name to a class implementing it.

TODO: This class should be responsible for handling executable versions when these are implemented.

Provides context for Cadence::Activity::WorkflowConvenienceMethods

This class implements a very simple ThreadPool with the ability to block until at least one thread becomes available. This allows Pollers to only poll when there's an available thread in the pool.

NOTE: There's a minor race condition that can occur between calling #wait_for_available_threads and #schedule, but it should be rare.

This context class is available in the workflow implementation and provides context and methods for interacting with Cadence

Constants

VERSION

Public Class Methods

complete_activity(async_token, result = nil) click to toggle source
# File lib/cadence.rb, line 110
# Reports an activity as completed, addressing it through an async token.
#
# The token is decoded with Activity::AsyncToken.decode; the decoded domain,
# activity id, workflow id and run id are forwarded to the client together
# with +result+.
#
# @param async_token token understood by Activity::AsyncToken.decode
# @param result value reported as the activity result (defaults to nil)
# @return whatever client.respond_activity_task_completed_by_id returns
def complete_activity(async_token, result = nil)
  token = Activity::AsyncToken.decode(async_token)
  completion = {
    domain: token.domain,
    activity_id: token.activity_id,
    workflow_id: token.workflow_id,
    run_id: token.run_id,
    result: result
  }

  client.respond_activity_task_completed_by_id(**completion)
end
configuration() click to toggle source
# File lib/cadence.rb, line 139
# Returns the memoized Configuration, building it with Configuration.new the
# first time it is requested.
#
# @return the object stored in @configuration
def configuration
  return @configuration if @configuration

  @configuration = Configuration.new
end
configure() { |configuration| ... } click to toggle source
# File lib/cadence.rb, line 135
# Yields the shared configuration object so the caller can adjust it.
#
# The block is invoked through +yield+, so no explicit block parameter is
# declared (the previous +&block+ argument was never referenced).
#
# @yield [configuration] the object returned by #configuration
# @return the value returned by the block
def configure
  yield configuration
end
fail_activity(async_token, error) click to toggle source
# File lib/cadence.rb, line 122
# Reports an activity as failed, addressing it through an async token.
#
# The token is decoded with Activity::AsyncToken.decode. The failure reason
# sent to the client is the error's class name and the details are the
# error's message.
#
# @param async_token token understood by Activity::AsyncToken.decode
# @param error object responding to #class and #message (e.g. an exception)
# @return whatever client.respond_activity_task_failed_by_id returns
def fail_activity(async_token, error)
  token = Activity::AsyncToken.decode(async_token)
  failure = {
    domain: token.domain,
    activity_id: token.activity_id,
    workflow_id: token.workflow_id,
    run_id: token.run_id,
    reason: error.class.name,
    details: error.message
  }

  client.respond_activity_task_failed_by_id(**failure)
end
fetch_workflow_execution_info(domain, workflow_id, run_id) click to toggle source
# File lib/cadence.rb, line 100
# Describes a workflow execution and wraps the result in an ExecutionInfo.
#
# Calls client.describe_workflow_execution and hands the response's
# +workflowExecutionInfo+ to Workflow::ExecutionInfo.generate_from.
#
# @param domain domain forwarded to the client
# @param workflow_id workflow id forwarded to the client
# @param run_id run id forwarded to the client
# @return the value produced by Workflow::ExecutionInfo.generate_from
def fetch_workflow_execution_info(domain, workflow_id, run_id)
  execution = { domain: domain, workflow_id: workflow_id, run_id: run_id }
  description = client.describe_workflow_execution(**execution)

  Workflow::ExecutionInfo.generate_from(description.workflowExecutionInfo)
end
get_workflow_history(domain:, workflow_id:, run_id:) click to toggle source
# File lib/cadence.rb, line 151
# Fetches the event history of a workflow execution.
#
# Calls client.get_workflow_execution_history and wraps the returned
# +history.events+ in a Workflow::History.
#
# NOTE(review): only the events carried by this single response are used; if
# the underlying API paginates, later pages are not fetched here - confirm.
#
# @param domain domain forwarded to the client
# @param workflow_id workflow id forwarded to the client
# @param run_id run id forwarded to the client
# @return [Workflow::History]
def get_workflow_history(domain:, workflow_id:, run_id:)
  execution = { domain: domain, workflow_id: workflow_id, run_id: run_id }
  events = client.get_workflow_execution_history(**execution).history.events

  Workflow::History.new(events)
end
logger() click to toggle source
# File lib/cadence.rb, line 143
# Convenience accessor for the logger held by the current configuration.
#
# @return the value of configuration.logger
def logger
  current = configuration
  current.logger
end
metrics() click to toggle source
# File lib/cadence.rb, line 147
# Returns the memoized Metrics instance, creating it on first use from the
# configuration's metrics adapter.
#
# @return the object stored in @metrics
def metrics
  return @metrics if @metrics

  @metrics = Metrics.new(configuration.metrics_adapter)
end
register_domain(name, description = nil) click to toggle source
# File lib/cadence.rb, line 59
# Registers a domain through the client.
#
# A CadenceThrift::DomainAlreadyExistsError raised by the client is swallowed
# and nil is returned instead; any other error propagates.
#
# @param name domain name forwarded to the client
# @param description optional description forwarded to the client
# @return the client's response, or nil when the domain already exists
def register_domain(name, description = nil)
  registration = { name: name, description: description }
  client.register_domain(**registration)
rescue CadenceThrift::DomainAlreadyExistsError
  nil
end
reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: 'manual reset') click to toggle source
# File lib/cadence.rb, line 75
# Resets a workflow execution to a decision task event.
#
# When +decision_task_id+ is not supplied, it is looked up with
# get_last_completed_decision_task for the same execution.
#
# @param domain domain forwarded to the client
# @param workflow_id workflow id forwarded to the client
# @param run_id run id forwarded to the client
# @param decision_task_id event id to reset to; looked up when omitted
# @param reason reason forwarded to the client
# @return the +runId+ of the client's response
# @raise [Error] when no decision task id is given and none can be found
def reset_workflow(domain, workflow_id, run_id, decision_task_id: nil, reason: 'manual reset')
  event_id = decision_task_id || get_last_completed_decision_task(domain, workflow_id, run_id)
  raise Error, 'Could not find a completed decision task event' unless event_id

  reset = {
    domain: domain,
    workflow_id: workflow_id,
    run_id: run_id,
    reason: reason,
    decision_task_event_id: event_id
  }

  client.reset_workflow_execution(**reset).runId
end
schedule_workflow(workflow, cron_schedule, *input, **args) click to toggle source
# File lib/cadence.rb, line 36
# Starts a workflow execution with a cron schedule attached.
#
# The +options:+ keyword (if any) is removed from the keyword arguments and
# used to build the ExecutionOptions; any remaining keyword arguments are
# appended to the positional input as a single hash. The workflow id comes
# from options[:workflow_id], falling back to a random UUID.
#
# @param workflow workflow passed to ExecutionOptions.new
# @param cron_schedule value forwarded to the client as +cron_schedule+
# @param input positional workflow input
# @param args keyword input, plus the special +options:+ key
# @return the +runId+ of the client's response
def schedule_workflow(workflow, cron_schedule, *input, **args)
  schedule_options = args.delete(:options) || {}
  workflow_input = args.empty? ? input : input + [args]

  resolved = ExecutionOptions.new(workflow, schedule_options)
  id = schedule_options[:workflow_id] || SecureRandom.uuid

  request = {
    domain: resolved.domain,
    workflow_id: id,
    workflow_name: resolved.name,
    task_list: resolved.task_list,
    input: workflow_input,
    execution_timeout: resolved.timeouts[:execution],
    task_timeout: resolved.timeouts[:task],
    workflow_id_reuse_policy: schedule_options[:workflow_id_reuse_policy],
    headers: resolved.headers,
    cron_schedule: cron_schedule
  }

  client.start_workflow_execution(**request).runId
end
signal_workflow(workflow, signal, workflow_id, run_id, input = nil) click to toggle source
# File lib/cadence.rb, line 65
# Sends a signal to a workflow execution.
#
# The domain is read from +workflow.domain+.
#
# @param workflow object responding to #domain
# @param signal signal forwarded to the client
# @param workflow_id workflow id forwarded to the client
# @param run_id run id forwarded to the client
# @param input optional signal payload (defaults to nil)
# @return whatever client.signal_workflow_execution returns
def signal_workflow(workflow, signal, workflow_id, run_id, input = nil)
  signal_request = {
    domain: workflow.domain, # TODO: allow passing domain instead
    workflow_id: workflow_id,
    run_id: run_id,
    signal: signal,
    input: input
  }

  client.signal_workflow_execution(**signal_request)
end
start_workflow(workflow, *input, **args) click to toggle source
# File lib/cadence.rb, line 14
# Starts a workflow execution.
#
# The +options:+ keyword (if any) is removed from the keyword arguments and
# used to build the ExecutionOptions; any remaining keyword arguments are
# appended to the positional input as a single hash. The workflow id comes
# from options[:workflow_id], falling back to a random UUID.
#
# @param workflow workflow passed to ExecutionOptions.new
# @param input positional workflow input
# @param args keyword input, plus the special +options:+ key
# @return the +runId+ of the client's response
def start_workflow(workflow, *input, **args)
  start_options = args.delete(:options) || {}
  workflow_input = args.empty? ? input : input + [args]

  resolved = ExecutionOptions.new(workflow, start_options)
  id = start_options[:workflow_id] || SecureRandom.uuid

  request = {
    domain: resolved.domain,
    workflow_id: id,
    workflow_name: resolved.name,
    task_list: resolved.task_list,
    input: workflow_input,
    execution_timeout: resolved.timeouts[:execution],
    task_timeout: resolved.timeouts[:task],
    workflow_id_reuse_policy: start_options[:workflow_id_reuse_policy],
    headers: resolved.headers
  }

  client.start_workflow_execution(**request).runId
end
terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination', details: nil) click to toggle source
# File lib/cadence.rb, line 90
# Terminates a workflow execution.
#
# @param domain domain forwarded to the client
# @param workflow_id workflow id forwarded to the client
# @param run_id run id forwarded to the client
# @param reason reason forwarded to the client
# @param details optional details forwarded to the client
# @return whatever client.terminate_workflow_execution returns
def terminate_workflow(domain, workflow_id, run_id, reason: 'manual termination', details: nil)
  termination = {
    domain: domain,
    workflow_id: workflow_id,
    run_id: run_id,
    reason: reason,
    details: details
  }

  client.terminate_workflow_execution(**termination)
end

Private Class Methods

client() click to toggle source
# File lib/cadence.rb, line 162
# Returns the memoized client, obtaining it from Cadence::Client.generate the
# first time it is needed.
#
# @return the object stored in @client
def client
  return @client if @client

  @client = Cadence::Client.generate
end
get_last_completed_decision_task(domain, workflow_id, run_id) click to toggle source
# File lib/cadence.rb, line 166
# Looks up the id of the last completed decision task of a workflow execution.
#
# The history is obtained through get_workflow_history and queried with
# +last_completed_decision_task+.
#
# @param domain domain of the execution
# @param workflow_id workflow id of the execution
# @param run_id run id of the execution
# @return the event's +id+, or nil when the history reports no such event
def get_last_completed_decision_task(domain, workflow_id, run_id)
  execution = { domain: domain, workflow_id: workflow_id, run_id: run_id }
  last_decision = get_workflow_history(**execution).last_completed_decision_task

  last_decision.id unless last_decision.nil?
end