class AWS::Flow::WorkflowClient

Represents a client for a workflow execution.

@!attribute domain

The Amazon SWF domain used for this workflow.

@!attribute [Hash, WorkflowOptions] options

Workflow options for this client.

Attributes

domain[RW]
options[RW]

Public Class Methods

default_option_class() click to toggle source

@api private

# File lib/aws/decider/workflow_client.rb, line 166
# Names the options class that goes with this kind of client.
#
# @return [Class] the {WorkflowOptions} class.
# @api private
def self.default_option_class
  WorkflowOptions
end
new(service, domain, workflow_class, options) click to toggle source

Creates a new {WorkflowClient}.

@param service

The Amazon SWF [Client](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Client.html) to use for
creating this {WorkflowClient}.

@param domain

The Amazon SWF [Domain](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html) in which to
start the workflow execution.

@param workflow_class

@param [Hash, WorkflowOptions] options

Workflow options for this client.
Calls superclass method AWS::Flow::GenericClient::new
# File lib/aws/decider/workflow_client.rb, line 155
# Builds a workflow client and records the collaborators it was given.
#
# @param service the Amazon SWF client, kept in `@service`.
# @param domain the Amazon SWF domain, kept in `@domain`.
# @param workflow_class the workflow class, kept in `@workflow_class`.
# @param [Hash, WorkflowOptions] options workflow options, kept in `@options`.
def initialize(service, domain, workflow_class, options)
  @service, @domain, @workflow_class, @options = service, domain, workflow_class, options
  # Errors captured for child workflows, keyed by workflow id.
  @failure_map = {}
  # Bare `super` forwards the same four arguments to the superclass.
  super
end

Public Instance Methods

events() click to toggle source

Gets the events for this workflow client.

# File lib/aws/decider/workflow_client.rb, line 170
# Returns the events of the execution held in `@execution`.
#
# @return whatever `@execution.events` returns, or `nil` when no execution
#   has been stored on this client.
def events
  return unless @execution

  @execution.events
end
get_decision_context() click to toggle source

Gets the current {DecisionContext} for the workflow client.

# File lib/aws/decider/workflow_client.rb, line 177
# Caches the current decision context and its decision helper on this client.
#
# The context is read from `FlowFiber.current[:decision_context]` the first
# time; later calls reuse `@decision_context` and `@decision_helper`.
#
# @return the decision helper (the value of `@decision_helper`), not the
#   context itself.
def get_decision_context
  @decision_context = FlowFiber.current[:decision_context] unless @decision_context
  return @decision_helper if @decision_helper

  @decision_helper = @decision_context.decision_helper
end
is_execution_method(method_name) click to toggle source
# File lib/aws/decider/workflow_client.rb, line 383
# Tells whether `method_name` names a way to start this client's workflow.
#
# A name qualifies when it equals (as a Symbol) the execution method of one
# of the workflow types declared on `@workflow_class`, or when it equals the
# class's entry point.
#
# @param [Symbol] method_name the method name being looked up.
# @return [Boolean] `false` when the client has no workflow class.
def is_execution_method(method_name)
  # A client may exist without a workflow class; nothing can match then, and
  # calling `workflows` on nil would raise a misleading NoMethodError.
  return false if @workflow_class.nil?

  declared = @workflow_class.workflows.map { |workflow| workflow.options.execution_method.to_sym }
  declared.include?(method_name) || method_name == @workflow_class.entry_point
end
method_missing(method_name, *args, &block) click to toggle source
Calls superclass method
# File lib/aws/decider/workflow_client.rb, line 387
# Starts a workflow execution when an undefined method matches one of the
# workflow's execution methods; any other name is passed to the superclass.
#
# @param [Symbol] method_name the name of the method that was called.
# @param args arguments forwarded to {#start_execution_method} as input.
# @param block options block forwarded to {#start_execution_method}.
def method_missing(method_name, *args, &block)
  return start_execution_method(method_name, *args, &block) if is_execution_method(method_name)

  super
end

# Keeps `respond_to?` in agreement with {#method_missing}.
#
# @param [Symbol] method_name the name being queried.
# @param [Boolean] include_private passed through to the superclass.
# @return [Boolean] `true` for names that {#method_missing} would dispatch.
def respond_to_missing?(method_name, include_private = false)
  # Only consult the workflow class when there is one to consult.
  (!@workflow_class.nil? && is_execution_method(method_name)) || super
