class AWS::Flow::ActivityWorker

Used to implement an activity worker. You can use the ‘ActivityWorker` class to conveniently poll a task list for activity tasks.

You configure the activity worker with activity implementation objects. This worker class then polls for activity tasks in the specified task list. When an activity task is received, it looks up the appropriate implementation that you provided, and calls the activity method to process the task. Unlike the {WorkflowWorker}, which creates a new instance for every decision task, the ‘ActivityWorker` simply uses the object you provided.

Public Class Methods

new(service, domain, task_list, *args, &block) click to toggle source

Creates a new ‘ActivityWorker` instance.

@param service

The Amazon SWF [Client](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Client.html) used to register
this activity worker.

@param [String] domain

The Amazon SWF [Domain](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html) to operate on.

@param [Array] task_list

The default task list to put all of the activity requests.

@param args

The activities to use.
Calls superclass method AWS::Flow::GenericWorker::new
# File lib/aws/decider/worker.rb, line 285
def initialize(service, domain, task_list, *args, &block)
  @activity_definition_map = {}
  @activity_type_options = []
  @options = Utilities::interpret_block_for_options(WorkerOptions, block)

  if @options
    @logger = @options.logger || Utilities::LogFactory.make_logger(self)
    @options.logger ||= @logger
    # Set the number of execution workers to 0 if it's not already set and
    # if the platform is Windows
    @options.execution_workers ||= 0 if AWS::Flow.on_windows?
    max_workers = @options.execution_workers
    # If max_workers is set to 0, then turn forking off
    @options.use_forking = false if (max_workers && max_workers.zero?)
  end
  max_workers = 20 if (max_workers.nil?)

  @executor = ForkingExecutor.new(
    :max_workers => max_workers,
    :logger => @logger
  )

  @shutdown_first_time_function = lambda do
    @executor.shutdown Float::INFINITY
    Kernel.exit
  end
  super(service, domain, task_list, *args)
end

Public Instance Methods

add_activities_implementation(class_or_instance) click to toggle source

Adds an activity implementation to this ‘ActivityWorker`.

@param [Activity] class_or_instance

The {Activity} class or instance to add.
# File lib/aws/decider/worker.rb, line 361
def add_activities_implementation(class_or_instance)
  klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
  instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
  klass.activities.each do |activity_type|

    # TODO this should assign to an activityImplementation, so that we can
    # call execute on it later
    @activity_definition_map[activity_type] = ActivityDefinition.new(
      instance,
      activity_type.name.split(".").last,
      nil,
      activity_type.options,
      activity_type.options.data_converter
    )
    options = activity_type.options
    option_hash = {
      :domain => @domain.name,
      :name => activity_type.name.to_s,
      :version => activity_type.version
    }

    option_hash.merge!(options.get_registration_options)

    if options.default_task_list
      option_hash.merge!(
        :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
      )
    end

    @activity_type_options << option_hash
  end
end
add_implementation(class_or_instance) click to toggle source

Adds an activity implementation to this ‘ActivityWorker`.

@param [Activity] class_or_instance

The {Activity} class or instance to add.
# File lib/aws/decider/worker.rb, line 319
def add_implementation(class_or_instance)
  add_activities_implementation(class_or_instance)
end
register() click to toggle source

Registers the activity type.

# File lib/aws/decider/worker.rb, line 325
def register
  @activity_type_options.each do |activity_type_options|
    begin
      @service.register_activity_type(activity_type_options)
    rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
      @logger.warn "#{e.class} while trying to register activity #{e.message} with options #{activity_type_options}"
      previous_registration = @service.describe_activity_type(
        :domain => @domain.name,
        :activity_type => {
          :name => activity_type_options[:name],
          :version => activity_type_options[:version]
        }
      )
      default_options = activity_type_options.select { |key, val| key =~ /default/}
      previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}

      previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
      if previous_registration[:default_task_list]
        previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
      end
      registration_difference =  default_options.sort.to_a - previous_registration.sort.to_a

      unless registration_difference.empty?
        raise "Activity [#{activity_type_options[:name]}]: There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}"
      end
      # Purposefully eaten up, the alternative is to check first, and who
      # wants to do two trips when one will do?
    end
  end
end
run_once(should_register = true, poller = nil) click to toggle source

Starts the activity that was added to the ‘ActivityWorker` and, optionally, sets the {ActivityTaskPoller}.

@param [true, false] should_register

Set to `false` if the activity should not register itself (it is
already registered).

@param [ActivityTaskPoller] poller

The {ActivityTaskPoller} to use. If this is not set, a default
{ActivityTaskPoller} will be created.
# File lib/aws/decider/worker.rb, line 430
def run_once(should_register = true, poller = nil)
  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  ) if poller.nil?

  Kernel.exit if @shutting_down
  poller.poll_and_process_single_task(@options.use_forking)
end
start(should_register = true) click to toggle source

Starts the activity that was added to the ‘ActivityWorker`.

@param [true, false] should_register

Set to `false` if the activity should not register itself (it is
already registered).
# File lib/aws/decider/worker.rb, line 401
def start(should_register = true)

  register if should_register
  poller = ActivityTaskPoller.new(
    @service,
    @domain,
    @task_list,
    @activity_definition_map,
    @executor,
    @options
  )

  @logger.debug "Starting an infinite loop to poll and process activity tasks."
  loop do
    run_once(false, poller)
  end
end