class AWS::Flow::ActivityTaskPoller

A poller for activity tasks.

Public Class Methods

new(service, domain, task_list, activity_definition_map, executor, options=nil) click to toggle source

Initializes a new ‘ActivityTaskPoller`.

@param service

*Required*. The AWS::SimpleWorkflow instance to use.

@param domain

*Required*. The domain used by the workflow.

@param task_list

*Required*. The task list used to poll for activity tasks.

@param activity_definition_map

*Required*. The {ActivityDefinition} instance that implements the
activity to run. This map is in the form:

    { :activity_type => 'activity_definition_name' }

The named activity definition will be run when the {#execute} method
is called.

@param options

*Optional*. Options to set for the activity poller. You can set the
following options:

* `logger` - The logger to use.
* `max_workers` - The maximum number of workers that can be running at
    once. The default is 20.
# File lib/aws/decider/task_poller.rb, line 137
def initialize(service, domain, task_list, activity_definition_map, executor, options=nil)
  @service = service
  @service_opts = @service.config.to_h
  @domain = domain
  @task_list = task_list
  @activity_definition_map = activity_definition_map
  @logger = options.logger if options
  @logger ||= Utilities::LogFactory.make_logger(self)
  @executor = executor
end

Public Instance Methods

execute(task) click to toggle source

Executes the specified activity task.

@param task

*Required*. The
[AWS::SimpleWorkflow::ActivityTask](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/ActivityTask.html)
object to run.
# File lib/aws/decider/task_poller.rb, line 155
def execute(task)
  activity_type = task.activity_type
  begin
    context = ActivityExecutionContext.new(@service, @domain, task)
    unless activity_implementation = @activity_definition_map[activity_type]
      raise "This activity worker was told to work on activity type "\
        "#{activity_type.inspect}, but this activity worker only knows "\
        "how to work on #{@activity_definition_map.keys.map(&:name).join' '}"
    end

    output, original_result, too_large = activity_implementation.execute(task.input, context)

     @logger.debug "Responding on task_token #{task.task_token.inspect}."
    if too_large
      @logger.error "#{task.activity_type.inspect} failed: "\
        "#{Utilities.validation_error_string_partial("Activity")} For "\
        "reference, the result was #{original_result}"

      respond_activity_task_failed_with_retry(
        task.task_token,
        Utilities.validation_error_string("Activity"),
        ""
      )
    elsif ! activity_implementation.execution_options.manual_completion
      @service.respond_activity_task_completed(
        :task_token => task.task_token,
        :result => output
      )
    end
  rescue ActivityFailureException => e
    @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}."
    respond_activity_task_failed_with_retry(
      task.task_token,
      e.message,
      e.details
    )
  end
end
poll_and_process_single_task(use_forking = true) click to toggle source

Polls the task list for a new activity task, and executes it if one is found.

If ‘use_forking` is set to `true` and the maximum number of workers (as set in {#initialize}) are already executing, this method will block until the number of running workers is less than the maximum.

@param use_forking

*Optional*. Whether to use forking to execute the task. On Windows,
you should set this to `false`.
# File lib/aws/decider/task_poller.rb, line 359
def poll_and_process_single_task(use_forking = true)
  @poll_semaphore ||= SuspendableSemaphore.new
  @poll_semaphore.acquire
  semaphore_needs_release = true
  begin
    if use_forking
      @executor.block_on_max_workers
    end
    @logger.debug "Polling for a new activity task of type #{@activity_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}"
    task = @domain.activity_tasks.poll_for_single_task(@task_list)
    if task
      @logger.info Utilities.activity_task_to_debug_string("Got activity task", task)
    end
  rescue Exception => e
    @logger.error "Error in the poller, #{e.inspect}"
    @poll_semaphore.release
    return false
  end
  if task.nil?
    @logger.debug "Didn't get a task on task_list: #{@task_list}"
    @poll_semaphore.release
    return false
  end
  semaphore_needs_release = false
  if use_forking
    @executor.execute { process_single_task(task) }
  else
    process_single_task(task)
  end
  @logger.info Utilities.activity_task_to_debug_string("Finished executing task", task)
  return true
end
process_single_task(task) click to toggle source

Processes the specified activity task.

@param task

*Required*. The
[AWS::SimpleWorkflow::ActivityTask](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/ActivityTask.html)
object to process.
# File lib/aws/decider/task_poller.rb, line 310
def process_single_task(task)

  # We are using the 'build' method to create a new ConnectionPool here to
  # make sure that connection pools are not shared among forked processes.
  # The default behavior of the ConnectionPool class is to cache a pool
  # for a set of options created by the 'new' method and always use the
  # same pool for the same set of options. This is undesirable when
  # multiple processes want to use different connection pools with same
  # options as is the case here.
  #
  # Since we can't change the pool of an already existing NetHttpHandler,
  # we also create a new NetHttpHandler in order to use the new pool.

  @service_opts[:connection_pool] = AWS::Core::Http::ConnectionPool.build(@service_opts[:http_handler].pool.options)
  @service_opts[:http_handler] = AWS::Core::Http::NetHttpHandler.new(@service_opts)
  @service = @service.with_options(@service_opts)

  begin
    begin
      execute(task)
    rescue CancellationException => e
      @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}"
      respond_activity_task_canceled_with_retry(task.task_token, e.message)
    rescue Exception => e
      @logger.error "#{task.activity_type.inspect} failed with exception: #{e.inspect}"
      respond_activity_task_failed_with_retry(task.task_token, e.message, e.backtrace)
    ensure
      @poll_semaphore.release
    end
  rescue Exception => e
    semaphore_needs_release = true
    @logger.error "Error in the poller, exception: #{e.inspect}. stacktrace: #{e.backtrace}"
    raise e
  ensure
    @poll_semaphore.release if semaphore_needs_release
  end
end
respond_activity_task_canceled(task_token, message) click to toggle source

Responds to the decider that the activity task should be canceled. No retry is attempted.

@param task_token

*Required*. The task token from the {ActivityDefinition} object to
retry.

@param message

*Required*. A message that provides detail about why the activity task
is cancelled.
# File lib/aws/decider/task_poller.rb, line 234
def respond_activity_task_canceled(task_token, message)

  begin
    @service.respond_activity_task_canceled(
      :task_token => task_token,
      :details => message
    )
  rescue AWS::SimpleWorkflow::Errors::ValidationException => e
    if e.message.include? "failed to satisfy constraint: Member must have length less than or equal to"
      # We want to ensure that the ActivityWorker doesn't just sit
      # around and time the activity out. If there is a validation failure
      # possibly because of large custom exceptions we should fail the
      # activity task with some minimal details
      respond_activity_task_failed_with_retry(
        task_token,
        Utilities.validation_error_string("Activity"),
        "AWS::SimpleWorkflow::Errors::ValidationException"
      )
    end
    @logger.error "respond_activity_task_canceled call failed with "\
      "exception: #{e.inspect}"
  end

end
respond_activity_task_canceled_with_retry(task_token, message) click to toggle source

Responds to the decider that the activity task should be canceled, and attempts to retry the task.

@note Retry behavior for this method is currently *not implemented*. For

now, it simply wraps {#respond_activity_task_canceled}.

@api private

# File lib/aws/decider/task_poller.rb, line 216
def respond_activity_task_canceled_with_retry(task_token, message)
  if @failure_retrier.nil?
    respond_activity_task_canceled(task_token, message)
  end
  #TODO Set up other stuff to do if we have it
end
respond_activity_task_failed(task_token, reason, details) click to toggle source

Responds to the decider that the activity task has failed. No retry is attempted.

@param task_token

*Required*. The task token from the {ActivityDefinition} object to
retry. The task token is generated by the service and should be
treated as an opaque value.

@param reason

*Required*. Description of the error that may assist in diagnostics.
Although this value is *required*, you can set it to an empty string
if you don't need this information.

@param details

*Required*. Detailed information about the failure. Although this
value is *required*, you can set it to an empty string if you don't
need this information.
# File lib/aws/decider/task_poller.rb, line 277
def respond_activity_task_failed(task_token, reason, details)
  @logger.debug "The task token to be reported on is #{task_token}"

  begin
    @service.respond_activity_task_failed(
      task_token: task_token,
      reason: reason.to_s,
      details: details.to_s
    )
  rescue AWS::SimpleWorkflow::Errors::ValidationException => e
    if e.message.include? "failed to satisfy constraint: Member must have length less than or equal to"
      # We want to ensure that the ActivityWorker doesn't just sit
      # around and time the activity out. If there is a validation failure
      # possibly because of large custom exceptions we should fail the
      # activity task with some minimal details
      respond_activity_task_failed_with_retry(
        task_token,
        Utilities.validation_error_string("Activity"),
        "AWS::SimpleWorkflow::Errors::ValidationException"
      )
    end
    @logger.error "respond_activity_task_failed call failed with "\
      "exception: #{e.inspect}"
  end
end
respond_activity_task_failed_with_retry(task_token, reason, details) click to toggle source

Responds to the decider that the activity task has failed, and attempts to retry the task.

@note Retry behavior for this method is currently *not implemented*. For

now, it simply wraps {#respond_activity_task_failed}.

@api private

# File lib/aws/decider/task_poller.rb, line 201
def respond_activity_task_failed_with_retry(task_token, reason, details)
  #TODO Set up this variable
  if @failure_retrier.nil?
    respond_activity_task_failed(task_token, reason, details)
    #TODO Set up other stuff to do if we have it
  end
end