class AWS::Flow::AsyncDecider
Represents an asynchronous decider class.
Attributes
Public Class Methods
Creates a new asynchronous decider.
# File lib/aws/decider/async_decider.rb, line 193 def initialize(workflow_definition_factory, history_helper, decision_helper) @workflow_definition_factory = workflow_definition_factory @history_helper = history_helper @decision_helper = decision_helper @decision_task = history_helper.get_decision_task @workflow_clock = WorkflowClock.new(@decision_helper) @workflow_context = WorkflowContext.new(@decision_task, @workflow_clock) @activity_client = GenericActivityClient.new(@decision_helper, nil) @workflow_client = GenericWorkflowClient.new(@decision_helper, @workflow_context) @decision_context = DecisionContext.new(@activity_client, @workflow_client, @workflow_clock, @workflow_context, @decision_helper) end
Public Instance Methods
Registers a ‘CompleteWorkflowExecution` decision.
CompleteWorkflowExecutionDecisionAttributes
# File lib/aws/decider/async_decider.rb, line 326 def complete_workflow return unless @completed && ! @unhandled_decision decision_id = [:SELF, nil] if @failure @decision_helper[decision_id] = make_fail_decision(decision_id, @failure) elsif @cancel_requested @decision_helper[decision_id] = make_cancel_decision(decision_id) else if ! @workflow_context.continue_as_new_options.nil? @decision_helper[decision_id] = continue_as_new_workflow(decision_id, @workflow_context.continue_as_new_options) else if @result.nil? @decision_helper[decision_id] = make_completion_decision(decision_id, { :decision_type => "CompleteWorkflowExecution"}) else @decision_helper[decision_id] = make_completion_decision(decision_id, { :decision_type => "CompleteWorkflowExecution", :complete_workflow_execution_decision_attributes => {:result => @result.get }}) end end end end
Indicates whether the task completed.
@return [true, false]
Returns `true` if the task is completed; `false` otherwise.
# File lib/aws/decider/async_decider.rb, line 355 def completed? @completed end
Continues this as a new workflow, using the provided decision and options.
@param [DecisionID] decision_id
The decision ID to use.
@param [WorkflowOptions] continue_as_new_options
The options to use for the new workflow.
# File lib/aws/decider/async_decider.rb, line 310 def continue_as_new_workflow(decision_id, continue_as_new_options) result = { :decision_type => "ContinueAsNewWorkflowExecution", } task_list = continue_as_new_options.task_list ? {:task_list => {:name => continue_as_new_options.task_list}} : {} to_add = continue_as_new_options.get_options([:execution_start_to_close_timeout, :task_start_to_close_timeout, :task_priority, :child_policy, :tag_list, :workflow_type_version, :input], task_list) result[:continue_as_new_workflow_execution_decision_attributes] = to_add CompleteWorkflowStateMachine.new(decision_id, result) end
@api private
# File lib/aws/decider/async_decider.rb, line 224 def decide begin decide_impl rescue Exception => error raise error ensure begin @decision_helper.workflow_context_data = @definition.get_workflow_state rescue WorkflowException => error @decision_helper.workflow_context_data = error.details rescue Exception => error @decision_helper.workflow_context_data = error.message # Catch and do stuff ensure @workflow_definition_factory.delete_workflow_definition(@definition) end end end
@api private
# File lib/aws/decider/async_decider.rb, line 244 def decide_impl single_decision_event = @history_helper.get_single_decision_events while single_decision_event.length > 0 @decision_helper.handle_decision_task_started_event [*single_decision_event].each do |event| last_non_replay_event_id = @history_helper.get_last_non_replay_event_id @workflow_clock.replaying = false if event.event_id >= last_non_replay_event_id @workflow_clock.replay_current_time_millis = @history_helper.get_replay_current_time_millis process_event(event) event_loop(event) end @task_token = @history_helper.get_decision_task.task_token complete_workflow if completed? single_decision_event = @history_helper.get_single_decision_events end if @unhandled_decision @unhandled_decision = false complete_workflow end end
@api private
# File lib/aws/decider/async_decider.rb, line 649 def event_loop(event) return if @completed begin @completed = @workflow_async_scope.eventLoop #TODO Make this a cancellationException, set it up correctly? rescue Exception => e @failure = e unless @cancel_requested @completed = true end end
@note *Beware, this getter will modify things*, as it creates decisions for the objects in the {AsyncDecider}
that need decisions sent out.
@api private
# File lib/aws/decider/async_decider.rb, line 210 def get_decisions result = @decision_helper.decision_map.values.map {|decision_object| decision_object.get_decision}.compact if result.length > DecisionHelper.maximum_decisions_per_completion result = result.slice(0, DecisionHelper.maximum_decisions_per_completion - 1) result << ({:decision_type => "StartTimer", :start_timer_decision_attributes => { :timer_id => DecisionHelper.force_immediate_decision_timer, :start_to_fire_timeout => "0" }}) end return result end
Handler for the ‘:ActivityTaskCancelRequested` event. @param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 435 def handle_activity_task_cancel_requested(event) activity_id = event.attributes[:activity_id] @decision_helper[activity_id].consume(:handle_cancellation_initiated_event) end
Handler for the ‘:ActivityTaskScheduled` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 364 def handle_activity_task_scheduled(event) activity_id = event.attributes[:activity_id] @decision_helper.activity_scheduling_event_id_to_activity_id[event.id] = activity_id @decision_helper[activity_id].consume(:handle_initiated_event) return @decision_helper[activity_id].done? end
Handler for the ‘:CancelTimerFailed` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 573 def handle_cancel_timer_failed(event) handle_event(event, { :id_methods => [:timer_id], :consume_symbol => :handle_cancellation_failure_event }) end
Handler for the ‘:CancelWorkflowExecutionFailed` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 476 def handle_cancel_workflow_execution_failed(event) handle_closing_failure end
# File lib/aws/decider/async_decider.rb, line 450 def handle_closing_failure @unhandled_decision = true @decision_helper[[:SELF, nil]].consume(:handle_initiation_failed_event) end
Handler for the ‘:CompleteWorkflowExecutionFailed` event. @param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 458 def handle_complete_workflow_execution_failed(event) handle_closing_failure end
Handler for the ‘:ContinueAsNewWorkflowExecutionFailed` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 485 def handle_continue_as_new_workflow_execution_failed(event) handle_closing_failure end
Handler for the ‘:FailWorkflowExecutionFailed` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 467 def handle_fail_workflow_execution_failed(event) handle_closing_failure end
Handler for the ‘:RequestCancelActivityTaskFailed` event. @param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 443 def handle_request_cancel_activity_task_failed(event) handle_event(event, { :id_methods => [:activity_id], :consume_symbol => :handle_cancellation_failure_event }) end
Handler for the ‘:RequestCancelExternalWorkflowExecutionFailed` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 549 def handle_request_cancel_external_workflow_execution_failed(event) handle_event(event, { :id_methods => [:workflow_id], :consume_symbol => :handle_cancellation_failure_event }) end
Handler for the ‘:RequestCancelExternalWorkflowExecutionInitiated` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 537 def handle_request_cancel_external_workflow_execution_initiated(event) handle_event(event, { :id_methods => [:workflow_id], :consume_symbol => :handle_cancellation_initiated_event }) end
Handler for the ‘:SignalExternalWorkflowExecutionInitiated` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 525 def handle_signal_external_workflow_execution_initiated(event) signal_id = event.attributes[:control] @decision_helper.signal_initiated_event_to_signal_id[event.id] = signal_id @decision_helper[signal_id].consume(:handle_initiated_event) @decision_helper[signal_id].done? end
Handler for the ‘:StartChildWorkflowExecutionInitiated` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 561 def handle_start_child_workflow_execution_initiated(event) workflow_id = event.attributes[:workflow_id] @decision_helper.child_initiated_event_id_to_workflow_id[event.id] = workflow_id @decision_helper[workflow_id].consume(:handle_initiated_event) @decision_helper[workflow_id].done? end
Handler for the ‘:StartTimerFailed` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 407 def handle_start_timer_failed(event) timer_id = event.attributes.timer_id return if timer_id == DecisionHelper.force_immediate_decision_timer handle_event(event, { :id_methods => [:timer_id], :consume_symbol => :handle_completion_event, :decision_helper_scheduled => :scheduled_timers, :handle_open_request => lambda do |event, open_request| exception = StartTimerFailedException(event.id, timer_id, nil, event.attributes.cause) open_request.completion_handle.fail(exception) end }) state_machine = @decision_helper[timer_id] end
Handler for the ‘:TimerCanceled` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 506 def handle_timer_canceled(event) handle_event(event, { :id_methods => [:timer_id], :consume_symbol => :handle_cancellation_event, :decision_helper_scheduled => :scheduled_timers, :handle_open_request => lambda do |event, open_request| if ! open_request.nil? cancellation_exception = CancellationException.new("Cancelled from a Timer Cancelled event") open_request.completion_handle.fail(cancellation_exception) end end }) end
Handler for the ‘:TimerFired` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 390 def handle_timer_fired(event) timer_id = event.attributes[:timer_id] return if timer_id == DecisionHelper.force_immediate_decision_timer @decision_helper[timer_id].consume(:handle_completion_event) if @decision_helper[timer_id].done? open_request = @decision_helper.scheduled_timers.delete(timer_id) return if open_request.nil? open_request.blocking_promise.set(nil) open_request.completion_handle.complete end end
Handler for the ‘:TimerStarted` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 494 def handle_timer_started(event) timer_id = event.attributes[:timer_id] return if timer_id == DecisionHelper.force_immediate_decision_timer @decision_helper[timer_id].consume(:handle_initiated_event) @decision_helper[timer_id].done? end
Handler for the ‘:WorkflowExecutionCancelRequested` event. @param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 427 def handle_workflow_execution_cancel_requested(event) @workflow_async_scope.cancel(CancellationException.new("Cancelled from a WorkflowExecutionCancelRequested")) @cancel_requested = true end
Handler for the ‘WorkflowExecutionSignaled` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 585 def handle_workflow_execution_signaled(event) signal_name = event.attributes[:signal_name] input = event.attributes[:input] if event.attributes.keys.include? :input input ||= NoInput.new # TODO do stuff if we are @completed t = Task.new(nil) do @definition.signal_received(signal_name, input) end task_context = TaskContext.new(:parent => @workflow_async_scope.get_closest_containing_scope, :task => t) @workflow_async_scope.get_closest_containing_scope << t end
Handler for the ‘:WorkflowExecutionStarted` event.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 376 def handle_workflow_execution_started(event) @workflow_async_scope = AsyncScope.new do FlowFiber.current[:decision_context] = @decision_context input = (event.attributes.keys.include? :input) ? event.attributes[:input] : nil @definition = @workflow_definition_factory.get_workflow_definition(@decision_context) @result = @definition.execute(input) end end
# File lib/aws/decider/async_decider.rb, line 299 def make_cancel_decision(decision_id) CompleteWorkflowStateMachine.new(decision_id, {:decision_type => "CancelWorkflowExecution"}) end
# File lib/aws/decider/async_decider.rb, line 296 def make_completion_decision(decision_id, decision) CompleteWorkflowStateMachine.new(decision_id, decision) end
Registers a {FailWorkflowExecution} decision.
@param [DecisionID] decision_id
The ID of the {DecisionTaskCompleted} event corresponding to the decision task that resulted in the decision failing in this execution. This information can be useful for tracing the sequence of events back from the failure.
@param [Exception] failure
The exception that is associated with the failed workflow.
@see docs.aws.amazon.com/amazonswf/latest/apireference/API_FailWorkflowExecutionDecisionAttributes.html
FailWorkflowExecutionDecisionAttributes
# File lib/aws/decider/async_decider.rb, line 278 def make_fail_decision(decision_id, failure) decision_type = "FailWorkflowExecution" # Get the reason from the failure. Or get the message if a # CancellationException is initialized without a reason. Fall back to # a default string if nothing is provided reason = failure.reason || failure.message || "Workflow failure did not provide any reason." # Get the details from the failure. Or get the backtrace if a # CancellationException is initialized without a details. Fall back to # a default string if nothing is provided details = failure.details || failure.backtrace.to_s || "Workflow failure did not provide any details." fail_workflow_execution_decision_attributes = { reason: reason, details: details } decision = {:decision_type => decision_type, :fail_workflow_execution_decision_attributes => fail_workflow_execution_decision_attributes} CompleteWorkflowStateMachine.new(decision_id, decision) end
Processes decider events.
@param [Object] event
The event to process.
# File lib/aws/decider/async_decider.rb, line 602 def process_event(event) event_type_symbol = event.event_type.to_sym # Mangle the name so that it is handle_ + the name of the event type in snakecase handle_event = "handle_" + event.event_type.gsub(/(.)([A-Z])/,'\1_\2').downcase noop_set = Set.new([:DecisionTaskScheduled, :DecisionTaskCompleted, :DecisionTaskStarted, :DecisionTaskTimedOut, :WorkflowExecutionTimedOut, :WorkflowExecutionTerminated, :MarkerRecorded, :WorkflowExecutionCompleted, :WorkflowExecutionFailed, :WorkflowExecutionCanceled, :WorkflowExecutionContinuedAsNew, :ActivityTaskStarted]) return if noop_set.member? event_type_symbol self_set = Set.new([:TimerFired, :StartTimerFailed, :WorkflowExecutionCancel, :ActivityTaskScheduled, :WorkflowExecutionCancelRequested, :ActivityTaskCancelRequested, :RequestCancelActivityTaskFailed, :CompleteWorkflowExecutionFailed, :FailWorkflowExecutionFailed, :CancelWorkflowExecutionFailed, :ContinueAsNewWorkflowExecutionFailed, :TimerStarted, :TimerCanceled, :SignalExternalWorkflowExecutionInitiated, :RequestCancelExternalWorkflowExecutionInitiated, :RequestCancelExternalWorkflowExecutionFailed, :StartChildWorkflowExecutionInitiated, :CancelTimerFailed, :WorkflowExecutionStarted, :WorkflowExecutionSignaled]) activity_client_set = Set.new([:ActivityTaskCompleted, :ActivityTaskCanceled, :ActivityTaskTimedOut, :ScheduleActivityTaskFailed, :ActivityTaskFailed]) workflow_client_set = Set.new([:ExternalWorkflowExecutionCancelRequested, :ChildWorkflowExecutionCanceled, :ChildWorkflowExecutionCompleted, :ChildWorkflowExecutionFailed, :ChildWorkflowExecutionStarted, :ChildWorkflowExecutionTerminated, :ChildWorkflowExecutionTimedOut, :ExternalWorkflowExecutionSignaled, :SignalExternalWorkflowExecutionFailed, :StartChildWorkflowExecutionFailed]) event_set_to_object_mapping = { self_set => self, activity_client_set => @activity_client, workflow_client_set => @workflow_client } thing_to_operate_on = event_set_to_object_mapping.map {|key, value| value if key.member? event_type_symbol }.compact.first thing_to_operate_on.send(handle_event, event) # DecisionTaskStarted is taken care of at TODO end