module AWS::Flow

Public Class Methods

decision_context()
# File lib/aws/decider/implementation.rb, line 62
def decision_context
  FlowFiber.current[:decision_context]
end
on_windows?()
# File lib/aws/decider/utilities.rb, line 21
def self.on_windows?
  require 'rbconfig'
  (RbConfig::CONFIG['host_os'] =~ /mswin|mingw/).nil? == false
end
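
The host OS check above can be tried in isolation. The following is a minimal sketch, assuming a hypothetical helper named windows_host? that takes the host_os string as an argument instead of reading it from RbConfig. The helper name and the sample strings are illustrative and are not part of the library.

```ruby
require 'rbconfig'

# Hypothetical helper (not part of AWS::Flow): applies the same regex test
# as on_windows?, but takes the host_os string as a parameter.
def windows_host?(host_os)
  !(host_os =~ /mswin|mingw/).nil?
end

# The library method reads the value from RbConfig, as done here.
puts "Current host is Windows: #{windows_host?(RbConfig::CONFIG['host_os'])}"

puts windows_host?('mingw32')    # true
puts windows_host?('linux-gnu')  # false
```

Matching on mswin rather than win keeps a host string such as darwin21 from being treated as Windows.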
start(name_or_klass, input, options = {})

Starts an Activity or a Workflow Template execution using the default workflow class, FlowDefaultWorkflowRuby.

@param [String or AWS::Flow::Templates::TemplateBase] name_or_klass

The Activity or the Workflow Template to schedule via the default
workflow. This argument is either a string that represents a fully
qualified activity name, <ActivityClass>.<method_name>, or an
instance of AWS::Flow::Templates::TemplateBase.

@param [Hash] input

Input hash for the workflow execution

@param [Hash] opts

Additional options to configure the workflow or activity execution.

@option opts [true, false] :get_result

*Optional* Set this boolean flag to true if the result future is
required. The future can be waited on by using the
AWS::Flow::wait_for_all or AWS::Flow::wait_for_any methods, or by
calling the ExternalFuture#get method. Default value is false.

@option opts [Hash] :exponential_retry

A hash of {AWS::Flow::ExponentialRetryOptions}. Default value is -
{ maximum_attempts: 3 }

@option opts [String] Optional :domain

Default value is FlowDefault

@option opts [Integer] Optional :execution_start_to_close_timeout

Default value is 3600 seconds (1 hour)

@option opts [String] Optional :workflow_id

@option opts [Integer] Optional :task_priority

Default value is 0

@option opts [String] Optional :tag_list

By default, the name of the activity task gets added to the workflow's
tag_list

@option opts Optional :data_converter

Default value is {AWS::Flow::YAMLDataConverter}. To use the
{AWS::Flow::S3DataConverter}, set the AWS_SWF_BUCKET_NAME environment
variable to a valid Amazon S3 bucket name.

@option opts Optional A hash of {AWS::Flow::ActivityOptions}

Usage -

AWS::Flow::start("<ActivityClassName>.<method_name>", <input_hash>,
<options_hash> )

Example -

1) Start an activity execution -

AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" })

2) Start an activity execution with overridden options -

AWS::Flow::start("HelloWorldActivity.say_hello", { name: "World" }, {
  exponential_retry: { maximum_attempts: 10 } }
)
# File lib/aws/decider/starter.rb, line 200
def self.start(name_or_klass, input, options = {})
  AWS::Flow::Templates::Starter.start(name_or_klass, input, options)
end
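
The default values listed above can be pictured as a hash that caller-supplied options are overlaid on. The following is a minimal sketch under that assumption. The constant DOCUMENTED_START_DEFAULTS and the helper effective_start_options are hypothetical names, and the listing above does not show how the library itself merges options.

```ruby
# Defaults collected from the option descriptions above.
# The constant name is hypothetical and not part of AWS::Flow.
DOCUMENTED_START_DEFAULTS = {
  get_result: false,
  exponential_retry: { maximum_attempts: 3 },
  domain: "FlowDefault",
  execution_start_to_close_timeout: 3600,
  task_priority: 0
}.freeze

# Hypothetical helper: overlay caller-supplied options on the defaults.
# This is a shallow merge, so a supplied :exponential_retry hash replaces
# the default hash as a whole.
def effective_start_options(options = {})
  DOCUMENTED_START_DEFAULTS.merge(options)
end

opts = effective_start_options(exponential_retry: { maximum_attempts: 10 })
puts opts[:exponential_retry][:maximum_attempts]  # 10
puts opts[:domain]                                # FlowDefault
```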
start_workflow(workflow = nil, input, opts)

Utility method used to start a workflow execution with the service.

@param [String or Class (that extends AWS::Flow::Workflows)] workflow

Represents an AWS Flow Framework workflow class. If not provided,
details of the workflow must be passed via the opts Hash.

@param [Hash] input

Input hash for the workflow execution

@param [Hash] opts

Hash of options to configure the workflow execution

@option opts [String] Required :domain

@option opts [String] Required :version

@option opts [String] Optional :prefix_name

Must be specified if workflow is not passed in as an argument

@option opts [String] Optional :execution_method

Must be specified if workflow is not passed in as an argument

@option opts [String] Optional :from_class

@option opts [String] Optional :workflow_id

@option opts [Integer] Optional :execution_start_to_close_timeout

@option opts [Integer] Optional :task_start_to_close_timeout

@option opts [Integer] Optional :task_priority

@option opts [String] Optional :task_list

@option opts [String] Optional :child_policy

@option opts [Array] Optional :tag_list

@option opts Optional :data_converter

Usage -

1) Passing a fully qualified workflow <prefix_name>.<execution_method> name -

AWS::Flow::start_workflow("HelloWorkflow.say_hello", "world", {
  domain: "FooDomain",
  version: "1.0"
  ...
})

2) Passing workflow class name with other details in the options hash -

AWS::Flow::start_workflow("HelloWorkflow", "world", {
  domain: "FooDomain",
  execution_method: "say_hello",
  version: "1.0"
  ...
})

3) Acquiring options using the :from_class option -

AWS::Flow::start_workflow(nil, "hello", {
  domain: "FooDomain",
  from_class: "HelloWorkflow"
})

# This will take all the required options from the HelloWorkflow class.
# If the execution_method option is not passed in, it will use the first
# workflow method in the class.

4) All workflow options are present in the options hash. This is the case
when this method is called by AWS::Flow#start -

AWS::Flow::start_workflow(nil, "hello", {
  domain: "FooDomain",
  prefix_name: "HelloWorkflow",
  execution_method: "say_hello",
  version: "1.0",
  ...
})
# File lib/aws/decider/starter.rb, line 84
def self.start_workflow(workflow = nil, input, opts)

  raise ArgumentError, "Please provide an options hash" if opts.nil? || !opts.is_a?(Hash)

  options = opts.dup

  # Get the domain out of the options hash.
  domain = options.delete(:domain)

  raise ArgumentError, "You must provide a :domain in the options hash" if domain.nil?

  if options[:from_class]
    # Do nothing. Use options as they are. They will be taken care of in the
    # workflow client
  elsif workflow.nil?
    # This block is usually executed when #start_workflow is called from
    # #start. All options required to start the workflow must be present
    # in the options hash.
    prefix_name = options[:prefix_name] || options[:workflow_name]
    # Check if required options are present
    raise ArgumentError, "You must provide a :prefix_name in the options hash" unless prefix_name
    raise ArgumentError, "You must provide an :execution_method in the options hash" unless options[:execution_method]
    raise ArgumentError, "You must provide a :version in the options hash" unless options[:version]
  else
    # When a workflow class name is given along with some options

    # If a fully qualified workflow name is given, split it into prefix_name
    # and execution_method
    prefix_name, execution_method = workflow.to_s.split(".")
    # If a fully qualified name is not given, then look for it in the options
    # hash
    execution_method ||= options[:execution_method]

    # Make sure all required options are present
    raise ArgumentError, "You must provide an :execution_method in the options hash" unless execution_method
    raise ArgumentError, "You must provide a :version in the options hash" unless options[:version]

    # Set the :prefix_name and :execution_method options correctly
    options.merge!(
      prefix_name: prefix_name,
      execution_method: execution_method,
    )
  end

  swf = AWS::SimpleWorkflow.new
  domain = swf.domains[domain]

  # Get a workflow client for the domain
  client = workflow_client(domain.client, domain) { options }

  # Start the workflow execution
  client.start_execution(input)
end
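
The branch that handles a workflow argument can be exercised without calling the service. The following is a minimal sketch that mirrors only the name-splitting step of the source above. The helper name resolve_workflow_name is hypothetical, and the :version and :domain checks are left out.

```ruby
# Hypothetical helper mirroring the name-resolution branch of
# start_workflow that runs when a workflow argument is given.
def resolve_workflow_name(workflow, options = {})
  # "HelloWorkflow.say_hello" splits into a prefix name and a method name.
  prefix_name, execution_method = workflow.to_s.split(".")
  # Without a fully qualified name, fall back to the options hash.
  execution_method ||= options[:execution_method]
  raise ArgumentError, "You must provide an :execution_method in the options hash" unless execution_method

  options.merge(prefix_name: prefix_name, execution_method: execution_method)
end

qualified = resolve_workflow_name("HelloWorkflow.say_hello")
puts qualified[:prefix_name]       # HelloWorkflow
puts qualified[:execution_method]  # say_hello

from_options = resolve_workflow_name("HelloWorkflow", execution_method: "say_hello")
puts from_options[:execution_method]  # say_hello
```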
version()
# File lib/aws/decider/version.rb, line 18
def self.version
  "3.1.0"
end
with_retry(options = {}, &block)

Execute a block with retries within a workflow context.

@param options

The {RetryOptions} to use.

@param block

The block to execute.
# File lib/aws/decider/implementation.rb, line 52
def with_retry(options = {}, &block)
  # TODO raise a specific error instead of a runtime error
  raise "with_retry can only be used inside a workflow context!" if Utilities::is_external
  retry_options = ExponentialRetryOptions.new(options)
  retry_policy = RetryPolicy.new(retry_options.retry_function, retry_options)
  async_retrying_executor = AsyncRetryingExecutor.new(retry_policy, self.decision_context.workflow_clock, retry_options.return_on_start)
  future = async_retrying_executor.execute(lambda { block.call })
  Utilities::drill_on_future(future) unless retry_options.return_on_start
end
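
The retry behavior can be pictured with a plain Ruby analog. The following is a simplified, synchronous sketch and is not the library's implementation: the real with_retry schedules retries through the workflow clock and only works inside a workflow context. The helper name retry_block is hypothetical, and treating maximum_attempts as the total number of attempts is an assumption.

```ruby
# Hypothetical, synchronous stand-in for illustration only.
# It re-runs the block until it succeeds or the attempt limit is reached.
def retry_block(maximum_attempts: 3)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    # Try again while attempts remain, otherwise re-raise the last error.
    retry if attempts < maximum_attempts
    raise
  end
end

result = retry_block(maximum_attempts: 3) do |attempt|
  raise "transient failure" if attempt < 3
  "succeeded on attempt #{attempt}"
end
puts result  # succeeded on attempt 3
```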
workflow_client(service = nil, domain = nil, &block)

@api private

# File lib/aws/decider/implementation.rb, line 71
def self.workflow_client(service = nil, domain = nil, &block)
  options = Utilities::interpret_block_for_options(StartWorkflowOptions, block)
  if ! Utilities::is_external
    service = AWS::SimpleWorkflow.new
    # So, we probably shouldn't be doing this, but we need to slightly
    # redesign where this is available from.
    domain = FlowFiber.current[:decision_context].workflow_context.decision_task.workflow_execution.domain
  else
    if service.nil? || domain.nil?
      raise "You must provide both a service and domain when using workflow client in an external setting"
    end
  end

  workflow_class_name = options.from_class || options.workflow_name
  workflow_class = get_const(workflow_class_name) rescue nil
  WorkflowClient.new(service, domain, workflow_class, options)
end

Public Instance Methods

workflow_client(service = nil, domain = nil, &block)

Creates a new {WorkflowClient} instance.

@param service

An {http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow.html Amazon SWF service} reference. This is usually
created with:

    swf = AWS::SimpleWorkflow.new

@param domain

The Amazon SWF {http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html Domain} to use for this
workflow client. This is usually created on the service object, such as:

    domain = swf.domains.create('my-domain', 10)

or retrieved from it (for existing domains):

    domain = swf.domains['my-domain']

@param [Hash, StartWorkflowOptions] block

A hash of options to start the workflow.
# File lib/aws/decider/implementation.rb, line 40
def workflow_client(service = nil, domain = nil, &block)
  AWS::Flow.send(:workflow_client, service, domain, &block)
end
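
The options reach workflow_client as the return value of a block, as in the call workflow_client(domain.client, domain) { options } in start_workflow. The following is a minimal sketch of that calling convention. The helper options_from_block is hypothetical and does not reproduce what Utilities::interpret_block_for_options does internally.

```ruby
# Hypothetical helper: evaluate the block and treat its return value as
# the options hash. An absent block yields an empty hash.
def options_from_block(&block)
  return {} if block.nil?

  result = block.call
  raise ArgumentError, "block must return a Hash" unless result.is_a?(Hash)

  result
end

options = { prefix_name: "HelloWorkflow", execution_method: "say_hello", version: "1.0" }
resolved = options_from_block { options }
puts resolved[:execution_method]  # say_hello
```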
workflow_factory(client, domain, &options)

This method is for internal use only and may be changed or removed
without prior notice. Use {#workflow_client} instead.

@api private

# File lib/aws/decider/workflow_enabled.rb, line 21
def workflow_factory(client, domain, &options)
  WorkflowFactory.new(client, domain,  options)
end

Private Instance Methods

decision_context()
# File lib/aws/decider/implementation.rb, line 62
def decision_context
  FlowFiber.current[:decision_context]
end
with_retry(options = {}, &block)

Execute a block with retries within a workflow context.

@param options

The {RetryOptions} to use.

@param block

The block to execute.
# File lib/aws/decider/implementation.rb, line 52
def with_retry(options = {}, &block)
  # TODO raise a specific error instead of a runtime error
  raise "with_retry can only be used inside a workflow context!" if Utilities::is_external
  retry_options = ExponentialRetryOptions.new(options)
  retry_policy = RetryPolicy.new(retry_options.retry_function, retry_options)
  async_retrying_executor = AsyncRetryingExecutor.new(retry_policy, self.decision_context.workflow_clock, retry_options.return_on_start)
  future = async_retrying_executor.execute(lambda { block.call })
  Utilities::drill_on_future(future) unless retry_options.return_on_start
end