class AWS::Flow::WorkflowClient
Represents a client for a workflow execution.
@!attribute domain
The Amazon SWF domain used for this workflow.
@!attribute [Hash, WorkflowOptions] options
Workflow options for this client.
Attributes
Public Class Methods
@api private
# File lib/aws/decider/workflow_client.rb, line 166
# Names the options class associated with this kind of client.
#
# @return [Class] the {WorkflowOptions} class.
# @api private
def self.default_option_class
  WorkflowOptions
end
Creates a new {WorkflowClient}.
@param service
The Amazon SWF [Client](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Client.html) to use for creating this {WorkflowClient}.
@param domain
The Amazon SWF [Domain](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html) in which to start the workflow execution.
@param workflow_class
The workflow class whose workflow types and entry point this client uses to start executions.
@param [Hash, WorkflowOptions] options
Workflow options for this client.
Calls superclass method AWS::Flow::GenericClient::new
# File lib/aws/decider/workflow_client.rb, line 155
# Stores the collaborators for this client, then hands off to the
# superclass initializer.
#
# @param service
#   The Amazon SWF client used for service calls.
# @param domain
#   The Amazon SWF domain in which workflow executions are started.
# @param workflow_class
#   The workflow class this client starts executions of.
# @param [Hash, WorkflowOptions] options
#   Workflow options for this client.
def initialize(service, domain, workflow_class, options)
  @service, @domain = service, domain
  @workflow_class, @options = workflow_class, options
  # Errors captured for child workflows, keyed by workflow id.
  @failure_map = {}
  # A bare `super` forwards all four arguments unchanged.
  super
end
Public Instance Methods
Gets the events for this workflow client.
# File lib/aws/decider/workflow_client.rb, line 170
# Gets the events for this workflow client.
#
# @return the events of the stored execution, or nil when no execution
#   has been stored on this client.
def events
  return nil unless @execution

  @execution.events
end
Looks up and caches the current {DecisionContext} and its decision helper for the workflow client; the decision helper is the return value.
# File lib/aws/decider/workflow_client.rb, line 177
# Caches the decision context found on the current FlowFiber, together
# with that context's decision helper. Both are looked up only while
# the corresponding instance variable is still unset.
#
# @return the cached decision helper (not the decision context itself).
def get_decision_context
  @decision_context = FlowFiber.current[:decision_context] unless @decision_context
  @decision_helper = @decision_context.decision_helper unless @decision_helper
  @decision_helper
end
# File lib/aws/decider/workflow_client.rb, line 383
# Tells whether +method_name+ names something this client can start:
# either the execution method of one of the workflow class's workflow
# types, or the workflow class's entry point.
#
# A client may be built without a workflow class (start_external_workflow
# handles that case), so a nil workflow class simply means "no".
#
# @param [Symbol] method_name the method name to look up.
# @return [Boolean]
def is_execution_method(method_name)
  return false if @workflow_class.nil?

  declared = @workflow_class.workflows.any? do |workflow_type|
    workflow_type.options.execution_method.to_sym == method_name
  end
  declared || method_name == @workflow_class.entry_point
end
# File lib/aws/decider/workflow_client.rb, line 387
# Starts a workflow execution when an otherwise-undefined method name
# matches one of the client's execution methods; anything else goes to
# the superclass.
#
# @param [Symbol] method_name the name of the missing method.
# @param args arguments forwarded as workflow input.
# @param block options block forwarded to start_execution_method.
def method_missing(method_name, *args, &block)
  if is_execution_method(method_name)
    start_execution_method(method_name, *args, &block)
  else
    super(method_name, *args, &block)
  end
end

# Keeps respond_to? in agreement with method_missing above.
#
# @param [Symbol, String] method_name the name being queried.
# @param [Boolean] include_private passed through to the superclass.
# @return [Boolean]
def respond_to_missing?(method_name, include_private = false)
  # The nil guard keeps respond_to? from raising on clients that were
  # built without a workflow class.
  (!@workflow_class.nil? && is_execution_method(method_name.to_sym)) || super
