class Cadence::Workflow::DecisionTaskProcessor

Constants

MAX_FAILED_ATTEMPTS

Attributes

client[R]
domain[R]
middleware_chain[R]
task[R]
task_token[R]
workflow_class[R]
workflow_name[R]

Public Class Methods

new(task, domain, workflow_lookup, client, middleware_chain) click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 11
# Builds a processor for a single decision task.
#
# The task token and the workflow name are read off the task; the workflow
# class is resolved by asking +workflow_lookup+ for that name.
#
# @param task [Object] decision task; must respond to +taskToken+ and +workflowType+
# @param domain [Object] stored as-is and exposed through the +domain+ reader
# @param workflow_lookup [#find] resolves a workflow name to a workflow class
# @param client [Object] stored as-is and exposed through the +client+ reader
# @param middleware_chain [Object] stored as-is and exposed through the +middleware_chain+ reader
def initialize(task, domain, workflow_lookup, client, middleware_chain)
  # Collaborators handed in by the caller.
  @task = task
  @domain = domain
  @client = client
  @middleware_chain = middleware_chain

  # Values derived from the task itself.
  @task_token = task.taskToken
  @workflow_name = task.workflowType.name
  @workflow_class = workflow_lookup.find(@workflow_name)
end

Public Instance Methods

process() click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 21
# Processes the decision task and reports the outcome through the client.
#
# Steps, in order: log and record the queue time, bail out via +fail_task+
# when no workflow class was resolved, load the full history, run the
# executor inside the middleware chain and hand the resulting decisions to
# +complete_task+.
#
# Any StandardError raised along the way is reported via +fail_task+ and its
# backtrace is logged at debug level. The latency metric and the closing
# debug line are emitted whatever the outcome.
#
# The return value carries no meaning for callers.
def process
  # Monotonic clock: a wall-clock adjustment must not skew (or negate) the latency.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  Cadence.logger.info("Processing a decision task for #{workflow_name}")
  Cadence.metrics.timing('decision_task.queue_time', queue_time_ms, workflow: workflow_name)

  unless workflow_class
    fail_task('Workflow does not exist')
    return
  end

  history = fetch_full_history
  # TODO: For sticky workflows we need to cache the Executor instance
  executor = Workflow::Executor.new(workflow_class, history)
  metadata = Metadata.generate(Metadata::DECISION_TYPE, task, domain)

  decisions = middleware_chain.invoke(metadata) do
    executor.run
  end

  complete_task(decisions)
rescue StandardError => error
  # Log the backtrace before responding: fail_task calls the client and may
  # itself raise, which would otherwise lose the original failure's backtrace.
  Cadence.logger.debug(Array(error.backtrace).join("\n"))
  fail_task(error.inspect)
ensure
  elapsed_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  time_diff_ms = (elapsed_seconds * 1000).round
  Cadence.metrics.timing('decision_task.latency', time_diff_ms, workflow: workflow_name)
  Cadence.logger.debug("Decision task processed in #{time_diff_ms}ms")
end

Private Instance Methods

complete_task(decisions) click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 82
# Logs the successful completion and responds to the decision task with the
# serialized decisions.
#
# @param decisions [Enumerable] decisions, passed through +serialize_decisions+
# @return [Object] whatever +client.respond_decision_task_completed+ returns
def complete_task(decisions)
  Cadence.logger.info("Decision task for #{workflow_name} completed")

  serialized = serialize_decisions(decisions)
  client.respond_decision_task_completed(task_token: task_token, decisions: serialized)
end
fail_task(message) click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 91
# Logs the failure and, unless the task has already reached
# MAX_FAILED_ATTEMPTS, responds to the decision task as failed.
#
# @param message [String] failure description; logged and sent as +details+
# @return [Object, nil] the client's response, or nil when no response is sent
def fail_task(message)
  Cadence.logger.error("Decision task for #{workflow_name} failed with: #{message}")

  # Past the attempt limit nothing is sent, so a persistent error cannot
  # keep the task failing in an endless loop.
  attempts_exhausted = task.attempt >= MAX_FAILED_ATTEMPTS
  return if attempts_exhausted

  failure = {
    task_token: task_token,
    cause: CadenceThrift::DecisionTaskFailedCause::UNHANDLED_DECISION,
    details: message
  }
  client.respond_decision_task_failed(**failure)
end
fetch_full_history() click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 63
# Collects the complete event history for the task's workflow execution.
#
# Starts with the events carried by the task and keeps requesting further
# pages from the client while a page token is present, appending each page's
# events in order.
#
# @return [Workflow::History] history built from all collected events
def fetch_full_history
  # Work on a copy so appending pages never mutates the task's own event list.
  events = task.history.events.to_a.dup
  next_page_token = task.nextPageToken

  # An empty token is truthy in Ruby; treat it like a missing one so an empty
  # token can never trigger another request.
  while next_page_token && !next_page_token.to_s.empty?
    response = client.get_workflow_execution_history(
      domain: domain,
      workflow_id: task.workflowExecution.workflowId,
      run_id: task.workflowExecution.runId,
      next_page_token: next_page_token
    )

    # concat appends in place instead of re-copying everything gathered so far.
    events.concat(response.history.events.to_a)
    next_page_token = response.nextPageToken
  end

  Workflow::History.new(events)
end
queue_time_ms() click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 55
# Time between the task being scheduled and being started, rounded to the
# nearest whole number.
#
# The difference of the two timestamps is divided by 1_000_000 — presumably
# converting nanoseconds to milliseconds; confirm the timestamp unit against
# the task definition.
#
# @return [Integer] rounded queue time
def queue_time_ms
  elapsed = task.startedTimestamp - task.scheduledTimestamp

  # Divide by a Float: with integer timestamps, integer division would
  # truncate and make the subsequent round a no-op.
  (elapsed / 1_000_000.0).round
end
serialize_decisions(decisions) click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 59
# Serializes each decision with Workflow::Serializer.
#
# Every entry is treated as a pair whose second element is the decision;
# the first element is ignored.
#
# @param decisions [Enumerable] pairs whose second element is a decision
# @return [Array] serialized decisions, in the original order
def serialize_decisions(decisions)
  decisions.map do |_decision_id, decision|
    Workflow::Serializer.serialize(decision)
  end
end