class AWS::Flow::AsyncDecider

Represents an asynchronous decider class.

Attributes

decision_helper[RW]
task_token[RW]

Public Class Methods

new(workflow_definition_factory, history_helper, decision_helper) click to toggle source

Creates a new asynchronous decider.

# File lib/aws/decider/async_decider.rb, line 193
def initialize(workflow_definition_factory, history_helper, decision_helper)
  @workflow_definition_factory = workflow_definition_factory
  @history_helper = history_helper
  @decision_helper = decision_helper
  @decision_task = history_helper.get_decision_task
  @workflow_clock = WorkflowClock.new(@decision_helper)

  @workflow_context = WorkflowContext.new(@decision_task, @workflow_clock)
  @activity_client = GenericActivityClient.new(@decision_helper, nil)
  @workflow_client = GenericWorkflowClient.new(@decision_helper, @workflow_context)
  @decision_context = DecisionContext.new(@activity_client, @workflow_client, @workflow_clock, @workflow_context, @decision_helper)
end

Public Instance Methods

complete_workflow() click to toggle source

Registers a ‘CompleteWorkflowExecution` decision.

@see docs.aws.amazon.com/amazonswf/latest/apireference/API_CompleteWorkflowExecutionDecisionAttributes.html

CompleteWorkflowExecutionDecisionAttributes
# File lib/aws/decider/async_decider.rb, line 326
def complete_workflow
  return unless @completed && ! @unhandled_decision
  decision_id = [:SELF, nil]
  if @failure
    @decision_helper[decision_id] = make_fail_decision(decision_id, @failure)
  elsif @cancel_requested
        @decision_helper[decision_id] = make_cancel_decision(decision_id)
  else

    if ! @workflow_context.continue_as_new_options.nil?
      @decision_helper[decision_id] = continue_as_new_workflow(decision_id, @workflow_context.continue_as_new_options)
    else
      if @result.nil?
        @decision_helper[decision_id] = make_completion_decision(decision_id, {
                                                                   :decision_type => "CompleteWorkflowExecution"})
      else
        @decision_helper[decision_id] = make_completion_decision(decision_id, {
                                                                   :decision_type => "CompleteWorkflowExecution",
                                                                   :complete_workflow_execution_decision_attributes => {:result => @result.get }})
      end
    end
  end
end
completed?() click to toggle source

Indicates whether the task completed.

@return [true, false]

Returns `true` if the task is completed; `false` otherwise.
# File lib/aws/decider/async_decider.rb, line 355
def completed?
  @completed
end
continue_as_new_workflow(decision_id, continue_as_new_options) click to toggle source

Continues this as a new workflow, using the provided decision and options.

@param [DecisionID] decision_id

The decision ID to use.

@param [WorkflowOptions] continue_as_new_options

The options to use for the new workflow.
# File lib/aws/decider/async_decider.rb, line 310
def continue_as_new_workflow(decision_id, continue_as_new_options)
  result = {
    :decision_type => "ContinueAsNewWorkflowExecution",
  }

  task_list = continue_as_new_options.task_list ? {:task_list => {:name => continue_as_new_options.task_list}} : {}
  to_add = continue_as_new_options.get_options([:execution_start_to_close_timeout, :task_start_to_close_timeout, :task_priority, :child_policy, :tag_list, :workflow_type_version, :input], task_list)
  result[:continue_as_new_workflow_execution_decision_attributes] = to_add
  CompleteWorkflowStateMachine.new(decision_id, result)
end
decide() click to toggle source

@api private

# File lib/aws/decider/async_decider.rb, line 224
def decide
  begin
    decide_impl
  rescue Exception => error
    raise error
  ensure
    begin
      @decision_helper.workflow_context_data = @definition.get_workflow_state
    rescue WorkflowException => error
      @decision_helper.workflow_context_data = error.details
    rescue Exception => error
      @decision_helper.workflow_context_data = error.message
      # Catch and do stuff
    ensure
      @workflow_definition_factory.delete_workflow_definition(@definition)
    end
  end
end
decide_impl() click to toggle source

@api private

# File lib/aws/decider/async_decider.rb, line 244
def decide_impl
  single_decision_event = @history_helper.get_single_decision_events
  while single_decision_event.length > 0
    @decision_helper.handle_decision_task_started_event
    [*single_decision_event].each do |event|
      last_non_replay_event_id = @history_helper.get_last_non_replay_event_id
      @workflow_clock.replaying = false if event.event_id >= last_non_replay_event_id
      @workflow_clock.replay_current_time_millis = @history_helper.get_replay_current_time_millis
      process_event(event)
      event_loop(event)
    end
    @task_token = @history_helper.get_decision_task.task_token
    complete_workflow if completed?
    single_decision_event = @history_helper.get_single_decision_events
  end
  if @unhandled_decision
    @unhandled_decision = false
    complete_workflow
  end
end
event_loop(event) click to toggle source

@api private

# File lib/aws/decider/async_decider.rb, line 649
def event_loop(event)
  return if @completed
  begin
    @completed = @workflow_async_scope.eventLoop
    #TODO Make this a cancellationException, set it up correctly?
  rescue Exception => e
    @failure = e unless @cancel_requested
    @completed = true
  end
end
get_decisions() click to toggle source

@note *Beware, this getter will modify things*, as it creates decisions for the objects in the {AsyncDecider}

that need decisions sent out.

@api private

# File lib/aws/decider/async_decider.rb, line 210
def get_decisions
  result = @decision_helper.decision_map.values.map {|decision_object|
    decision_object.get_decision}.compact
  if result.length > DecisionHelper.maximum_decisions_per_completion
    result = result.slice(0, DecisionHelper.maximum_decisions_per_completion - 1)
    result << ({:decision_type => "StartTimer", :start_timer_decision_attributes => {
                   :timer_id => DecisionHelper.force_immediate_decision_timer,
                      :start_to_fire_timeout => "0"
                    }})
  end
  return result
end
handle_activity_task_cancel_requested(event) click to toggle source

Handler for the ‘:ActivityTaskCancelRequested` event. @param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 435
def handle_activity_task_cancel_requested(event)
  activity_id = event.attributes[:activity_id]
  @decision_helper[activity_id].consume(:handle_cancellation_initiated_event)