end
# File lib/aws/decider/workflow_client.rb, line 394
# Requests cancellation of the workflow execution behind +future+ by
# feeding a :cancel event to its decision state machine.
#
# @param future
#   An object exposing +workflow_execution+, whose +run_id+ and
#   +workflow_id+ both respond to +get+.
def request_cancel_workflow_execution(future)
  workflow_execution = future.workflow_execution
  run_id = workflow_execution.run_id.get
  workflow_id = workflow_execution.workflow_id.get
  # start_internal_workflow registers the state machine under
  # workflow_id.to_s, so look it up with the same string key.
  state_machine = @decision_helper[workflow_id.to_s]
  state_machine.run_id = run_id
  state_machine.consume(:cancel)
end
Called by {#signal_workflow_execution}. @api private
# File lib/aws/decider/workflow_client.rb, line 227
# Sends a signal through the service client. Values already set by the
# options block win; the signal name, domain and workflow id are only
# filled in when the block left them unset.
#
# @param signal_name the user-defined name of the signal.
# @param workflow_execution the execution to signal; may be nil, in
#   which case domain and workflow id are left as the block set them.
# @param block a block of {SignalWorkflowOptions}.
# @api private
def signal_external_workflow(signal_name, workflow_execution, &block)
  signal_options = Utilities::interpret_block_for_options(SignalWorkflowOptions, block)
  signal_options.signal_name = signal_name unless signal_options.signal_name

  if workflow_execution
    signal_options.domain = workflow_execution.domain.name.to_s unless signal_options.domain
    signal_options.workflow_id = workflow_execution.workflow_id.to_s unless signal_options.workflow_id
  end

  full_options = signal_options.get_full_options
  @service.signal_workflow_execution(full_options)
end
Called by {#signal_workflow_execution}. @api private
# File lib/aws/decider/workflow_client.rb, line 239
# Schedules a signal decision from inside a running workflow.
#
# @param signal_name the user-defined name of the signal.
# @param workflow_execution the execution to signal, or an object (such
#   as a future) that exposes one through +workflow_execution+.
# @param block a block of {SignalWorkflowOptions}.
# @return the +result+ of the open request created for this signal.
# @api private
def signal_internal_workflow(signal_name, workflow_execution, &block)
  get_decision_context
  options = Utilities::interpret_block_for_options(SignalWorkflowOptions, block)
  # Unpack the workflow execution from the future
  workflow_execution = workflow_execution.workflow_execution if workflow_execution.respond_to? :workflow_execution
  options.signal_name ||= signal_name.to_s
  options.workflow_id ||= workflow_execution.workflow_id.get.to_s
  # NOTE(review): the return value is discarded here, as in the original.
  Utilities::merge_all_options(options)
  open_request = OpenRequestInfo.new
  decision_id = @decision_helper.get_next_id(:Signal)
  options.control ||= decision_id

  external_task do |external|
    external.initiate_task do |handle|
      @decision_helper[decision_id] = SignalDecisionStateMachine.new(decision_id, options)
      open_request.completion_handle = handle
      @decision_helper.scheduled_signals[decision_id] = open_request
    end

    external.cancellation_handler do |handle, _cause|
      @decision_helper[decision_id].consume(:cancel)
      # Remove the request from the same collection initiate_task stored
      # it in (scheduled_signals, plural).
      scheduled_request = @decision_helper.scheduled_signals.delete(decision_id)
      raise "Signal #{decision_id} wasn't scheduled" unless scheduled_request
      handle.complete
    end
  end

  open_request.result
end
Records a `WorkflowExecutionSignaled` event in the workflow execution history and creates a decision task for the workflow execution. The event is recorded with the specified user-defined signal name and input (if provided).
@param signal_name
The user-defined name of the signal.
@param workflow_execution
The workflow execution to signal.
@param [Hash, SignalWorkflowOptions] block
A block of {SignalWorkflowOptions} for the `WorkflowExecutionSignaled` event.
# File lib/aws/decider/workflow_client.rb, line 216
# Signals a workflow execution, choosing the external or the internal
# path according to Utilities::is_external.
#
# @param signal_name the user-defined name of the signal.
# @param workflow_execution the workflow execution to signal.
# @param block a block of {SignalWorkflowOptions}.
def signal_workflow_execution(signal_name = nil, workflow_execution = nil, &block)
  return signal_external_workflow(signal_name, workflow_execution, &block) if Utilities::is_external

  signal_internal_workflow(signal_name, workflow_execution, &block)
end
Begins executing this workflow.
@param input
Input to provide to the workflow.
@param [Hash, StartWorkflowOptions] block
A block that sets the {StartWorkflowOptions} to use for this workflow execution.
# File lib/aws/decider/workflow_client.rb, line 191
# Begins executing this workflow without naming a specific execution
# method.
#
# @param input input to provide to the workflow.
# @param block a block of {StartWorkflowOptions}.
def start_execution(*input, &block)
  # No explicit method name is given on this path.
  unspecified_method = nil
  start_execution_method(unspecified_method, *input, &block)
end
# File lib/aws/decider/workflow_client.rb, line 195
# Starts a workflow execution through the external or the internal path,
# according to Utilities::is_external. The collected input is handed on
# as a single array argument.
#
# @param method_name the execution method to start, or nil.
# @param input input to provide to the workflow.
# @param block a block of {StartWorkflowOptions}.
def start_execution_method(method_name, *input, &block)
  return start_external_workflow(method_name, input, &block) if Utilities::is_external

  start_internal_workflow(method_name, input, &block)
end
Called by {#start_execution}. @api private
# File lib/aws/decider/workflow_client.rb, line 344
# Starts a workflow execution through the service client.
#
# @param method_name the execution method to start; nil selects the
#   first workflow type of the workflow class.
# @param input the workflow input; serialized with the configured data
#   converter unless it is the NoInput sentinel or empty.
# @param block a block of {StartWorkflowOptions}.
# @return the workflow execution looked up in the domain by workflow id
#   and run id.
# @raise [RuntimeError] if no execution method can be determined.
# @api private
def start_external_workflow(method_name, input = NoInput.new, &block)
  options = Utilities::interpret_block_for_options(StartWorkflowOptions, block)
  client_options = Utilities::client_options_from_method_name(method_name, @options)
  options = Utilities::merge_all_options(client_options, options)

  @data_converter = options[:data_converter]
  # Only the NoInput sentinel and an empty argument list count as "no
  # input"; a list such as [nil] is still serialized.
  has_input = !(input.class <= NoInput || input.empty?)
  options[:input] = @data_converter.dump input if has_input

  if @workflow_class.nil?
    execution_method = @options.execution_method
    version = @options.version
  else
    workflow_type =
      if method_name.nil?
        @workflow_class.workflows.first
      else
        @workflow_class.workflows.find { |x| x.options.execution_method.to_sym == method_name }
      end
    execution_method = workflow_type.options.execution_method
    version = workflow_type.version
  end

  # Values from the merged options take precedence over the defaults.
  version = options[:version] || version
  execution_method = options[:execution_method] || execution_method
  raise "Can't find an execution method for workflow #{@workflow_class}" if execution_method.nil?

  # TODO A real workflowtype function
  workflow_name = @options.workflow_name || @options.prefix_name
  workflow_type_name = workflow_name.to_s + "." + execution_method.to_s

  task_list = options[:task_list]
  options[:task_list] = { :name => task_list } if options[:task_list]
  options[:workflow_id] ||= SecureRandom.uuid
  options[:domain] = @domain.name
  options[:workflow_type] = {
    :name => workflow_type_name.to_s,
    :version => version.to_s
  }
  # Remove client-side keys before the hash is passed to the service.
  [:prefix_name, :workflow_name, :version, :execution_method, :data_converter].each { |key| options.delete(key) }

  run_id = @service.start_workflow_execution(options)["runId"]
  @domain.workflow_executions.at(options[:workflow_id], run_id)
end
Called by {#start_execution}. @api private
# File lib/aws/decider/workflow_client.rb, line 269
# Starts a child workflow execution from inside a running workflow.
#
# @param method_name the execution method to start, or nil.
# @param input the workflow input; serialized with the configured data
#   converter unless empty.
# @param block a block of {StartWorkflowOptions}.
# @return the {WorkflowFuture} itself when +return_on_start+ is set,
#   otherwise the value obtained from the future's +get+.
# @raise the error recorded for this workflow id, if any.
# @api private
def start_internal_workflow(method_name, input = NoInput.new, &block)
  get_decision_context
  options = Utilities::interpret_block_for_options(StartWorkflowOptions, block)
  client_options = Utilities::client_options_from_method_name(method_name, @options)
  options = Utilities::merge_all_options(client_options, options)

  workflow_id_future, run_id_future = Future.new, Future.new
  minimal_domain = MinimalDomain.new(@domain.name.to_s)
  output = WorkflowFuture.new(AWS::Flow::MinimalWorkflowExecution.new(minimal_domain, workflow_id_future, run_id_future))
  new_options = StartWorkflowOptions.new(options)
  open_request = OpenRequestInfo.new

  # Use the caller-supplied workflow id, or generate one from the run id
  # of the workflow execution that is currently being decided.
  workflow_id = new_options.workflow_id
  run_id = @decision_context.workflow_context.decision_task.workflow_execution.run_id
  workflow_id ||= @decision_helper.get_next_id(run_id.to_s + ":")
  workflow_id_future.set(workflow_id)

  error_handler do |t|
    t.begin do
      @data_converter = new_options.data_converter
      input = @data_converter.dump input unless input.empty?
      attributes = {}
      new_options.input ||= input unless input.empty?
      if !@workflow_class.nil? && new_options.execution_method.nil?
        new_options.execution_method = @workflow_class.entry_point
      end
      raise "Can't find an execution method for workflow #{@workflow_class}" if new_options.execution_method.nil?

      attributes[:options] = new_options
      attributes[:workflow_id] = workflow_id
      # TODO Use ChildWorkflowOptions
      attributes[:tag_list] = []

      external_task do |external|
        external.initiate_task do |handle|
          open_request.completion_handle = handle
          open_request.run_id = run_id_future
          open_request.description = output.workflow_execution
          @decision_helper.scheduled_external_workflows[workflow_id.to_s] = open_request
          @decision_helper[workflow_id.to_s] = ChildWorkflowDecisionStateMachine.new(workflow_id, attributes)
        end

        external.cancellation_handler do |handle, cause|
          state_machine = @decision_helper[workflow_id.to_s]
          if state_machine.current_state == :created
            # The request was stored under workflow_id.to_s above, so it
            # has to be removed with the same string key.
            open_request = @decision_helper.scheduled_external_workflows.delete(workflow_id.to_s)
            open_request.completion_handle.complete
          end
          state_machine.consume(:cancel)
        end
      end

      # As in the original, the rescue and ensure handlers are registered
      # from inside the begin block.
      t.rescue(Exception) do |error|
        if error.is_a? ChildWorkflowFailedException
          details = @data_converter.load(error.details)
          error.details = details
          error.cause = details
        end
        @failure_map[workflow_id.to_s] = error
      end

      t.ensure do
        result = @data_converter.load open_request.result
        output.set(result)
        raise @failure_map[workflow_id.to_s] if @failure_map[workflow_id.to_s] && new_options.return_on_start
      end
    end
  end

  return output if new_options.return_on_start

  # NOTE(review): presumably this first get waits for the child workflow
  # to finish before the failure map is consulted — confirm.
  output.get
  this_failure = @failure_map[workflow_id.to_s]
  raise this_failure if this_failure
  return output.get
end