class AWS::Flow::AsyncRetryingExecutor

Internal AsyncRetryingExecutor class. @api private

Public Class Methods

new(retrying_policy, clock, execution_id, return_on_start = false) click to toggle source

@api private

# File lib/aws/decider/async_retrying_executor.rb, line 23
# Stores the collaborators the executor needs to run and retry commands.
#
# @param retrying_policy [Object] asked +isRetryable+ and
#   +next_retry_delay_seconds+ when an attempt fails
# @param clock [Object] source of +current_time+ and +create_timer+
# @param execution_id [Object] forwarded to the retry policy when a retry
#   delay is computed
# @param return_on_start [Boolean] when truthy, +execute+ and +invoke+ hand
#   back their result without waiting on it
# @api private
def initialize(retrying_policy, clock, execution_id, return_on_start = false)
  @retrying_policy = retrying_policy
  @clock = clock
  @execution_id = execution_id
  @return_on_start = return_on_start
end

Public Instance Methods

execute(command, options = nil) click to toggle source

@api private

# File lib/aws/decider/async_retrying_executor.rb, line 31
# Runs +command+ under the configured retry policy.
#
# With +return_on_start+ set, the result of the first scheduling call is
# returned as-is. Otherwise the first attempt is started inside an error
# handler, this method waits until the handler's ensure step has run, then
# re-raises any error the handler captured or returns the output future.
#
# @param command [#call] the work to attempt
# @param options [Object, nil] accepted for interface compatibility; it is
#   not read by this method
# @return [Object] the value from +schedule_with_retry+ when
#   +return_on_start+ is set, otherwise a +Utilities::AddressableFuture+
# @raise [Exception] whatever error the handler captured for this call
# @api private
def execute(command, options = nil)
  # Lazily evaluated so the clock is read at the moment the attempt starts.
  # The attempts hash counts per exception class, defaulting each key to 1.
  first_attempt = lambda do
    schedule_with_retry(command, nil, Hash.new { |hash, key| hash[key] = 1 }, @clock.current_time, nil)
  end
  return first_attempt.call if @return_on_start

  output = Utilities::AddressableFuture.new
  result_lock = Utilities::AddressableFuture.new
  # Kept in a local rather than an instance variable: an error captured by
  # one call must not leak into later calls on the same executor.
  error_seen = nil
  error_handler do |t|
    t.begin { output.set(first_attempt.call) }
    t.rescue(Exception) { |error| error_seen = error }
    t.ensure do
      # Always resolve both futures so the wait below cannot hang.
      output.set unless output.set?
      result_lock.set
    end
  end
  result_lock.get
  raise error_seen if error_seen
  output
end
invoke(command, attempts, first_attempt_time) click to toggle source

@api private

# File lib/aws/decider/async_retrying_executor.rb, line 70
# Makes one attempt at +command+ and arranges a retry if it fails.
#
# The command runs inside an error handler. Any raised error is recorded and
# published through a future once the handler's ensure step runs; a
# +CancellationException+ is additionally re-raised from the rescue step.
# A follow-up task then either stores the command's return value, or bumps
# the attempt count for the failure's class and hands off to
# +schedule_with_retry+.
#
# @param command [#call] the work to attempt
# @param attempts [Hash] attempt counts keyed by exception class; mutated
#   here when the attempt fails
# @param first_attempt_time [Object] time of the very first attempt,
#   forwarded unchanged to +schedule_with_retry+
# @return [Object] the output future when +return_on_start+ is set,
#   otherwise the value obtained from that future
# @api private
def invoke(command, attempts, first_attempt_time)
  caught = nil
  retry_signal = Future.new
  call_result = Future.new
  outcome = Utilities::AddressableFuture.new

  error_handler do |handler|
    handler.begin { call_result.set(command.call) }
    handler.rescue(Exception) do |error|
      caught = error
      raise error if error.is_a?(CancellationException)
    end
    # Publishes nil on success, or the recorded error on failure.
    handler.ensure { retry_signal.set(caught) }
  end

  task do
    failure = retry_signal.get
    if failure.nil?
      outcome.set(call_result.get)
    else
      attempts[failure.class] += 1
      outcome.set(schedule_with_retry(command, failure, attempts, first_attempt_time, @clock.current_time))
    end
  end

  @return_on_start ? outcome : outcome.get
end
schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure) click to toggle source

@api private

# File lib/aws/decider/async_retrying_executor.rb, line 53
# Decides whether and when the next attempt at +command+ happens.
#
# When the summed attempt counts exceed one, the retry policy is consulted:
# +failure+ is re-raised if the policy says it is not retryable or returns a
# negative delay. A positive delay schedules the attempt through a clock
# timer; otherwise the attempt is invoked immediately.
#
# @param command [#call] the work to attempt
# @param failure [Exception, nil] the error from the previous attempt
# @param attempts [Hash] attempt counts keyed by exception class
# @param first_attempt_time [Object] time of the very first attempt
# @param time_of_recorded_failure [Object, nil] time the latest failure was
#   recorded
# @return [Object] the result of +invoke+, or of the +task+ that creates the
#   timer
# @raise [Exception] +failure+, when it will not be retried
# @api private
def schedule_with_retry(command, failure, attempts, first_attempt_time, time_of_recorded_failure)
  retry_delay = -1
  total_attempts = attempts.values.reduce(0, :+)

  if total_attempts > 1
    raise failure unless @retrying_policy.isRetryable(failure)
    retry_delay = @retrying_policy.next_retry_delay_seconds(
      first_attempt_time, time_of_recorded_failure, attempts, failure, @execution_id
    )
    raise failure if retry_delay < 0
  end

  # No positive delay (first attempt, or the policy asked for zero): run now.
  return invoke(command, attempts, first_attempt_time) unless retry_delay > 0

  task do
    @clock.create_timer(retry_delay, lambda { invoke(command, attempts, first_attempt_time) })
  end
end