class Cadence::Workflow::StateManager
Constants
- RELEASE_MARKER
- SIDE_EFFECT_MARKER
Attributes
decision_tracker[R]
decisions[R]
dispatcher[R]
local_time[R]
marker_ids[R]
releases[R]
side_effects[R]
Public Class Methods
new(dispatcher)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 19
# Builds an empty state manager bound to the given dispatcher.
#
# @param dispatcher [Object] stored as-is and exposed through the
#   +dispatcher+ reader
def initialize(dispatcher)
  @dispatcher = dispatcher

  # Replay bookkeeping; all three are overwritten by every #apply call
  @replay = false
  @local_time = nil
  @last_event_id = 0

  # Collections filled in while decisions are scheduled and events applied
  @decisions = []
  @side_effects = []
  @releases = {}
  @marker_ids = Set.new

  # One state machine per decision ID, created lazily on first lookup
  @decision_tracker = Hash.new do |tracker, decision_id|
    tracker[decision_id] = DecisionStateMachine.new
  end
end
Public Instance Methods
apply(history_window)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 71
# Feeds one window of history through the state manager.
#
# Copies the window's replay flag, local time and last event ID into the
# manager, then passes every marker and every event to #apply_event.
#
# @param history_window [#replay?, #local_time, #last_event_id, #markers, #events]
# @return [Object] whatever +history_window.events.each+ returns
def apply(history_window)
  @replay, @local_time, @last_event_id =
    history_window.replay?, history_window.local_time, history_window.last_event_id

  # Markers go first: the data they carry is needed while processing events
  history_window.markers.each { |marker| apply_event(marker) }
  history_window.events.each { |event| apply_event(event) }
end
next_side_effect()
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 67
# Removes and returns the oldest recorded side effect.
#
# @return [Array, nil] the +[id, details]+ pair stored by #handle_marker,
#   or +nil+ when nothing is queued
def next_side_effect
  queue = side_effects
  queue.shift
end
release?(release_name)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 61
# Reports whether the named release is enabled.
#
# A name that has no entry in +releases+ yet is first passed to
# #track_release, which records a value for it.
#
# @param release_name [Object] key used in the +releases+ hash
# @return [Object] the value stored in +releases+ for +release_name+
def release?(release_name)
  already_tracked = releases.key?(release_name)
  track_release(release_name) unless already_tracked

  releases[release_name]
end
replay?()
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 31
# Exposes the replay flag: +false+ after construction, then whatever the
# most recently applied history window reported.
#
# @return [Object] the current value of +@replay+
def replay?
  @replay
end
schedule(decision)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 35
# Registers a decision under the next free event ID.
#
# @param decision [Object] a decision object; activity, child-workflow and
#   timer decisions get their own ID defaulted to the allocated event ID
# @return [Array] two elements: the event target built by
#   #event_target_from, and the cancelation ID (+nil+ for decision kinds
#   other than the three listed below)
def schedule(decision)
  # Fast-forward event IDs to skip all the markers (version markers can
  # be removed, so we can't rely on them being scheduled during a replay)
  decision_id = next_event_id
  decision_id = next_event_id while marker_ids.include?(decision_id)

  # Keep an ID the caller already set; otherwise reuse the event ID
  cancelation_id =
    case decision
    when Decision::ScheduleActivity then decision.activity_id ||= decision_id
    when Decision::StartChildWorkflow then decision.workflow_id ||= decision_id
    when Decision::StartTimer then decision.timer_id ||= decision_id
    end

  # Only a state machine still in its initial state is moved to "requested"
  tracker = decision_tracker[decision_id]
  tracker.requested if tracker.state == DecisionStateMachine::NEW_STATE

  decisions.push([decision_id, decision])

  [event_target_from(decision_id, decision), cancelation_id]