end
handle_activity_task_scheduled(event) click to toggle source

Handler for the ‘:ActivityTaskScheduled` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 364
def handle_activity_task_scheduled(event)
  activity_id = event.attributes[:activity_id]
  @decision_helper.activity_scheduling_event_id_to_activity_id[event.id] = activity_id
  @decision_helper[activity_id].consume(:handle_initiated_event)
  return @decision_helper[activity_id].done?
end
handle_cancel_timer_failed(event) click to toggle source

Handler for the ‘:CancelTimerFailed` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 573
def handle_cancel_timer_failed(event)
  handle_event(event, {
                 :id_methods => [:timer_id],
                 :consume_symbol => :handle_cancellation_failure_event
               })
end
handle_cancel_workflow_execution_failed(event) click to toggle source

Handler for the ‘:CancelWorkflowExecutionFailed` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 476
def handle_cancel_workflow_execution_failed(event)
  handle_closing_failure
end
handle_closing_failure() click to toggle source
# File lib/aws/decider/async_decider.rb, line 450
def handle_closing_failure
  @unhandled_decision = true
  @decision_helper[[:SELF, nil]].consume(:handle_initiation_failed_event)
end
handle_complete_workflow_execution_failed(event) click to toggle source

Handler for the ‘:CompleteWorkflowExecutionFailed` event. @param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 458
def handle_complete_workflow_execution_failed(event)
  handle_closing_failure
end
handle_continue_as_new_workflow_execution_failed(event) click to toggle source

Handler for the ‘:ContinueAsNewWorkflowExecutionFailed` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 485
def handle_continue_as_new_workflow_execution_failed(event)
  handle_closing_failure
end
handle_fail_workflow_execution_failed(event) click to toggle source

Handler for the ‘:FailWorkflowExecutionFailed` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 467
def handle_fail_workflow_execution_failed(event)
  handle_closing_failure
end
handle_request_cancel_activity_task_failed(event) click to toggle source

Handler for the ‘:RequestCancelActivityTaskFailed` event. @param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 443
def handle_request_cancel_activity_task_failed(event)
  handle_event(event, {
                 :id_methods => [:activity_id],
                 :consume_symbol => :handle_cancellation_failure_event
               })
end
handle_request_cancel_external_workflow_execution_failed(event) click to toggle source

Handler for the ‘:RequestCancelExternalWorkflowExecutionFailed` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 549
def handle_request_cancel_external_workflow_execution_failed(event)
  handle_event(event, {
                 :id_methods => [:workflow_id],
                 :consume_symbol => :handle_cancellation_failure_event
               })
end
handle_request_cancel_external_workflow_execution_initiated(event) click to toggle source

Handler for the ‘:RequestCancelExternalWorkflowExecutionInitiated` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 537
def handle_request_cancel_external_workflow_execution_initiated(event)
  handle_event(event, {
                 :id_methods => [:workflow_id],
                 :consume_symbol => :handle_cancellation_initiated_event
               })
end
handle_signal_external_workflow_execution_initiated(event) click to toggle source

Handler for the ‘:SignalExternalWorkflowExecutionInitiated` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 525
def handle_signal_external_workflow_execution_initiated(event)
  signal_id = event.attributes[:control]
  @decision_helper.signal_initiated_event_to_signal_id[event.id] = signal_id
  @decision_helper[signal_id].consume(:handle_initiated_event)
  @decision_helper[signal_id].done?