end
request_cancel_workflow_execution(future) click to toggle source
# File lib/aws/decider/workflow_client.rb, line 394
# Requests cancellation of the workflow execution behind `future`.
#
# Looks up the state machine registered under the execution's workflow id,
# records the run id on it, and feeds it a `:cancel` event.
#
# @param future an object whose `workflow_execution` exposes `run_id` and
#   `workflow_id`, each resolved here with `get`.
def request_cancel_workflow_execution(future)
  # Populate @decision_helper first, as the other decider-side methods do;
  # otherwise a client that has not started or signaled anything yet would
  # index into nil below.
  get_decision_context
  workflow_execution = future.workflow_execution
  run_id = workflow_execution.run_id.get
  workflow_id = workflow_execution.workflow_id.get
  state_machine = @decision_helper[workflow_id]
  state_machine.run_id = run_id
  state_machine.consume(:cancel)
end
signal_external_workflow(signal_name, workflow_execution, &block) click to toggle source

Called by {#signal_workflow_execution}. @api private

# File lib/aws/decider/workflow_client.rb, line 227
  # Sends a signal through the service client.
  #
  # Options come from the block; the signal name, and (when an execution is
  # given) the domain and workflow id, are only filled in where the block left
  # them unset.
  #
  # @param signal_name fallback signal name.
  # @param workflow_execution execution to signal; may be nil.
  # @param block block interpreted as {SignalWorkflowOptions}.
  # @return the result of `@service.signal_workflow_execution`.
  # @api private
  def signal_external_workflow(signal_name, workflow_execution, &block)
    signal_options = Utilities.interpret_block_for_options(SignalWorkflowOptions, block)
    signal_options.signal_name = signal_name unless signal_options.signal_name
    if workflow_execution
      signal_options.domain = workflow_execution.domain.name.to_s unless signal_options.domain
      signal_options.workflow_id = workflow_execution.workflow_id.to_s unless signal_options.workflow_id
    end
    @service.signal_workflow_execution(signal_options.get_full_options)
  end
signal_internal_workflow(signal_name, workflow_execution, &block) click to toggle source

Called by {#signal_workflow_execution}. @api private

# File lib/aws/decider/workflow_client.rb, line 239
# Schedules a signal decision from inside a decider.
#
# Builds {SignalWorkflowOptions} from the block, registers a
# SignalDecisionStateMachine under a fresh decision id, and tracks the open
# request in the decision helper's `scheduled_signals`.
#
# @param signal_name fallback signal name (converted with `to_s`).
# @param workflow_execution the execution to signal, or an object that
#   responds to `workflow_execution` and wraps one.
# @param block block interpreted as {SignalWorkflowOptions}.
# @return the `result` of the open request created for this signal.
# @raise [RuntimeError] from the cancellation handler when no request is
#   registered for the decision id.
# @api private
def signal_internal_workflow(signal_name, workflow_execution, &block)
  get_decision_context
  options = Utilities::interpret_block_for_options(SignalWorkflowOptions, block)
  # Unpack the workflow execution from the future
  workflow_execution = workflow_execution.workflow_execution if workflow_execution.respond_to? :workflow_execution
  options.signal_name ||= signal_name.to_s
  options.workflow_id ||= workflow_execution.workflow_id.get.to_s
  Utilities::merge_all_options(options)
  open_request = OpenRequestInfo.new
  decision_id = @decision_helper.get_next_id(:Signal)
  options.control ||= decision_id
  external_task do |external|
    external.initiate_task do |handle|
      @decision_helper[decision_id] = SignalDecisionStateMachine.new(decision_id, options)
      open_request.completion_handle = handle
      @decision_helper.scheduled_signals[decision_id] = open_request
    end
    external.cancellation_handler do |handle, _cause|
      @decision_helper[decision_id].consume(:cancel)
      # Remove the request from the same collection it was registered in
      # above (`scheduled_signals`, plural).
      open_request = @decision_helper.scheduled_signals.delete(decision_id)
      raise "Signal #{decision_id} wasn't scheduled" unless open_request
      handle.complete
    end
  end
  return open_request.result
end
signal_workflow_execution(signal_name = nil, workflow_execution = nil, &block) click to toggle source

Records a `WorkflowExecutionSignaled` event in the workflow execution history and creates a decision task for the workflow execution. The event is recorded with the specified user-defined signal name and input (if provided).

@param signal_name

The user-defined name of the signal.

@param workflow_execution

The workflow execution to signal.

@param [Hash, SignalWorkflowOptions] block

A block of {SignalWorkflowOptions} for the `WorkflowExecutionSignaled` event.
# File lib/aws/decider/workflow_client.rb, line 216
# Signals a workflow execution, choosing the external or internal path
# according to `Utilities.is_external`.
#
# @param signal_name the user-defined name of the signal.
# @param workflow_execution the workflow execution to signal.
# @param block block of {SignalWorkflowOptions}, forwarded unchanged.
# @return the result of the path that was taken.
def signal_workflow_execution(signal_name = nil, workflow_execution = nil, &block)
  return signal_external_workflow(signal_name, workflow_execution, &block) if Utilities.is_external

  signal_internal_workflow(signal_name, workflow_execution, &block)
end
start_execution(*input, &block) click to toggle source

Begins executing this workflow.

@param input

Input to provide to the workflow.

@param [Hash, StartWorkflowOptions] block

A block of {StartWorkflowOptions} to use for this workflow execution.
# File lib/aws/decider/workflow_client.rb, line 191
# Begins executing this workflow without naming an execution method.
#
# @param input input values for the workflow, forwarded as given.
# @param block block of {StartWorkflowOptions}, forwarded unchanged.
# @return the result of {#start_execution_method}.
def start_execution(*input, &block)
  # A nil method name leaves the choice of execution method to the callee.
  unnamed_method = nil
  start_execution_method(unnamed_method, *input, &block)
end
start_execution_method(method_name, *input, &block) click to toggle source
# File lib/aws/decider/workflow_client.rb, line 195
# Starts the workflow through the external or internal path, according to
# `Utilities.is_external`.
#
# @param method_name the execution method to start, or nil.
# @param input input values; passed on as a single Array.
# @param block block of {StartWorkflowOptions}, forwarded unchanged.
# @return the result of the path that was taken.
def start_execution_method(method_name, *input, &block)
  return start_external_workflow(method_name, input, &block) if Utilities.is_external

  start_internal_workflow(method_name, input, &block)
end
start_external_workflow(method_name, input = NoInput.new, &block) click to toggle source

Called by {#start_execution}. @api private

# File lib/aws/decider/workflow_client.rb, line 344
# Starts a workflow execution through the service client.
#
# Merges the block's {StartWorkflowOptions} with the client options, resolves
# the execution method and version (explicit options win over the workflow
# class or client defaults), and calls `@service.start_workflow_execution`.
#
# @param method_name execution method to start; nil selects the first
#   workflow type declared on the workflow class.
# @param input workflow input; left out of the request when it is a NoInput
#   or empty.
# @param block block interpreted as {StartWorkflowOptions}.
# @return the value of `@domain.workflow_executions.at(workflow_id, run_id)`.
# @raise [RuntimeError] when no execution method can be determined.
# @api private
def start_external_workflow(method_name, input = NoInput.new, &block)
  options = Utilities::interpret_block_for_options(StartWorkflowOptions, block)
  client_options = Utilities::client_options_from_method_name(method_name, @options)
  options = Utilities::merge_all_options(client_options, options)

  @data_converter = options[:data_converter]
  # Leave :input unset for the NoInput placeholder or for empty input.
  has_input = !(input.class <= NoInput || input.empty?)
  options[:input] = @data_converter.dump(input) if has_input

  if @workflow_class.nil?
    execution_method = @options.execution_method
    version = @options.version
  else
    workflow_type =
      if method_name.nil?
        @workflow_class.workflows.first
      else
        @workflow_class.workflows.find { |type| type.options.execution_method.to_sym == method_name }
      end
    # No matching type: fall through to the explicit options and the error
    # below instead of failing on nil here.
    execution_method = workflow_type && workflow_type.options.execution_method
    version = workflow_type && workflow_type.version
  end
  version = options[:version] || version
  execution_method = options[:execution_method] || execution_method
  raise "Can't find an execution method for workflow #{@workflow_class}" if execution_method.nil?

  # TODO A real workflowtype function
  workflow_name = @options.workflow_name || @options.prefix_name
  workflow_type_name = "#{workflow_name}.#{execution_method}"

  options[:task_list] = { :name => options[:task_list] } if options[:task_list]
  options[:workflow_id] ||= SecureRandom.uuid
  options[:domain] = @domain.name
  options[:workflow_type] = {
    :name => workflow_type_name,
    :version => version.to_s
  }
  # These keys configure the client and are removed before the service call.
  [:prefix_name, :workflow_name, :version, :execution_method, :data_converter].each { |key| options.delete(key) }
  run_id = @service.start_workflow_execution(options)["runId"]
  @domain.workflow_executions.at(options[:workflow_id], run_id)
end
start_internal_workflow(method_name, input = NoInput.new, &block) click to toggle source

Called by {#start_execution}. @api private

# File lib/aws/decider/workflow_client.rb, line 269
# Schedules a child workflow execution from inside a decider.
#
# Builds {StartWorkflowOptions} from the block and client options, picks a
# workflow id (the configured one, or the next id derived from the current
# run id), and registers a ChildWorkflowDecisionStateMachine plus an open
# request with the decision helper.
#
# @param method_name execution method used to look up client options.
# @param input workflow input; serialized with the options' data converter
#   unless it is empty.
# @param block block interpreted as {StartWorkflowOptions}.
# @return the WorkflowFuture when `return_on_start` is set; otherwise the
#   value obtained from `output.get`.
# @raise the error recorded in `@failure_map` for this workflow id, if any,
#   when `return_on_start` is not set.
# @api private
def start_internal_workflow(method_name, input = NoInput.new, &block)
  get_decision_context
  options = Utilities::interpret_block_for_options(StartWorkflowOptions, block)
  client_options = Utilities::client_options_from_method_name(method_name, @options)
  options = Utilities::merge_all_options(client_options, options)

  workflow_id_future, run_id_future = Future.new, Future.new
  minimal_domain = MinimalDomain.new(@domain.name.to_s)
  output = WorkflowFuture.new(AWS::Flow::MinimalWorkflowExecution.new(minimal_domain, workflow_id_future, run_id_future))
  new_options = StartWorkflowOptions.new(options)
  open_request = OpenRequestInfo.new
  workflow_id = new_options.workflow_id
  run_id = @decision_context.workflow_context.decision_task.workflow_execution.run_id
  # Without an explicit id, derive one from the parent's run id.
  workflow_id ||= @decision_helper.get_next_id(run_id.to_s + ":")
  workflow_id_future.set(workflow_id)
  error_handler do |t|
    t.begin do
      @data_converter = new_options.data_converter
      input = @data_converter.dump input unless input.empty?
      attributes = {}
      new_options.input ||= input unless input.empty?
      if @workflow_class != nil && new_options.execution_method.nil?
        new_options.execution_method = @workflow_class.entry_point
      end
      raise "Can't find an execution method for workflow #{@workflow_class}" if new_options.execution_method.nil?

      attributes[:options] = new_options
      attributes[:workflow_id] = workflow_id
      # TODO Use ChildWorkflowOptions
      attributes[:tag_list] = []

      external_task do |external|
        external.initiate_task do |handle|
          open_request.completion_handle = handle
          open_request.run_id = run_id_future
          open_request.description = output.workflow_execution
          @decision_helper.scheduled_external_workflows[workflow_id.to_s] = open_request
          @decision_helper[workflow_id.to_s] = ChildWorkflowDecisionStateMachine.new(workflow_id, attributes)
        end

        external.cancellation_handler do |handle, cause|
          state_machine = @decision_helper[workflow_id.to_s]
          if state_machine.current_state == :created
            # Use the same string key the request was registered under.
            open_request = @decision_helper.scheduled_external_workflows.delete(workflow_id.to_s)
            open_request.completion_handle.complete
          end
          state_machine.consume(:cancel)
        end
      end

      # NOTE(review): rescue/ensure are registered from inside the begin
      # block here; confirm this is how the error handler expects them.
      t.rescue(Exception) do |error|
        if error.is_a? ChildWorkflowFailedException
          details = @data_converter.load(error.details)
          error.details = details
          error.cause = details
        end
        # Remember the failure so it can be raised after the future resolves.
        @failure_map[workflow_id.to_s] = error
      end
      t.ensure do
        result = @data_converter.load open_request.result
        output.set(result)
        raise @failure_map[workflow_id.to_s] if @failure_map[workflow_id.to_s] && new_options.return_on_start
      end
    end
  end
  return output if new_options.return_on_start
  output.get
  this_failure = @failure_map[workflow_id.to_s]
  raise this_failure if this_failure
  return output.get
end