module AWS::Flow::Core

Public Instance Methods

_error_handler(&block) click to toggle source

@api private

# File lib/aws/flow/implementation.rb, line 110
# Builds an error handler through {#error_handler} and hands back that
# handler's `result` instead of the handler object itself.
#
# @api private
#
# @param block
#   Forwarded unchanged to {#error_handler}.
#
# @return
#   Whatever `result` returns on the object produced by {#error_handler}.
def _error_handler(&block)
  handler = error_handler(&block)
  handler.result
end
daemon_task(&block) click to toggle source

@param block

The block of code to be executed when the daemon task is run.

@return [Future]

The task's result, which is a {Future}.

@raise [NoContextException]

If the current fiber does not respond to `Fiber.__context__`.
# File lib/aws/flow/implementation.rb, line 56
# Creates a {DaemonTask} for the given block and adds it to the context of
# the currently running fiber.
#
# @param block
#   The block of code handed to the new {DaemonTask}.
#
# @return
#   The `result` of the new daemon task (documented as a {Future}).
#
# @raise [NoContextException]
#   If the current fiber does not respond to `__context__`.
def daemon_task(&block)
  running_fiber = ::Fiber.current
  unless running_fiber.respond_to?(:__context__)
    raise NoContextException
  end

  flow_context = running_fiber.__context__
  daemon = DaemonTask.new(nil, &block)
  enclosing_scope = flow_context.get_closest_containing_scope
  # The TaskContext instance is not used afterwards; presumably its
  # constructor attaches the task to the scope -- TODO confirm.
  TaskContext.new(:parent => enclosing_scope, :task => daemon)
  flow_context << daemon
  daemon.result
end
error_handler(&block) click to toggle source

Creates a new error handler for asynchronous tasks.

@param block

A block that defines the {BeginRescueEnsure#begin}, {BeginRescueEnsure#rescue}, and {BeginRescueEnsure#ensure}
methods.

@return

The {BeginRescueEnsure} object created for the block. Its `result` is the result of the `begin` statement if there is no error; otherwise the value of the `return` statement.

@raise [NoContextException]

If the current fiber does not respond to `Fiber.__context__`.
# File lib/aws/flow/implementation.rb, line 97
# Creates a new error handler for asynchronous tasks and registers it with
# the context of the currently running fiber.
#
# @param block
#   A block that defines the {BeginRescueEnsure#begin},
#   {BeginRescueEnsure#rescue}, and {BeginRescueEnsure#ensure} methods. It is
#   handed to a {BeginRescueEnsureWrapper} together with the new handler.
#
# @return [BeginRescueEnsure]
#   The newly created handler object (not its result).
#
# @raise [NoContextException]
#   If the current fiber does not respond to `__context__`.
def error_handler(&block)
  running_fiber = ::Fiber.current
  unless running_fiber.respond_to?(:__context__)
    raise NoContextException
  end

  flow_context = running_fiber.__context__
  handler = BeginRescueEnsure.new(:parent => flow_context.get_closest_containing_scope)
  wrapper = BeginRescueEnsureWrapper.new(block, handler)
  # Order matters here: the wrapper is added before the handler it wraps.
  [wrapper, handler].each { |item| flow_context << item }
  handler
end
external_task(&block) click to toggle source

@param block

The block of code to be executed when the external task is run.

@return [nil]

@raise [NoContextException]

If the current fiber does not respond to `Fiber.__context__`.
# File lib/aws/flow/implementation.rb, line 75
# Creates an {ExternalTask} for the given block and adds it to the context
# of the currently running fiber.
#
# @param block
#   The block of code handed to the new {ExternalTask}.
#
# @return [nil]
#
# @raise [NoContextException]
#   If the current fiber does not respond to `__context__`.
def external_task(&block)
  running_fiber = ::Fiber.current
  unless running_fiber.respond_to?(:__context__)
    raise NoContextException
  end

  flow_context = running_fiber.__context__
  # The enclosing scope is looked up once for the task and once more for
  # its TaskContext, exactly as separate calls.
  external = ExternalTask.new(:parent => flow_context.get_closest_containing_scope, &block)
  # The TaskContext instance is not used afterwards; presumably its
  # constructor attaches the task to the scope -- TODO confirm.
  TaskContext.new(:parent => flow_context.get_closest_containing_scope, :task => external)
  flow_context << external
  nil
end
gate_by_version(version, method, &block) click to toggle source

@api private

# File lib/aws/flow/async_scope.rb, line 23
# Runs the given block only when the running Ruby version satisfies a
# comparison against +version+.
#
# For the ordering operators (`<`, `<=`, `>`, `>=`) the versions are compared
# segment by segment, so that for example "10.0" sorts after "9.3". Any other
# method (such as `:==` or `:start_with?`) is sent to the `RUBY_VERSION`
# string directly.
#
# @api private
#
# @param [String] version
#   The version to compare `RUBY_VERSION` against, for example "1.9".
#
# @param [Symbol, String] method
#   The name of the method to call on the running version with +version+ as
#   its argument, for example `:>=`.
#
# @param block
#   The code to run when the comparison is truthy.
#
# @return
#   The block's return value, or nil when the comparison is falsy.
def gate_by_version(version, method, &block)
  ordering = %w[< <= > >=].include?(method.to_s)
  comparable = ordering &&
               defined?(::Gem::Version) &&
               version.is_a?(::String) &&
               ::Gem::Version.correct?(version)

  satisfied =
    if comparable
      # Plain String comparison is lexicographic ("3.2" < "10.0" is false),
      # so compare parsed versions instead.
      ::Gem::Version.new(RUBY_VERSION).send(method, ::Gem::Version.new(version))
    else
      # Fall back to the raw String call for non-ordering methods or for
      # arguments that are not well-formed version strings.
      RUBY_VERSION.send(method, version)
    end

  block.call if satisfied
end
make_backtrace(parent_backtrace) click to toggle source

@api private

# File lib/aws/flow/flow_utils.rb, line 33
# Builds a backtrace through `AsyncBacktrace.create`, skipping a fixed number
# of leading frames.
#
# @api private
#
# @param parent_backtrace
#   Passed through unchanged as the first argument of
#   `AsyncBacktrace.create`.
#
# @return
#   Whatever `AsyncBacktrace.create` returns.
def make_backtrace(parent_backtrace)
  # Frames that are skipped, as listed by the original author:
  #   * the function that actually removes the stack traces;
  #   * the function in AsyncBacktrace that calls into it;
  #   * the call into this function;
  #   * the initialize call of the BeginRescueEnsure or ExternalTask;
  #   * the new call into the BeginRescueEnsure or ExternalTask;
  #   * the AsyncScope initialize that the BeginRescueEnsure/ExternalTask
  #     has to be in.
  #
  # Sample of such frames:
  #   "./lib/aws/rubyflow/asyncBacktrace.rb:75:in `caller'"
  #   "./lib/aws/rubyflow/asyncBacktrace.rb:21:in `create'"
  #   "./lib/aws/rubyflow/flow.rb:16:in `make_backtrace'"
  #   "./lib/aws/rubyflow/flow.rb:103:in `initialize'"
  #   "./lib/aws/rubyflow/asyncScope.rb:17:in `new'"
  #   "./lib/aws/rubyflow/asyncScope.rb:17:in `initialize'"
  #
  # NOTE(review): six frames are listed above but seven are skipped --
  # confirm which additional frame the seventh accounts for.
  frames_to_skip = 7
  AsyncBacktrace.create(parent_backtrace, frames_to_skip)
end
task(future = nil, &block) click to toggle source

@param [Future] future

Unused; defaults to **nil**.

@param block

The block of code to be executed when the task is run.

@raise [NoContextException]

If the current fiber does not respond to `Fiber.__context__`.

@return [Future]

The task's result, which is a {Future}.
# File lib/aws/flow/implementation.rb, line 36
# Creates a {Task} for the given block and adds it to the context of the
# currently running fiber.
#
# @param [Future] future
#   Unused; defaults to nil.
#
# @param block
#   The block of code handed to the new {Task}.
#
# @return
#   The `result` of the new task (documented as a {Future}).
#
# @raise [NoContextException]
#   If the current fiber does not respond to `__context__`.
def task(future = nil, &block)
  running_fiber = ::Fiber.current
  unless running_fiber.respond_to?(:__context__)
    raise NoContextException
  end

  flow_context = running_fiber.__context__
  new_task = Task.new(nil, &block)
  enclosing_scope = flow_context.get_closest_containing_scope
  # The TaskContext instance is not used afterwards; presumably its
  # constructor attaches the task to the scope -- TODO confirm.
  TaskContext.new(:parent => enclosing_scope, :task => new_task)
  flow_context << new_task
  new_task.result
end
timed_wait_for_all(timeout, *futures) click to toggle source

Blocks until all of the arguments are set or until the timeout expires.

@param [Array<Future>] futures

A list of futures to wait for. The function will return only when all of them are set.

@param [Integer] timeout

The timeout value after which it will raise a Timeout::Error

@return [Array<Future>]

A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 180
# Waits, with a timeout, until every given future is set.
#
# @param [Integer] timeout
#   Forwarded unchanged to {#timed_wait_for_function}.
#
# @param [Array<Future>] futures
#   The futures to wait for; they are forwarded as a single array argument.
#
# @return
#   Whatever {#timed_wait_for_function} returns.
def timed_wait_for_all(timeout, *futures)
  # Satisfied once as many futures are set as are being watched.
  every_future_set = lambda do |set_so_far, watched|
    set_so_far.size == watched.size
  end
  timed_wait_for_function(timeout, every_future_set, futures)
end
timed_wait_for_any(timeout, *futures) click to toggle source

Blocks until any of the arguments are set or until the timeout expires.

@param [Array<Future>] futures

A list of futures to wait for. The function will return when at least one of these is set.

@param [Integer] timeout

The timeout value after which it will raise a Timeout::Error

@return [Array<Future>]

A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 165
# Waits, with a timeout, until at least one of the given futures is set.
#
# @param [Integer] timeout
#   Forwarded unchanged to {#timed_wait_for_function}.
#
# @param [Array<Future>] futures
#   The futures to wait for; they are forwarded as a single array argument.
#
# @return
#   Whatever {#timed_wait_for_function} returns.
def timed_wait_for_any(timeout, *futures)
  # Satisfied as soon as one future is set; the full list is not consulted.
  some_future_set = lambda do |set_so_far, _watched|
    set_so_far.length >= 1
  end
  timed_wait_for_function(timeout, some_future_set, futures)
end
timed_wait_for_function(timeout, function, *futures) click to toggle source

Blocks until the passed-in function returns a true value or until the timeout expires. The function is called with the list of futures that have been set so far and with the full list of futures.

@param function

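A callable that receives the list of futures set so far and the full list of futures; waiting ends once it returns a true value.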

@param [Array<Future>] futures

A list of futures to watch; the function is re-evaluated each time one of them is set.

@param [Integer] timeout

The timeout value after which it will raise a Timeout::Error

@return [Array<Future>]

A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 198
# Timed variant of {#wait_for_function}: delegates to
# {#wait_for_function_helper}, passing the timeout along.
#
# @param [Integer] timeout
#   Forwarded unchanged to {#wait_for_function_helper}.
#
# @param function
#   Forwarded unchanged to {#wait_for_function_helper}.
#
# @param [Array<Future>] futures
#   Forwarded (splatted) to {#wait_for_function_helper}.
#
# @return
#   Whatever {#wait_for_function_helper} returns.
def timed_wait_for_function(timeout, function, *futures)
  wait_for_function_helper(
    timeout,
    function,
    *futures
  )
end
wait_for_all(*futures) click to toggle source

Blocks until all of the arguments are set.

@param [Array<Future>] futures

A list of futures to wait for. The function will return only when all of them are set.

@return [Array<Future>]

A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 150
# Waits until every given future is set.
#
# @param [Array<Future>] futures
#   The futures to wait for; they are forwarded as a single array argument.
#
# @return
#   Whatever {#wait_for_function} returns.
def wait_for_all(*futures)
  # Satisfied once as many futures are set as are being watched.
  every_future_set = lambda do |set_so_far, watched|
    set_so_far.size == watched.size
  end
  wait_for_function(every_future_set, futures)
end
wait_for_any(*futures) click to toggle source

Blocks until any of the arguments are set.

@param [Array<Future>] futures

A list of futures to wait for. The function will return when at least one of these is set.

@return [Array<Future>]

A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 138
# Waits until at least one of the given futures is set.
#
# @param [Array<Future>] futures
#   The futures to wait for; they are forwarded as a single array argument.
#
# @return
#   Whatever {#wait_for_function} returns.
def wait_for_any(*futures)
  # Satisfied as soon as one future is set; the full list is not consulted.
  some_future_set = lambda do |set_so_far, _watched|
    set_so_far.length >= 1
  end
  wait_for_function(some_future_set, futures)
end
wait_for_function(function, *futures) click to toggle source

Blocks until the passed-in function returns a true value. The function is called with the list of futures that have been set so far and with the full list of futures.

@param function

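A callable that receives the list of futures set so far and the full list of futures; waiting ends once it returns a true value.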

@param [Array<Future>] futures

A list of futures to watch; the function is re-evaluated each time one of them is set.

@return [Array<Future>]

A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 126
# Untimed wait: delegates to {#wait_for_function_helper} with a nil timeout.
#
# @param function
#   Forwarded unchanged to {#wait_for_function_helper}.
#
# @param [Array<Future>] futures
#   Forwarded (splatted) to {#wait_for_function_helper}.
#
# @return
#   Whatever {#wait_for_function_helper} returns.
def wait_for_function(function, *futures)
  no_timeout = nil
  wait_for_function_helper(no_timeout, function, *futures)
end
wait_for_function_helper(timeout, function, *futures) click to toggle source

Helper method to refactor away the common implementation of wait_for_function and timed_wait_for_function.

# File lib/aws/flow/implementation.rb, line 204
# Shared implementation behind {#wait_for_function} and
# {#timed_wait_for_function}.
#
# @param timeout
#   Passed to `ExternalConditionVariable#wait` when the futures are
#   {ExternalFuture}s. It is not used when a `FiberConditionVariable` is
#   chosen.
#
# @param function
#   A callable invoked as `function.call(set_futures, all_futures)`; waiting
#   stops once it returns a truthy value.
#
# @param futures
#   The futures to watch. Nested arrays are flattened. Either all of them or
#   none of them may be {ExternalFuture}s.
#
# @return
#   * nil when no futures are given;
#   * the first already-set future (a single object, not an array) when
#     `function` is satisfied before any waiting happens;
#   * otherwise the array holding the futures that were set up front,
#     followed by those appended as their `on_set` callbacks ran.
#
# @raise [ArgumentError]
#   If {ExternalFuture}s are mixed with other futures.
#
# @raise [Timeout::Error]
#   If the `ExternalConditionVariable` wait returns while `function` is still
#   not satisfied.
def wait_for_function_helper(timeout, function, *futures)
  futures.flatten!

  external = futures.select { |candidate| candidate.is_a?(ExternalFuture) }

  unless external.empty? || external.size == futures.size
    raise ArgumentError, "The futures array must contain either all "\
      "objects of Future or all objects of ExternalFuture"
  end

  # The kind of condition variable follows the kind of future being watched.
  condition =
    if external.empty?
      FiberConditionVariable.new
    else
      ExternalConditionVariable.new
    end

  return if futures.empty?

  completed = futures.select(&:set?)
  # Fast path: nothing to wait for, hand back the first set future.
  return futures.find(&:set?) if function.call(completed, futures)

  futures.each do |future|
    future.on_set do |just_set|
      completed << just_set
      condition.broadcast if function.call(completed, futures)
    end
  end

  case condition
  when FiberConditionVariable
    condition.wait
  else
    condition.wait(timeout)
    # The wait came back without the predicate being satisfied.
    raise Timeout::Error unless function.call(completed, futures)
  end

  completed
end