end
handle_start_child_workflow_execution_initiated(event) click to toggle source

Handler for the ‘:StartChildWorkflowExecutionInitiated` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 561
def handle_start_child_workflow_execution_initiated(event)
  workflow_id = event.attributes[:workflow_id]
  @decision_helper.child_initiated_event_id_to_workflow_id[event.id] = workflow_id
  @decision_helper[workflow_id].consume(:handle_initiated_event)
  @decision_helper[workflow_id].done?
end
handle_start_timer_failed(event) click to toggle source

Handler for the ‘:StartTimerFailed` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 407
def handle_start_timer_failed(event)
  timer_id = event.attributes.timer_id
  return if timer_id == DecisionHelper.force_immediate_decision_timer
  handle_event(event, {
                 :id_methods => [:timer_id],
                 :consume_symbol => :handle_completion_event,
                 :decision_helper_scheduled => :scheduled_timers,
                 :handle_open_request => lambda do |event, open_request|
                   exception = StartTimerFailedException(event.id, timer_id, nil, event.attributes.cause)
                   open_request.completion_handle.fail(exception)
                 end
               })
  state_machine = @decision_helper[timer_id]


end
handle_timer_canceled(event) click to toggle source

Handler for the ‘:TimerCanceled` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 506
def handle_timer_canceled(event)
  handle_event(event, {
                 :id_methods => [:timer_id],
                 :consume_symbol => :handle_cancellation_event,
                 :decision_helper_scheduled => :scheduled_timers,
                 :handle_open_request => lambda do |event, open_request|
                   if ! open_request.nil?
                     cancellation_exception = CancellationException.new("Cancelled from a Timer Cancelled event")
                     open_request.completion_handle.fail(cancellation_exception)
                   end
                 end
               })
end
handle_timer_fired(event) click to toggle source

Handler for the ‘:TimerFired` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 390
def handle_timer_fired(event)
  timer_id = event.attributes[:timer_id]
  return if timer_id == DecisionHelper.force_immediate_decision_timer
  @decision_helper[timer_id].consume(:handle_completion_event)
  if @decision_helper[timer_id].done?
    open_request = @decision_helper.scheduled_timers.delete(timer_id)
    return if open_request.nil?
    open_request.blocking_promise.set(nil)
    open_request.completion_handle.complete
  end
end
handle_timer_started(event) click to toggle source

Handler for the ‘:TimerStarted` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 494
def handle_timer_started(event)
  timer_id = event.attributes[:timer_id]
  return if timer_id == DecisionHelper.force_immediate_decision_timer
  @decision_helper[timer_id].consume(:handle_initiated_event)
  @decision_helper[timer_id].done?
end
handle_workflow_execution_cancel_requested(event) click to toggle source

Handler for the ‘:WorkflowExecutionCancelRequested` event. @param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 427
def handle_workflow_execution_cancel_requested(event)
  @workflow_async_scope.cancel(CancellationException.new("Cancelled from a WorkflowExecutionCancelRequested"))
  @cancel_requested = true
end
handle_workflow_execution_signaled(event) click to toggle source

Handler for the ‘WorkflowExecutionSignaled` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 585
def handle_workflow_execution_signaled(event)
  signal_name = event.attributes[:signal_name]
  input = event.attributes[:input] if event.attributes.keys.include? :input
  input ||= NoInput.new
  # TODO do stuff if we are @completed
  t = Task.new(nil) do
    @definition.signal_received(signal_name, input)
  end
  task_context = TaskContext.new(:parent => @workflow_async_scope.get_closest_containing_scope, :task => t)
  @workflow_async_scope.get_closest_containing_scope << t
end
handle_workflow_execution_started(event) click to toggle source

Handler for the ‘:WorkflowExecutionStarted` event.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 376
def handle_workflow_execution_started(event)
  @workflow_async_scope = AsyncScope.new do
    FlowFiber.current[:decision_context] = @decision_context
    input = (event.attributes.keys.include? :input) ?  event.attributes[:input] : nil
    @definition = @workflow_definition_factory.get_workflow_definition(@decision_context)
    @result = @definition.execute(input)
  end
end
make_cancel_decision(decision_id) click to toggle source
# File lib/aws/decider/async_decider.rb, line 299
def make_cancel_decision(decision_id)
  CompleteWorkflowStateMachine.new(decision_id, {:decision_type => "CancelWorkflowExecution"})
