class AWS::Flow::WorkflowWorker

This worker class is intended for use by the workflow implementation. It is configured with a task list and a workflow implementation. The worker class polls for decision tasks in the specified task list. When a decision task is received, it creates an instance of the workflow implementation and calls the @ execute() decorated method to process the task.

Attributes

workflow_type[RW]

The workflow type for this workflow worker.

Public Class Methods

new(service, domain, task_list, *args, &block) click to toggle source

Creates a new WorkflowWorker instance.

@param service

The service used with this workflow worker.

@param [String] domain

The Amazon SWF domain to operate on.

@param [Array] task_list

The default task list to put all of the decision requests.

@param args

The decisions to use.
Calls superclass method AWS::Flow::GenericWorker::new
# File lib/aws/decider/worker.rb, line 130
def initialize(service, domain, task_list, *args, &block)
  @workflow_definition_map = {}
  @workflow_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  @logger = @options.logger if @options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @options.logger ||= @logger if @options

  super(service, domain, task_list, *args)
end

Public Instance Methods

add_implementation(workflow_class) click to toggle source
# File lib/aws/decider/worker.rb, line 146
def add_implementation(workflow_class)
  add_workflow_implementation(workflow_class)
end
add_workflow_implementation(workflow_class) click to toggle source

Called by {#add_implementation}. @api private

# File lib/aws/decider/worker.rb, line 152
def add_workflow_implementation(workflow_class)
  workflow_class.workflows.delete_if do |workflow_type|
    workflow_type.version.nil? || workflow_type.name.nil?
  end

  @workflow_definition_map.merge!(
    WorkflowDefinitionFactory.generate_definition_map(workflow_class)
  )

  workflow_class.workflows.each do |workflow_type|
    # TODO should probably do something like
    # GenericWorkflowWorker#registerWorkflowTypes
    options = workflow_type.options
    workflow_hash = options.get_options(
      [
        :default_task_start_to_close_timeout,
        :default_execution_start_to_close_timeout,
        :default_child_policy,
        :default_task_priority
      ], {
        :domain => @domain.name,
        :name => workflow_type.name,
        :version => workflow_type.version
      }
    )

    if options.default_task_list
      workflow_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
      )
    end
    @workflow_type_options << workflow_hash
  end
end
register() click to toggle source

Registers this workflow with Amazon SWF.

# File lib/aws/decider/worker.rb, line 189
def register
  @workflow_type_options.delete_if {|workflow_type_options| workflow_type_options[:version].nil?}
  @workflow_type_options.each do |workflow_type_options|
    begin
      @service.register_workflow_type(workflow_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register workflow #{e.message} with options #{workflow_type_options}"
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
run_once(should_register = false, poller = nil) click to toggle source

Starts the workflow and runs it once, with an optional {WorkflowTaskPoller}.

@param should_register (see start)

@param poller

An optional {WorkflowTaskPoller} to use.
# File lib/aws/decider/worker.rb, line 241
def run_once(should_register = false, poller = nil)
  register if should_register

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task
end
set_workflow_implementation_types(workflow_implementation_types) click to toggle source
# File lib/aws/decider/worker.rb, line 142
def set_workflow_implementation_types(workflow_implementation_types)
  workflow_implementation_types.each {|type| add_workflow_implementation(type)}
end
start(should_register = true) click to toggle source

Starts the workflow with a {WorkflowTaskPoller}.

@param [true,false] should_register

Indicates whether the workflow needs to be registered with Amazon SWF
first. If {#register} was already called
for this workflow worker, specify `false`.
# File lib/aws/decider/worker.rb, line 210
def start(should_register = true)
  # TODO check to make sure that the correct properties are set
  # TODO Register the domain if not already registered
  # TODO register types to poll
  # TODO Set up throttler
  # TODO Set up a timeout on the throttler correctly,
  # TODO Make this a generic poller, go to the right kind correctly

  poller = WorkflowTaskPoller.new(
    @service,
    @domain,
    DecisionTaskHandler.new(@workflow_definition_map, @options),
    @task_list,
    @options
  )

  register if should_register
  @logger.debug "Starting an infinite loop to poll and process workflow tasks."
  loop do
    run_once(false, poller)
  end
end