class AWS::Flow::WorkflowTaskPoller

Public Class Methods

new(service, domain, handler, task_list, options=nil) click to toggle source

Creates a new `WorkflowTaskPoller`.

@param service

The Amazon SWF service object on which this task poller will operate.

@param [String] domain

The name of the domain containing the task lists to poll.

@param [DecisionTaskHandler] handler

A {DecisionTaskHandler} to handle polled tasks. The poller will call the {DecisionTaskHandler#handle_decision_task} method.

@param [Array] task_list

Specifies the task list to poll for decision tasks.

@param [Object] options

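Options to use for the logger. If given, the object must respond to `logger`; the logger it returns is used by the poller. If omitted (or if it provides no logger), a default logger is created.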
# File lib/aws/decider/task_poller.rb, line 40
# Stores the collaborators the poller works with and selects a logger.
#
# @param service stored as the service object used by the poller
# @param [String] domain stored as the domain whose task lists are polled
# @param [DecisionTaskHandler] handler stored as the handler for polled tasks
# @param task_list stored as the task list to poll
# @param [Object] options optional; when given it must respond to +logger+
def initialize(service, domain, handler, task_list, options=nil)
  @service, @domain = service, domain
  @handler, @task_list = handler, task_list

  # Prefer the logger carried by +options+; build a default one only when
  # no usable logger was supplied.
  supplied_logger = options ? options.logger : nil
  @logger = supplied_logger || Utilities::LogFactory.make_logger(self)
end

Public Instance Methods

get_decision_task() click to toggle source

@api private

Retrieves a single decision task from the task list, if one is ready.

# File lib/aws/decider/task_poller.rb, line 51
# Polls the domain's decision tasks once for a single task on the
# poller's task list.
#
# @return the result of +poll_for_single_task+ for the configured task list
def get_decision_task
  decision_tasks = @domain.decision_tasks
  decision_tasks.poll_for_single_task(@task_list)
end
poll_and_process_single_task() click to toggle source
# File lib/aws/decider/task_poller.rb, line 55
# Polls the task list once for a decision task and, if one is available,
# passes it to the handler and sends the resulting decisions to the service.
#
# If the service rejects the response with a ValidationException whose
# message reports a length-constraint violation, the decisions are replaced
# with a single FailWorkflowExecution decision and the response is sent
# again, so the workflow fails instead of timing out.
#
# Errors raised while polling or processing are logged instead of being
# raised to the caller. The exceptions to this are SystemExit and
# SignalException (which includes Interrupt): they are re-raised so that a
# process looping on this method can still be terminated.
#
# @return [false, Object] +false+ when no task was available; otherwise the
#   return value of the last logger call that was made.
def poll_and_process_single_task
  # TODO waitIfSuspended
  workflow_types = @handler.workflow_definition_map.keys.map { |x| "#{x.name} #{x.version}" }
  @logger.debug "Polling for a new decision task of type #{workflow_types} on task_list: #{@task_list}"
  task = get_decision_task
  if task.nil?
    @logger.debug "Didn't get a task on task_list: #{@task_list}"
    return false
  end
  @logger.info Utilities.workflow_task_to_debug_string("Got decision task", task, @task_list)

  task_completed_request = @handler.handle_decision_task(task)
  @logger.debug "Response to the task will be #{task_completed_request.inspect}"

  begin
    @service.respond_decision_task_completed(task_completed_request)
  rescue AWS::SimpleWorkflow::Errors::ValidationException => e
    if e.message.include? "failed to satisfy constraint: Member must have length less than or equal to"
      # We want to ensure that the WorkflowWorker doesn't just sit around and
      # time the workflow out. If there is a validation failure possibly
      # because of large inputs to child workflows/activities or large custom
      # exceptions we should fail the workflow with some minimal details.
      task_completed_request[:decisions] = [
        {
          decision_type: "FailWorkflowExecution",
          fail_workflow_execution_decision_attributes: {
            reason: Utilities.validation_error_string("Workflow"),
            details: "AWS::SimpleWorkflow::Errors::ValidationException"
          }
        }
      ]
      @service.respond_decision_task_completed(task_completed_request)
    end
    # The validation failure is logged whether or not the fallback was sent.
    @logger.error "#{task.workflow_type.inspect} failed with exception: #{e.inspect}"
  end
  @logger.info Utilities.workflow_task_to_debug_string("Finished executing task", task, @task_list)
rescue SystemExit, SignalException
  # Never swallow a request to terminate the process (exit, Ctrl-C, SIGTERM).
  raise
rescue Exception => e
  # Deliberately broad: any other failure is logged and not raised, so the
  # caller is free to poll again.
  @logger.error "Error in the poller, #{e.inspect}"
end