end
make_completion_decision(decision_id, decision) click to toggle source
# File lib/aws/decider/async_decider.rb, line 296
def make_completion_decision(decision_id, decision)
  CompleteWorkflowStateMachine.new(decision_id, decision)
end
make_fail_decision(decision_id, failure) click to toggle source

Registers a {FailWorkflowExecution} decision.

@param [DecisionID] decision_id

The ID of the {DecisionTaskCompleted} event corresponding to the decision task that resulted in the decision
failing in this execution. This information can be useful for tracing the sequence of events back from the
failure.

@param [Exception] failure

The exception that is associated with the failed workflow.

@see docs.aws.amazon.com/amazonswf/latest/apireference/API_FailWorkflowExecutionDecisionAttributes.html

FailWorkflowExecutionDecisionAttributes
# File lib/aws/decider/async_decider.rb, line 278
def make_fail_decision(decision_id, failure)
  decision_type = "FailWorkflowExecution"

  # Get the reason from the failure. Or get the message if a
  # CancellationException is initialized without a reason. Fall back to
  # a default string if nothing is provided
  reason = failure.reason || failure.message || "Workflow failure did not provide any reason."
  # Get the details from the failure. Or get the backtrace if a
  # CancellationException is initialized without a details. Fall back to
  # a default string if nothing is provided
  details = failure.details || failure.backtrace.to_s || "Workflow failure did not provide any details."

  fail_workflow_execution_decision_attributes = { reason: reason, details: details }
  decision = {:decision_type => decision_type, :fail_workflow_execution_decision_attributes => fail_workflow_execution_decision_attributes}
  CompleteWorkflowStateMachine.new(decision_id, decision)

end
process_event(event) click to toggle source

Processes decider events.

@param [Object] event

The event to process.
# File lib/aws/decider/async_decider.rb, line 602
def process_event(event)
  event_type_symbol = event.event_type.to_sym
  # Mangle the name so that it is handle_ + the name of the event type in snakecase
  handle_event = "handle_" + event.event_type.gsub(/(.)([A-Z])/,'\1_\2').downcase
  noop_set = Set.new([:DecisionTaskScheduled, :DecisionTaskCompleted,
  :DecisionTaskStarted, :DecisionTaskTimedOut, :WorkflowExecutionTimedOut,
  :WorkflowExecutionTerminated, :MarkerRecorded,
  :WorkflowExecutionCompleted, :WorkflowExecutionFailed,
  :WorkflowExecutionCanceled, :WorkflowExecutionContinuedAsNew, :ActivityTaskStarted])

  return if noop_set.member? event_type_symbol

  self_set = Set.new([:TimerFired, :StartTimerFailed,
  :WorkflowExecutionCancel, :ActivityTaskScheduled,
  :WorkflowExecutionCancelRequested,
  :ActivityTaskCancelRequested, :RequestCancelActivityTaskFailed,
  :CompleteWorkflowExecutionFailed, :FailWorkflowExecutionFailed,
  :CancelWorkflowExecutionFailed, :ContinueAsNewWorkflowExecutionFailed,
  :TimerStarted, :TimerCanceled,
  :SignalExternalWorkflowExecutionInitiated,
  :RequestCancelExternalWorkflowExecutionInitiated,
  :RequestCancelExternalWorkflowExecutionFailed,
  :StartChildWorkflowExecutionInitiated, :CancelTimerFailed, :WorkflowExecutionStarted, :WorkflowExecutionSignaled])

  activity_client_set = Set.new([:ActivityTaskCompleted,
  :ActivityTaskCanceled, :ActivityTaskTimedOut,
  :ScheduleActivityTaskFailed, :ActivityTaskFailed])

  workflow_client_set =
  Set.new([:ExternalWorkflowExecutionCancelRequested,
  :ChildWorkflowExecutionCanceled, :ChildWorkflowExecutionCompleted,
  :ChildWorkflowExecutionFailed,
  :ChildWorkflowExecutionStarted, :ChildWorkflowExecutionTerminated,
  :ChildWorkflowExecutionTimedOut, :ExternalWorkflowExecutionSignaled,
  :SignalExternalWorkflowExecutionFailed,
  :StartChildWorkflowExecutionFailed])

  event_set_to_object_mapping = { self_set => self,
    activity_client_set => @activity_client,
    workflow_client_set => @workflow_client }
  thing_to_operate_on = event_set_to_object_mapping.map {|key, value|
    value if key.member? event_type_symbol }.compact.first
  thing_to_operate_on.send(handle_event, event)
    # DecisionTaskStarted is taken care of at TODO
end