end
Private Instance Methods
apply_event(event)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 94
# Applies a single history event: advances the state machine tracked for
# the event's decision ID and, depending on the event type, dispatches a
# notification, discards the matching pending decision, or records a marker.
#
# Event types whose branch is marked "todo" are accepted and ignored.
#
# @param event [#decision_id, #type, #attributes, #id] history event
# @raise [UnsupportedEvent] when the event type has no branch below
# @raise [NonDeterministicWorkflowError] via #discard_decision when the
#   event does not match the next pending decision
def apply_event(event)
  state_machine = decision_tracker[event.decision_id]
  target = History::EventTarget.from_event(event)

  case event.type
  when 'WorkflowExecutionStarted'
    state_machine.start
    dispatch(
      History::EventTarget.workflow,
      'started',
      safe_parse(event.attributes.input),
      Metadata.generate(Metadata::WORKFLOW_TYPE, event.attributes)
    )

  when 'WorkflowExecutionCompleted'
    # todo

  when 'WorkflowExecutionFailed'
    # todo

  when 'WorkflowExecutionTimedOut'
    # todo

  when 'DecisionTaskScheduled'
    # todo

  when 'DecisionTaskStarted'
    # todo

  when 'DecisionTaskCompleted'
    # todo

  when 'DecisionTaskTimedOut'
    # todo

  when 'DecisionTaskFailed'
    # todo

  when 'ActivityTaskScheduled'
    state_machine.schedule
    discard_decision(target)

  when 'ActivityTaskStarted'
    state_machine.start

  when 'ActivityTaskCompleted'
    state_machine.complete
    dispatch(target, 'completed', safe_parse(event.attributes.result))

  when 'ActivityTaskFailed'
    state_machine.fail
    dispatch(target, 'failed', event.attributes.reason, safe_parse(event.attributes.details))

  when 'ActivityTaskTimedOut'
    state_machine.time_out
    type = CadenceThrift::TimeoutType::VALUE_MAP[event.attributes.timeoutType]
    dispatch(target, 'failed', 'Cadence::TimeoutError', "Timeout type: #{type}")

  when 'ActivityTaskCancelRequested'
    state_machine.requested
    discard_decision(target)

  when 'RequestCancelActivityTaskFailed'
    state_machine.fail
    dispatch(target, 'failed', event.attributes.cause, nil)

  when 'ActivityTaskCanceled'
    state_machine.cancel
    dispatch(target, 'failed', 'CANCELLED', safe_parse(event.attributes.details))

  when 'TimerStarted'
    state_machine.start
    discard_decision(target)

  when 'TimerFired'
    state_machine.complete
    dispatch(target, 'fired')

  when 'CancelTimerFailed'
    # Uses #fail, the same transition every other failure branch triggers
    state_machine.fail
    dispatch(target, 'failed', event.attributes.cause, nil)

  when 'TimerCanceled'
    state_machine.cancel
    dispatch(target, 'canceled')

  when 'WorkflowExecutionCancelRequested'
    # todo

  when 'WorkflowExecutionCanceled'
    # todo

  when 'RequestCancelExternalWorkflowExecutionInitiated'
    # todo

  when 'RequestCancelExternalWorkflowExecutionFailed'
    # todo

  when 'ExternalWorkflowExecutionCancelRequested'
    # todo

  when 'MarkerRecorded'
    state_machine.complete
    handle_marker(event.id, event.attributes.markerName, safe_parse(event.attributes.details))

  when 'WorkflowExecutionSignaled'
    dispatch(target, 'signaled', event.attributes.signalName, safe_parse(event.attributes.input))

  when 'WorkflowExecutionTerminated'
    # todo

  when 'WorkflowExecutionContinuedAsNew'
    # todo

  when 'StartChildWorkflowExecutionInitiated'
    state_machine.schedule
    discard_decision(target)

  when 'StartChildWorkflowExecutionFailed'
    state_machine.fail
    dispatch(target, 'failed', 'StandardError', safe_parse(event.attributes.cause))

  when 'ChildWorkflowExecutionStarted'
    state_machine.start

  when 'ChildWorkflowExecutionCompleted'
    state_machine.complete
    dispatch(target, 'completed', safe_parse(event.attributes.result))

  when 'ChildWorkflowExecutionFailed'
    state_machine.fail
    dispatch(target, 'failed', event.attributes.reason, safe_parse(event.attributes.details))

  when 'ChildWorkflowExecutionCanceled'
    state_machine.cancel
    dispatch(target, 'failed', 'CANCELLED', safe_parse(event.attributes.details))

  when 'ChildWorkflowExecutionTimedOut'
    state_machine.time_out
    type = CadenceThrift::TimeoutType::VALUE_MAP[event.attributes.timeoutType]
    dispatch(target, 'failed', 'Cadence::TimeoutError', "Timeout type: #{type}")

  when 'ChildWorkflowExecutionTerminated'
    # todo

  when 'SignalExternalWorkflowExecutionInitiated'
    # todo

  when 'SignalExternalWorkflowExecutionFailed'
    # todo

  when 'ExternalWorkflowExecutionSignaled'
    # todo

  when 'UpsertWorkflowSearchAttributes'
    # todo

  else
    raise UnsupportedEvent, event.type
  end
