module AWS::Flow::Core
Public Instance Methods
@api private
# File lib/aws/flow/implementation.rb, line 110
def _error_handler(&block)
  error_handler(&block).result
end
@param block
The block of code to be executed when the daemon task is run.
@return [Future]
The task's result, which is a {Future}.
@raise [NoContextException]
If the current fiber does not respond to `Fiber.__context__`.
# File lib/aws/flow/implementation.rb, line 56
def daemon_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = DaemonTask.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end
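The context check that opens `daemon_task` (and the other task constructors in this module) can be tried in isolation. The sketch below is a stand-in rather than the library's code: `NoContextException` is redefined locally, `current_context` is a hypothetical helper, and `__context__` is attached to a single fiber by hand, whereas the library is assumed to provide it itself.

```ruby
require 'fiber'

# Local stand-in for the library's exception class (assumption).
class NoContextException < StandardError; end

# Hypothetical helper mirroring the first three lines of daemon_task.
def current_context
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to?(:__context__)
  fiber.__context__
end

# The root fiber has no __context__ method, so the check raises.
outside = begin
  current_context
rescue NoContextException
  :no_context
end

# A fiber that has been given a __context__ method passes the check.
worker = Fiber.new { current_context }
worker.define_singleton_method(:__context__) { :demo_scope }
inside = worker.resume
```

Here `outside` ends up as `:no_context` and `inside` as the placeholder value `:demo_scope`, which stands in for a real scope object.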
Creates a new error handler for asynchronous tasks.
@param block
A block that defines the {BeginRescueEnsure#begin}, {BeginRescueEnsure#rescue}, and {BeginRescueEnsure#ensure} methods.
@return
The result of the `begin` statement if there is no error; otherwise the value of the `return` statement.
@raise [NoContextException]
If the current fiber does not respond to `Fiber.__context__`.
# File lib/aws/flow/implementation.rb, line 97
def error_handler(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  begin_rescue_ensure = BeginRescueEnsure.new(:parent => context.get_closest_containing_scope)
  bge = BeginRescueEnsureWrapper.new(block, begin_rescue_ensure)
  context << bge
  context << begin_rescue_ensure
  begin_rescue_ensure
end
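To illustrate the shape of the block that `error_handler` expects, here is a small synchronous stand-in. It assumes the block receives a handler object and registers its `begin`, `rescue`, and `ensure` parts on it. `MiniHandler` and `mini_error_handler` are hypothetical names, and the real `BeginRescueEnsure` runs asynchronously inside a scope, which this sketch does not attempt to reproduce.

```ruby
# Hypothetical synchronous stand-in for BeginRescueEnsure; not the library's class.
class MiniHandler
  def initialize
    @rescues = []
  end

  # Registers the main body.
  def begin(&block)
    @begin = block
  end

  # Registers a handler for one error class.
  def rescue(error_class, &block)
    @rescues << [error_class, block]
  end

  # Registers cleanup code that always runs.
  def ensure(&block)
    @ensure = block
  end

  # Runs the registered blocks the way a plain begin/rescue/ensure would.
  def run
    @begin.call
  rescue StandardError => e
    _, handler = @rescues.find { |klass, _| e.is_a?(klass) }
    raise unless handler
    handler.call(e)
  ensure
    @ensure.call if @ensure
  end
end

# Hypothetical counterpart of error_handler that runs everything immediately.
def mini_error_handler
  handler = MiniHandler.new
  yield handler
  handler.run
end

log = []
value = mini_error_handler do |t|
  t.begin { raise ArgumentError, 'bad input' }
  t.rescue(ArgumentError) { |e| log << "rescued: #{e.message}"; :recovered }
  t.ensure { log << 'cleanup' }
end
```

In this stand-in, the value of the matching `rescue` block becomes the overall result when the `begin` block raises, and the `ensure` block runs last in either case.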
@param block
The block of code to be executed when the external task is run.
@return [nil]
@raise [NoContextException]
If the current fiber does not respond to `Fiber.__context__`.
# File lib/aws/flow/implementation.rb, line 75
def external_task(&block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = ExternalTask.new(:parent => context.get_closest_containing_scope, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  nil
end
@api private
# File lib/aws/flow/async_scope.rb, line 23
def gate_by_version(version, method, &block)
  if RUBY_VERSION.send(method, version)
    block.call
  end
end
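A short usage sketch for `gate_by_version` follows. The method body is copied from the listing so that the example runs on its own; the version strings and block contents are illustrative only.

```ruby
# Copied from the listing above so the example is self-contained.
def gate_by_version(version, method, &block)
  if RUBY_VERSION.send(method, version)
    block.call
  end
end

# Runs the block only when the comparison against RUBY_VERSION holds.
modern = gate_by_version('1.9', :>=) { :ran }
legacy = gate_by_version('1.9', :<)  { :ran }

# RUBY_VERSION is a String, so the comparison is character by character.
lexicographic = '1.10' < '1.9'
```

On a Ruby at or above 1.9, `modern` is `:ran` and `legacy` is `nil`, because the block is skipped when the comparison fails. Since the comparison is between strings, a two-digit component such as `'1.10'` sorts before `'1.9'`, which is worth keeping in mind when choosing the version argument.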
@api private
# File lib/aws/flow/flow_utils.rb, line 33
def make_backtrace(parent_backtrace)
  # 1 frame for the function that actually removes the stack traces.
  # 1 frame for the function that calls into the function that removes
  # frames in AsyncBacktrace.
  # 1 frame for the call into this function.
  # 1 frame for the initialize call of the BeginRescueEnsure or ExternalTask.
  # 1 frame for the new call into the BeginRescueEnsure or ExternalTask.
  # 1 frame for the AsyncScope initialize that the BeginRescueEnsure/ExternalTask has to be in.

  # "./lib/aws/rubyflow/asyncBacktrace.rb:75:in `caller'"
  # "./lib/aws/rubyflow/asyncBacktrace.rb:21:in `create'"
  # "./lib/aws/rubyflow/flow.rb:16:in `make_backtrace'"
  # "./lib/aws/rubyflow/flow.rb:103:in `initialize'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `new'"
  # "./lib/aws/rubyflow/asyncScope.rb:17:in `initialize'"

  frames_to_skip = 7
  backtrace = AsyncBacktrace.create(parent_backtrace, frames_to_skip)
end
@param [Future] future
Unused; defaults to **nil**.
@param block
The block of code to be executed when the task is run.
@raise [NoContextException]
If the current fiber does not respond to `Fiber.__context__`.
@return [Future]
The task's result, which is a {Future}.
# File lib/aws/flow/implementation.rb, line 36
def task(future = nil, &block)
  fiber = ::Fiber.current
  raise NoContextException unless fiber.respond_to? :__context__
  context = fiber.__context__
  t = Task.new(nil, &block)
  task_context = TaskContext.new(:parent => context.get_closest_containing_scope, :task => t)
  context << t
  t.result
end
Blocks until all of the arguments are set or until the timeout expires.
@param [Array<Future>] futures
A list of futures to wait for. The function will return only when all of them are set.
@param [Integer] timeout
The timeout after which a `Timeout::Error` is raised.
@return [Array<Future>]
A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 180
def timed_wait_for_all(timeout, *futures)
  timed_wait_for_function(timeout, lambda {|result, future_list| result.size == future_list.size}, futures)
end
Blocks until any of the arguments is set or until the timeout expires.
@param [Array<Future>] futures
A list of futures to wait for. The function will return when at least one of these is set.
@param [Integer] timeout
The timeout after which a `Timeout::Error` is raised.
@return [Array<Future>]
A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 165
def timed_wait_for_any(timeout, *futures)
  timed_wait_for_function(timeout, lambda {|result, future_list| result.length >= 1 }, futures)
end
Blocks until the passed-in function returns a true value or until the timeout expires.
@param function
The condition to wait for. It is called with the list of futures that have been set so far and the full list of futures, and waiting ends once it returns a true value.
@param [Array<Future>] futures
A list of futures to wait on.
@param [Integer] timeout
The timeout after which a `Timeout::Error` is raised.
@return [Array<Future>]
A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 198
def timed_wait_for_function(timeout, function, *futures)
  wait_for_function_helper(timeout, function, *futures)
end
Blocks until all of the arguments are set.
@param [Array<Future>] futures
A list of futures to wait for. The function will return only when all of them are set.
@return [Array<Future>]
A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 150
def wait_for_all(*futures)
  wait_for_function(lambda {|result, future_list| result.size == future_list.size}, futures)
end
Blocks until any of the arguments is set.
@param [Array<Future>] futures
A list of futures to wait for. The function will return when at least one of these is set.
@return [Array<Future>]
A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 138
def wait_for_any(*futures)
  wait_for_function(lambda {|result, future_list| result.length >= 1 }, futures)
end
Blocks until the passed-in function returns a true value.
@param function
The condition to wait for. It is called with the list of futures that have been set so far and the full list of futures, and waiting ends once it returns a true value.
@param [Array<Future>] futures
A list of futures to wait on.
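The only difference between `wait_for_all` and `wait_for_any` is the predicate each passes down. The sketch below copies the two lambdas from the listings and evaluates them against plain arrays; the symbols are placeholders standing in for `Future` objects, and the variable names are chosen for illustration.

```ruby
# The two predicates, copied from wait_for_all and wait_for_any.
all_set = lambda { |result, future_list| result.size == future_list.size }
any_set = lambda { |result, future_list| result.length >= 1 }

futures = [:a, :b, :c] # placeholders standing in for Future objects

none_yet   = []           # nothing has been set
one_set    = [:b]         # one future has been set
everything = [:b, :a, :c] # all futures set, in the order they were set

any_before = any_set.call(none_yet, futures)   # false: keep waiting
any_after  = any_set.call(one_set, futures)    # true: wait_for_any would stop
all_before = all_set.call(one_set, futures)    # false: keep waiting
all_after  = all_set.call(everything, futures) # true: wait_for_all would stop
```

The first argument is the list of futures set so far and the second is the full list, so a custom predicate (for example, "at least two are set") can be written the same way and passed to `wait_for_function`.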
@return [Array<Future>]
A list of the set futures, in the order of being set.
# File lib/aws/flow/implementation.rb, line 126
def wait_for_function(function, *futures)
  wait_for_function_helper(nil, function, *futures)
end
Helper method that holds the implementation shared by `wait_for_function` and `timed_wait_for_function`.
# File lib/aws/flow/implementation.rb, line 204
def wait_for_function_helper(timeout, function, *futures)
  futures.flatten!

  f = futures.select { |x| x.is_a?(ExternalFuture) }

  if f.size > 0 && f.size != futures.size
    raise ArgumentError, "The futures array must contain either all "\
      "objects of Future or all objects of ExternalFuture"
  end

  conditional = f.size == 0 ? FiberConditionVariable.new : ExternalConditionVariable.new

  return nil if futures.empty?
  result = futures.select(&:set?)
  return futures.find(&:set?) if function.call(result, futures)
  futures.each do |f|
    f.on_set do |set_one|
      result << set_one
      conditional.broadcast if function.call(result, futures)
    end
  end

  if conditional.is_a?(FiberConditionVariable)
    conditional.wait
  else
    conditional.wait(timeout)
    raise Timeout::Error.new unless function.call(result, futures)
  end
  result
end
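The pattern in the helper (register an `on_set` callback on every future, collect the futures as they are set, and wake the waiter once the predicate holds) can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the library's implementation: `MiniFuture` and `mini_wait` are hypothetical names, a `Mutex` and `ConditionVariable` replace `FiberConditionVariable` and `ExternalConditionVariable`, and, unlike the listing, this sketch always applies the timeout.

```ruby
require 'timeout'

# Hypothetical stand-in for a future; not the library's Future class.
class MiniFuture
  def initialize
    @set = false
    @callbacks = []
    @lock = Mutex.new
  end

  def set?
    @lock.synchronize { @set }
  end

  # Registers a callback, or runs it at once if the future is already set.
  def on_set(&block)
    already_set = @lock.synchronize do
      @callbacks << block unless @set
      @set
    end
    block.call(self) if already_set
  end

  # Marks the future as set and notifies the registered callbacks.
  def set
    callbacks = @lock.synchronize do
      @set = true
      @callbacks.dup
    end
    callbacks.each { |callback| callback.call(self) }
  end
end

# Hypothetical analogue of wait_for_function_helper built on threads.
def mini_wait(timeout, predicate, futures)
  lock = Mutex.new
  cond = ConditionVariable.new
  result = []

  futures.each do |future|
    future.on_set do |set_one|
      lock.synchronize do
        result << set_one
        cond.broadcast if predicate.call(result, futures)
      end
    end
  end

  lock.synchronize do
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    until predicate.call(result, futures)
      remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise Timeout::Error if remaining <= 0
      cond.wait(lock, remaining)
    end
    result.dup
  end
end

all_set = lambda { |result, future_list| result.size == future_list.size }

a = MiniFuture.new
b = MiniFuture.new
setter = Thread.new do
  b.set
  a.set
end
done = mini_wait(1, all_set, [a, b])
setter.join

# A future that is never set makes the wait time out.
timed_out = begin
  mini_wait(0.05, all_set, [MiniFuture.new])
  false
rescue Timeout::Error
  true
end
```

After the run, `done` holds both futures and `timed_out` is `true`. The order inside `done` depends on whether the setter thread runs before or after the callbacks are registered, so the sketch makes no claim about it.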