class AWS::Flow::WorkflowWorker
This worker class is intended for use by the workflow implementation. It is configured with a task list and a workflow implementation. The worker class polls for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the @ execute() decorated method to process the task.
Attributes
The workflow type for this workflow worker.
Public Class Methods
Creates a new WorkflowWorker
instance.
@param service
The service used with this workflow worker.
@param [String] domain
The Amazon SWF domain to operate on.
@param [Array] task_list
The default task list to put all of the decision requests.
@param args
The decisions to use.
AWS::Flow::GenericWorker::new
# File lib/aws/decider/worker.rb, line 130 def initialize(service, domain, task_list, *args, &block) @workflow_definition_map = {} @workflow_type_options = [] @options = Utilities::interpret_block_for_options(WorkerOptions, block) @logger = @options.logger if @options @logger ||= Utilities::LogFactory.make_logger(self) @options.logger ||= @logger if @options super(service, domain, task_list, *args) end
Public Instance Methods
# File lib/aws/decider/worker.rb, line 146 def add_implementation(workflow_class) add_workflow_implementation(workflow_class) end
Called by {#add_implementation}. @api private
# File lib/aws/decider/worker.rb, line 152 def add_workflow_implementation(workflow_class) workflow_class.workflows.delete_if do |workflow_type| workflow_type.version.nil? || workflow_type.name.nil? end @workflow_definition_map.merge!( WorkflowDefinitionFactory.generate_definition_map(workflow_class) ) workflow_class.workflows.each do |workflow_type| # TODO should probably do something like # GenericWorkflowWorker#registerWorkflowTypes options = workflow_type.options workflow_hash = options.get_options( [ :default_task_start_to_close_timeout, :default_execution_start_to_close_timeout, :default_child_policy, :default_task_priority ], { :domain => @domain.name, :name => workflow_type.name, :version => workflow_type.version } ) if options.default_task_list workflow_hash.merge!( :default_task_list => {:name => resolve_default_task_list(options.default_task_list)} ) end @workflow_type_options << workflow_hash end end
Registers this workflow with Amazon SWF.
# File lib/aws/decider/worker.rb, line 189 def register @workflow_type_options.delete_if {|workflow_type_options| workflow_type_options[:version].nil?} @workflow_type_options.each do |workflow_type_options| begin @service.register_workflow_type(workflow_type_options) rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e @logger.warn "#{e.class} while trying to register workflow #{e.message} with options #{workflow_type_options}" # Purposefully eaten up, the alternative is to check first, and who # wants to do two trips when one will do? end end end
Starts the workflow and runs it once, with an optional {WorkflowTaskPoller}.
@param should_register (see start
)
@param poller
An optional {WorkflowTaskPoller} to use.
# File lib/aws/decider/worker.rb, line 241 def run_once(should_register = false, poller = nil) register if should_register poller = WorkflowTaskPoller.new( @service, @domain, DecisionTaskHandler.new(@workflow_definition_map, @options), @task_list, @options ) if poller.nil? Kernel.exit if @shutting_down poller.poll_and_process_single_task end
# File lib/aws/decider/worker.rb, line 142 def set_workflow_implementation_types(workflow_implementation_types) workflow_implementation_types.each {|type| add_workflow_implementation(type)} end
Starts the workflow with a {WorkflowTaskPoller}.
@param [true,false] should_register
Indicates whether the workflow needs to be registered with Amazon SWF first. If {#register} was already called for this workflow worker, specify `false`.
# File lib/aws/decider/worker.rb, line 210 def start(should_register = true) # TODO check to make sure that the correct properties are set # TODO Register the domain if not already registered # TODO register types to poll # TODO Set up throttler # TODO Set up a timeout on the throttler correctly, # TODO Make this a generic poller, go to the right kind correctly poller = WorkflowTaskPoller.new( @service, @domain, DecisionTaskHandler.new(@workflow_definition_map, @options), @task_list, @options ) register if should_register @logger.debug "Starting an infinite loop to poll and process workflow tasks." loop do run_once(false, poller) end end