end
discard_decision(target)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 282
# Removes the oldest pending decision and verifies it matches +target+.
#
# @param target [Object] event target the next pending decision must equal
# @return [nil] when the pending decision matches
# @raise [NonDeterministicWorkflowError] when nothing is pending, or when
#   the pending decision resolves to a different target
def discard_decision(target)
  # The head of the queue is expected to be the decision behind this event
  pending_id, pending_decision = decisions.shift

  unless pending_id
    raise NonDeterministicWorkflowError, "A decision #{target} was not scheduled upon replay"
  end

  pending_target = event_target_from(pending_id, pending_decision)
  mismatch = target != pending_target
  raise NonDeterministicWorkflowError, "Unexpected decision #{pending_target} (expected #{target})" if mismatch
end
dispatch(target, name, *attributes)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 278
# Forwards a notification to the dispatcher.
#
# @param target [Object] event target the notification is about
# @param name [String] notification name
# @param attributes [Array] extra values; they reach the dispatcher as a
#   single array argument (an empty array when none are given)
# @return [Object] whatever the dispatcher returns
def dispatch(target, name, *attributes)
  dispatcher.dispatch(
    target,
    name,
    attributes
  )
end
event_target_from(decision_id, decision)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 256
# Builds the event target that corresponds to a scheduled decision.
#
# @param decision_id [Object] ID the decision was scheduled under
# @param decision [Object] decision object; its class selects the target type
# @return [History::EventTarget] target for +decision_id+; the type is
#   +nil+ when the decision class is not one of those listed below
def event_target_from(decision_id, decision)
  kind =
    case decision
    when Decision::ScheduleActivity then History::EventTarget::ACTIVITY_TYPE
    when Decision::RequestActivityCancellation then History::EventTarget::CANCEL_ACTIVITY_REQUEST_TYPE
    when Decision::RecordMarker then History::EventTarget::MARKER_TYPE
    when Decision::StartTimer then History::EventTarget::TIMER_TYPE
    when Decision::CancelTimer then History::EventTarget::CANCEL_TIMER_REQUEST_TYPE
    when Decision::CompleteWorkflow, Decision::FailWorkflow then History::EventTarget::WORKFLOW_TYPE
    when Decision::StartChildWorkflow then History::EventTarget::CHILD_WORKFLOW_TYPE
    end

  History::EventTarget.new(decision_id, kind)
end
handle_marker(id, type, details)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 296
# Records a marker event and stores its payload by marker type.
#
# The marker's event ID is always remembered in +marker_ids+, even when
# the type turns out to be unsupported.
#
# @param id [Object] event ID of the marker
# @param type [Object] marker name, compared against SIDE_EFFECT_MARKER
#   and RELEASE_MARKER
# @param details [Object] marker payload: queued with +id+ for side
#   effects, used as the release name for release markers
# @raise [UnsupportedMarkerType] when +type+ is neither known marker;
#   the message is the offending marker type
def handle_marker(id, type, details)
  marker_ids << id

  case type
  when SIDE_EFFECT_MARKER
    side_effects << [id, details]
  when RELEASE_MARKER
    releases[details] = true
  else
    # No event object is in scope here, so report the marker type itself
    raise UnsupportedMarkerType, type
  end
end
next_event_id()
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 90
# Advances the event ID counter by one.
#
# @return [Integer] the new value of +@last_event_id+
def next_event_id
  @last_event_id = @last_event_id + 1
end
safe_parse(binary)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 319
# Decodes a serialized payload by delegating to +JSON.deserialize+.
#
# NOTE(review): despite the name, nothing is rescued here; any error
# handling would have to live inside +JSON.deserialize+ -- confirm.
#
# @param binary [Object] serialized payload taken from event attributes
# @return [Object] whatever +JSON.deserialize+ returns
def safe_parse(binary)
  payload = binary
  JSON.deserialize(payload)
end
track_release(release_name)
click to toggle source
# File lib/cadence/workflow/state_manager.rb, line 309
# Records a value for a release that has no entry in +releases+ yet.
#
# @param release_name [Object] key used in the +releases+ hash
# @return [Object] +false+ during replay, otherwise the result of
#   scheduling the release marker decision
def track_release(release_name)
  # replay does not allow untracked (via marker) releases
  return releases[release_name] = false if replay?

  releases[release_name] = true
  marker = Decision::RecordMarker.new(name: RELEASE_MARKER, details: release_name)
  schedule(marker)
end