module AWS::Flow
Public Class Methods
# File lib/aws/decider/implementation.rb, line 62 def decision_context FlowFiber.current[:decision_context] end
# File lib/aws/decider/utilities.rb, line 21 def self.on_windows? require 'rbconfig' (RbConfig::CONFIG['host_os'] =~ /mswin|mingw/).nil? == false end
Starts an Activity
or a Workflow Template execution using the default workflow class FlowDefaultWorkflowRuby
@param [String or AWS::Flow::Templates::TemplateBase
] name_or_klass
The Activity or the Workflow Template that needs to be scheduled via the default workflow. This argument can either be a string that represents a fully qualified activity name - <ActivityClass>.<method_name> or it can be an instance of AWS::Flow::Templates::TemplateBase
@param [Hash] input
Input hash for the workflow execution
@param [Hash] opts
Additional options to configure the workflow or activity execution.
@option opts [true, false] :get_result
*Optional* This boolean flag can be set to true if the result future if required. The future can be waited on by using the AWS::Flow::wait_for_all, AWS::Flow::wait_for_any methods or by calling the ExternalFuture#get method. Default value is false.
@option opts [Hash] :exponential_retry
A hash of {AWS::Flow::ExponentialRetryOptions}. Default value is - { maximum_attempts: 3 }
@option opts [String] Optional :domain
Default value is FlowDefault
@option opts [Integer] Optional :execution_start_to_close_timeout
Default value is 3600 seconds (1 hour)
@option opts [String] Optional :workflow_id
@option opts [Integer] Optional :task_priority
Default value is 0
@option opts [String] Optional :tag_list
By default, the name of the activity task gets added to the workflow's tag_list
@option opts Optional :data_converter
Default value is {AWS::Flow::YAMLDataConverter}. To use the {AWS::Flow::S3DataConverter}, set the AWS_SWF_BUCKET_NAME environment variable name with a valid AWS S3 bucket name.
@option opts Optional A hash of {AWS::Flow::ActivityOptions}
Usage -
AWS::Flow::start("<ActivityClassName>.<method_name>", <input_hash>, <options_hash> )
Example -
1) Start an activity execution -
AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })
2) Start an activity execution with overriden options -
AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" }, { exponential_retry: { maximum_attempts: 10 } } )
# File lib/aws/decider/starter.rb, line 200 def self.start(name_or_klass, input, options = {}) AWS::Flow::Templates::Starter.start(name_or_klass, input, options) end
Utility method used to start a workflow execution with the service.
@param [String or Class (that extends AWS::Flow::Workflows
)] workflow
Represents an AWS Flow Framework workflow class. If not provided, details of the workflow must be passed via the opts Hash.
@param [Hash] input
Input hash for the workflow execution
@param [Hash] opts
Hash of options to configure the workflow execution
@option opts [String] Required :domain
@option opts [String] Required :version
@option opts [String] Optional :prefix_name
Must be specified if workflow is not passed in as an argument
@option opts [String] Optional :execution_method
Must be specified if workflow is not passed in as an argument
@option opts [String] Optional :from_class
@option opts [String] Optional :workflow_id
@option opts [Integer] Optional :execution_start_to_close_timeout
@option opts [Integer] Optional :task_start_to_close_timeout
@option opts [Integer] Optional :task_priority
@option opts [String] Optional :task_list
@option opts [String] Optional :child_policy
@option opts [Array] Optional :tag_list
@option opts Optional :data_converter
Usage -
1) Passing a fully qualified workflow <prefix_name>.<execution_method> name -
AWS::Flow::start_workflow("HelloWorkflow.say_hello", "world", { domain: "FooDomain", version: "1.0" ... })
2) Passing workflow class name with other details in the options hash -
AWS::Flow::start_workflow("HelloWorkflow", "world", { domain: "FooDomain", execution_method: "say_hello", version: "1.0" ... })
3) Acquiring options using the :from_class option -
AWS::Flow::start_workflow(nil, "hello", { domain: "FooDomain", from_class: "HelloWorkflow" }) # This will take all the required options from the HelloWorkflow class. # If execution_method options is not passed in, it will use the first # workflow method in the class.
4) All workflow options are present in the options hash. This is the case
when this method is called by AWS::Flow#start AWS::Flow::start_workflow(nil, "hello", { domain: "FooDomain", prefix_name: "HelloWorkflow", execution_method: "say_hello", version: "1.0", ... })
# File lib/aws/decider/starter.rb, line 84 def self.start_workflow(workflow = nil, input, opts) raise ArgumentError, "Please provide an options hash" if opts.nil? || !opts.is_a?(Hash) options = opts.dup # Get the domain out of the options hash. domain = options.delete(:domain) raise ArgumentError, "You must provide a :domain in the options hash" if domain.nil? if options[:from_class] # Do nothing. Use options as they are. They will be taken care of in the # workflow client elsif workflow.nil? # This block is usually executed when #start_workflow is called from # #start. All options required to start the workflow must be present # in the options hash. prefix_name = options[:prefix_name] || options[:workflow_name] # Check if required options are present raise ArgumentError, "You must provide a :prefix_name in the options hash" unless prefix_name raise ArgumentError, "You must provide an :execution_method in the options hash" unless options[:execution_method] raise ArgumentError, "You must provide a :version in the options hash" unless options[:version] else # When a workflow class name is given along with some options # If a fully qualified workflow name is given, split it into prefix_name # and execution_method prefix_name, execution_method = workflow.to_s.split(".") # If a fully qualified name is not given, then look for it in the options # hash execution_method ||= options[:execution_method] # Make sure all required options are present raise ArgumentError, "You must provide an :execution_method in the options hash" unless execution_method raise ArgumentError, "You must provide a :version in the options hash" unless options[:version] # Set the :prefix_name and :execution_method options correctly options.merge!( prefix_name: prefix_name, execution_method: execution_method, ) end swf = AWS::SimpleWorkflow.new domain = swf.domains[domain] # Get a workflow client for the domain client = workflow_client(domain.client, domain) { options } # Start the workflow execution client.start_execution(input) end
# File lib/aws/decider/version.rb, line 18 def self.version "3.1.0" end
Execute a block with retries within a workflow context.
@param options
The {RetryOptions} to use.
@param block
The block to execute.
# File lib/aws/decider/implementation.rb, line 52 def with_retry(options = {}, &block) # TODO raise a specific error instead of a runtime error raise "with_retry can only be used inside a workflow context!" if Utilities::is_external retry_options = ExponentialRetryOptions.new(options) retry_policy = RetryPolicy.new(retry_options.retry_function, retry_options) async_retrying_executor = AsyncRetryingExecutor.new(retry_policy, self.decision_context.workflow_clock, retry_options.return_on_start) future = async_retrying_executor.execute(lambda { block.call }) Utilities::drill_on_future(future) unless retry_options.return_on_start end
@api private
# File lib/aws/decider/implementation.rb, line 71 def self.workflow_client(service = nil, domain = nil, &block) options = Utilities::interpret_block_for_options(StartWorkflowOptions, block) if ! Utilities::is_external service = AWS::SimpleWorkflow.new # So, we probably shouldn't be doing this, but we need to slightly # redesign where this is available from. domain = FlowFiber.current[:decision_context].workflow_context.decision_task.workflow_execution.domain else if service.nil? || domain.nil? raise "You must provide both a service and domain when using workflow client in an external setting" end end workflow_class_name = options.from_class || options.workflow_name workflow_class = get_const(workflow_class_name) rescue nil WorkflowClient.new(service, domain, workflow_class, options) end
Public Instance Methods
Creates a new {WorkflowClient} instance.
@param service
An {http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow.html Amazon SWF service} reference. This is usually created with: swf = AWS::SimpleWorkflow.new
@param domain
The Amazon SWF {http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html Domain} to use for this workflow client. This is usually created on the service object, such as: domain = swf.domains.create('my-domain', 10) or retrieved from it (for existing domains): domain = swf.domains['my-domain']
@param [Hash, StartWorkflowOptions] block
A hash of options to start the workflow.
# File lib/aws/decider/implementation.rb, line 40 def workflow_client(service = nil, domain = nil, &block) AWS::Flow.send(:workflow_client, service, domain, &block) end
This method is for internal use only and may be changed or removed without prior notice. Use {#workflow_client} instead.
@api private
# File lib/aws/decider/workflow_enabled.rb, line 21 def workflow_factory(client, domain, &options) WorkflowFactory.new(client, domain, options) end
Private Instance Methods
# File lib/aws/decider/implementation.rb, line 62 def decision_context FlowFiber.current[:decision_context] end
Execute a block with retries within a workflow context.
@param options
The {RetryOptions} to use.
@param block
The block to execute.
# File lib/aws/decider/implementation.rb, line 52 def with_retry(options = {}, &block) # TODO raise a specific error instead of a runtime error raise "with_retry can only be used inside a workflow context!" if Utilities::is_external retry_options = ExponentialRetryOptions.new(options) retry_policy = RetryPolicy.new(retry_options.retry_function, retry_options) async_retrying_executor = AsyncRetryingExecutor.new(retry_policy, self.decision_context.workflow_clock, retry_options.return_on_start) future = async_retrying_executor.execute(lambda { block.call }) Utilities::drill_on_future(future) unless retry_options.return_on_start end