class Cadence::Workflow::DecisionTaskProcessor
Constants
- MAX_FAILED_ATTEMPTS
Attributes
client[R]
domain[R]
middleware_chain[R]
task[R]
task_token[R]
workflow_class[R]
workflow_name[R]
Public Class Methods
new(task, domain, workflow_lookup, client, middleware_chain)
click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 11
# Builds a processor for a single decision task.
#
# task             - the decision task; its taskToken and workflowType.name are read here
# domain           - stored as-is and exposed through the reader
# workflow_lookup  - object responding to #find(name); used to resolve the workflow class
# client           - stored as-is; used by the instance methods to talk to the server
# middleware_chain - stored as-is; wraps workflow execution in #process
def initialize(task, domain, workflow_lookup, client, middleware_chain)
  # Collaborators that are stored untouched.
  @task = task
  @domain = domain
  @client = client
  @middleware_chain = middleware_chain

  # Values derived from the task itself.
  @task_token = task.taskToken
  @workflow_name = task.workflowType.name
  @workflow_class = workflow_lookup.find(@workflow_name)
end
Public Instance Methods
process()
click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 21
# Processes the decision task end to end: emits the queue-time metric, replays
# the full history through a Workflow::Executor (wrapped in the middleware
# chain) and responds with the resulting decisions.
#
# If no workflow class was resolved for the task, the task is failed with
# 'Workflow does not exist' and nothing is executed. Any StandardError raised
# along the way is reported through fail_task and its backtrace is logged at
# debug level. The latency metric and the closing debug log are always emitted.
def process
  # Monotonic clock: a latency measurement must not be skewed by wall-clock
  # adjustments happening while the task is being processed.
  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  Cadence.logger.info("Processing a decision task for #{workflow_name}")
  Cadence.metrics.timing('decision_task.queue_time', queue_time_ms, workflow: workflow_name)

  unless workflow_class
    fail_task('Workflow does not exist')
    return
  end

  history = fetch_full_history
  # TODO: For sticky workflows we need to cache the Executor instance
  executor = Workflow::Executor.new(workflow_class, history)
  metadata = Metadata.generate(Metadata::DECISION_TYPE, task, domain)

  decisions = middleware_chain.invoke(metadata) do
    executor.run
  end

  complete_task(decisions)
rescue StandardError => error
  fail_task(error.inspect)
  # Exception#backtrace can return nil; Array() keeps the handler itself from
  # raising NoMethodError and masking the original failure.
  Cadence.logger.debug(Array(error.backtrace).join("\n"))
ensure
  elapsed_seconds = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
  time_diff_ms = (elapsed_seconds * 1000).round
  Cadence.metrics.timing('decision_task.latency', time_diff_ms, workflow: workflow_name)
  Cadence.logger.debug("Decision task processed in #{time_diff_ms}ms")
end
Private Instance Methods
complete_task(decisions)
click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 82
# Logs that the decision task completed and responds through the client with
# the serialized decisions for this task's token.
#
# decisions - collection handed to serialize_decisions
def complete_task(decisions)
  Cadence.logger.info("Decision task for #{workflow_name} completed")

  serialized = serialize_decisions(decisions)
  client.respond_decision_task_completed(task_token: task_token, decisions: serialized)
end
fail_task(message)
click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 91
# Logs the failure and, while the task's attempt count is still below
# MAX_FAILED_ATTEMPTS, reports the failure through the client.
#
# message - failure description; logged and sent as the failure details
def fail_task(message)
  Cadence.logger.error("Decision task for #{workflow_name} failed with: #{message}")

  # Once the attempt count reaches the cap nothing is sent back, which stops
  # the task from getting into an infinite loop if the error persists.
  unless task.attempt >= MAX_FAILED_ATTEMPTS
    client.respond_decision_task_failed(
      task_token: task_token,
      cause: CadenceThrift::DecisionTaskFailedCause::UNHANDLED_DECISION,
      details: message
    )
  end
end
fetch_full_history()
click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 63
# Collects every history event for the task's workflow execution: starts with
# the events delivered on the task and keeps requesting further pages from the
# client for as long as a next page token is returned.
#
# Returns a Workflow::History built from the accumulated events.
def fetch_full_history
  collected = task.history.events.to_a
  page_token = task.nextPageToken

  while page_token
    page = client.get_workflow_execution_history(
      domain: domain,
      workflow_id: task.workflowExecution.workflowId,
      run_id: task.workflowExecution.runId,
      next_page_token: page_token
    )

    page_events = page.history.events.to_a
    # Non-mutating concatenation, so the array taken from the task is left untouched.
    collected = collected + page_events
    page_token = page.nextPageToken
  end

  Workflow::History.new(collected)
end
queue_time_ms()
click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 55
# Difference between the task's started and scheduled timestamps, divided by
# 1_000_000 and rounded to the nearest whole number.
# NOTE(review): the divisor suggests nanosecond timestamps converted to
# milliseconds — confirm against the task definition.
#
# Returns an Integer.
def queue_time_ms
  queued_for = task.startedTimestamp - task.scheduledTimestamp

  # fdiv forces floating-point division. With Integer timestamps a plain `/`
  # floors the quotient first, which turns the trailing #round into a no-op.
  queued_for.fdiv(1_000_000).round
end
serialize_decisions(decisions)
click to toggle source
# File lib/cadence/workflow/decision_task_processor.rb, line 59
# Runs the second member of every entry in +decisions+ through
# Workflow::Serializer.serialize; the first member of each entry is ignored.
#
# Returns an Array of the serialized results, in iteration order.
def serialize_decisions(decisions)
  decisions.map do |_ignored, decision|
    Workflow::Serializer.serialize(decision)
  end
end