class AWS::Flow::ActivityWorker
Used to implement an activity worker. You can use the `ActivityWorker` class to conveniently poll a task list for activity tasks.
You configure the activity worker with activity implementation objects. The worker then polls for activity tasks in the specified task list. When an activity task is received, the worker looks up the matching implementation that you provided and calls the activity method to process the task. Unlike the {WorkflowWorker}, which creates a new instance for every decision task, the `ActivityWorker` reuses the object you provided.
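The lookup-and-reuse behavior described above can be pictured with a small, self-contained sketch. It does not use the framework at all: `TinyDispatcher` and `Greeter` are hypothetical names invented for illustration, and the real worker does considerably more (polling, data conversion, forking).

```ruby
# Hypothetical stand-in for an activity implementation object.
class Greeter
  attr_reader :calls

  def initialize
    @calls = 0
  end

  def say_hello(name)
    @calls += 1
    "Hello, #{name}!"
  end
end

# Hypothetical dispatcher: maps an activity name to the single
# implementation object supplied by the caller and reuses that object
# for every task, rather than creating a new one each time.
class TinyDispatcher
  def initialize
    @implementations = {}
  end

  def add_implementation(instance, *activity_names)
    activity_names.each { |name| @implementations[name.to_s] = instance }
  end

  def dispatch(activity_name, *input)
    instance = @implementations.fetch(activity_name.to_s)
    instance.public_send(activity_name, *input)
  end
end

greeter = Greeter.new
dispatcher = TinyDispatcher.new
dispatcher.add_implementation(greeter, :say_hello)

dispatcher.dispatch("say_hello", "Ada")
dispatcher.dispatch("say_hello", "Grace")
puts greeter.calls # both tasks were handled by the same Greeter object
```

Because the same object handles every task, any state it holds (such as the call counter here) persists across tasks, which is worth keeping in mind when writing activity implementations.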
Public Class Methods
Creates a new `ActivityWorker` instance.
@param service
The Amazon SWF [Client](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Client.html) used to register this activity worker.
@param [String] domain
The Amazon SWF [Domain](http://docs.aws.amazon.com/AWSRubySDK/latest/AWS/SimpleWorkflow/Domain.html) to operate on.
@param [Array] task_list
The default task list on which all activity requests are placed.
@param args
The activities to use.
Calls superclass method AWS::Flow::GenericWorker::new
    # File lib/aws/decider/worker.rb, line 285
    def initialize(service, domain, task_list, *args, &block)
      @activity_definition_map = {}
      @activity_type_options = []
      @options = Utilities::interpret_block_for_options(WorkerOptions, block)

      if @options
        @logger = @options.logger || Utilities::LogFactory.make_logger(self)
        @options.logger ||= @logger
        # Set the number of execution workers to 0 if it's not already set and
        # if the platform is Windows
        @options.execution_workers ||= 0 if AWS::Flow.on_windows?
        max_workers = @options.execution_workers
        # If max_workers is set to 0, then turn forking off
        @options.use_forking = false if (max_workers && max_workers.zero?)
      end
      max_workers = 20 if (max_workers.nil?)

      @executor = ForkingExecutor.new(
        :max_workers => max_workers,
        :logger => @logger
      )

      @shutdown_first_time_function = lambda do
        @executor.shutdown Float::INFINITY
        Kernel.exit
      end
      super(service, domain, task_list, *args)
    end
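The way the constructor settles on a worker count and a forking flag is easy to misread, so here is a minimal sketch of that decision in isolation. `resolve_workers` is a hypothetical helper, not part of the framework, and it assumes that an options object was supplied (the constructor only applies the Windows and forking rules in that case).

```ruby
# Hypothetical helper mirroring the constructor's worker-count rules.
#   execution_workers - the configured value, or nil if unset
#   on_windows        - whether the platform is Windows
#   use_forking       - the configured forking flag
def resolve_workers(execution_workers, on_windows, use_forking = true)
  # On Windows, an unset worker count becomes 0.
  execution_workers ||= 0 if on_windows
  # A worker count of 0 turns forking off.
  use_forking = false if execution_workers && execution_workers.zero?
  # Anything still unset falls back to 20 workers.
  max_workers = execution_workers.nil? ? 20 : execution_workers
  { :max_workers => max_workers, :use_forking => use_forking }
end

p resolve_workers(nil, false) # unset on a non-Windows platform
p resolve_workers(nil, true)  # unset on Windows
p resolve_workers(5, true)    # explicit value on Windows is kept
```

Note that an explicit non-zero value is respected even on Windows; only an unset value is replaced with 0 there.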
Public Instance Methods
Adds an activity implementation to this `ActivityWorker`.
@param [Activity] class_or_instance
The {Activity} class or instance to add.
    # File lib/aws/decider/worker.rb, line 361
    def add_activities_implementation(class_or_instance)
      klass = (class_or_instance.class == Class) ? class_or_instance : class_or_instance.class
      instance = (class_or_instance.class == Class) ? class_or_instance.new : class_or_instance
      klass.activities.each do |activity_type|

        # TODO this should assign to an activityImplementation, so that we can
        # call execute on it later
        @activity_definition_map[activity_type] = ActivityDefinition.new(
          instance,
          activity_type.name.split(".").last,
          nil,
          activity_type.options,
          activity_type.options.data_converter
        )
        options = activity_type.options
        option_hash = {
          :domain => @domain.name,
          :name => activity_type.name.to_s,
          :version => activity_type.version
        }

        option_hash.merge!(options.get_registration_options)

        if options.default_task_list
          option_hash.merge!(
            :default_task_list => {:name => resolve_default_task_list(options.default_task_list)}
          )
        end

        @activity_type_options << option_hash
      end
    end
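The first two lines of the method accept either a class or an instance. A minimal sketch of that normalization, separated from the framework, might look like the following. `normalize` and `MyActivities` are hypothetical names used only for illustration, and `is_a?(Class)` is used here in place of the `.class == Class` comparison in the listing.

```ruby
# Hypothetical helper: accept either a class or an instance and return
# both the class and an instance to use.
def normalize(class_or_instance)
  if class_or_instance.is_a?(Class)
    # A class was given, so create an instance with no arguments.
    [class_or_instance, class_or_instance.new]
  else
    # An instance was given, so use it as is.
    [class_or_instance.class, class_or_instance]
  end
end

# Hypothetical activity implementation class.
class MyActivities
  def say_hello; end
end

klass_a, instance_a = normalize(MyActivities)
existing = MyActivities.new
klass_b, instance_b = normalize(existing)

puts klass_a == klass_b            # the same class either way
puts instance_b.equal?(existing)   # a supplied instance is reused

# The method name is the last dot-separated segment of the activity name.
puts "MyActivities.say_hello".split(".").last
```

One consequence of this design is that passing a class only works when its constructor can be called without arguments; pass an instance when the implementation needs constructor parameters.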
Adds an activity implementation to this `ActivityWorker`. This method delegates to `add_activities_implementation`.
@param [Activity] class_or_instance
The {Activity} class or instance to add.
    # File lib/aws/decider/worker.rb, line 319
    def add_implementation(class_or_instance)
      add_activities_implementation(class_or_instance)
    end
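Registers each activity type that was added to this worker. If a type with the same name and version is already registered, the existing registration is kept, and an error is raised if any of its default options differ from the ones being registered.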
    # File lib/aws/decider/worker.rb, line 325
    def register
      @activity_type_options.each do |activity_type_options|
        begin
          @service.register_activity_type(activity_type_options)
        rescue AWS::SimpleWorkflow::Errors::TypeAlreadyExistsFault => e
          @logger.warn "#{e.class} while trying to register activity #{e.message} with options #{activity_type_options}"
          previous_registration = @service.describe_activity_type(
            :domain => @domain.name,
            :activity_type => {
              :name => activity_type_options[:name],
              :version => activity_type_options[:version]
            }
          )
          default_options = activity_type_options.select { |key, val| key =~ /default/}
          previous_keys = previous_registration["configuration"].keys.map {|x| camel_case_to_snake_case(x).to_sym}

          previous_registration = Hash[previous_keys.zip(previous_registration["configuration"].values)]
          if previous_registration[:default_task_list]
            previous_registration[:default_task_list][:name] = previous_registration[:default_task_list].delete("name")
          end
          registration_difference = default_options.sort.to_a - previous_registration.sort.to_a

          unless registration_difference.empty?
            raise "Activity [#{activity_type_options[:name]}]: There is a difference between the types you have registered previously and the types you are currently registering, but you haven't changed the version. These new changes will not be picked up. In particular, these options are different #{Hash[registration_difference]}"
          end
          # Purposefully eaten up, the alternative is to check first, and who
          # wants to do two trips when one will do?
        end
      end
    end
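The comparison inside the `rescue` branch is the least obvious part of the method, so the sketch below reproduces it on plain hashes. The `camel_case_to_snake_case` shown here is a hypothetical implementation (the framework's own helper may differ), and the sample hashes are made-up values that only assume the general shape of the `"configuration"` part of a describe response.

```ruby
# Hypothetical implementation; the framework's own helper may differ.
def camel_case_to_snake_case(str)
  str.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Options about to be registered (made-up sample values).
requested = {
  :name => "say_hello",
  :version => "1.0",
  :default_task_start_to_close_timeout => "60",
  :default_task_heartbeat_timeout => "30"
}

# Assumed shape of the existing registration's configuration.
existing_configuration = {
  "defaultTaskStartToCloseTimeout" => "120",
  "defaultTaskHeartbeatTimeout" => "30"
}

# Only keys containing "default" take part in the comparison.
default_options = requested.select { |key, _val| key =~ /default/ }

# Convert the camelCase string keys to snake_case symbols.
previous = Hash[
  existing_configuration.map { |k, v| [camel_case_to_snake_case(k).to_sym, v] }
]

# Pairs that are requested but not present, unchanged, in the existing
# registration.
difference = default_options.sort.to_a - previous.sort.to_a
puts Hash[difference].inspect
```

In this sample only the start-to-close timeout differs, so it is the only pair left in `difference`. The subtraction is one-directional: options that exist in the previous registration but are absent from the new request do not count as a difference.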
Polls for and processes a single activity task, optionally using the {ActivityTaskPoller} that you provide.
@param [true, false] should_register
Set to `false` to skip registration, for example when the activity types are already registered.
@param [ActivityTaskPoller] poller
The {ActivityTaskPoller} to use. If this is not set, a default {ActivityTaskPoller} will be created.
    # File lib/aws/decider/worker.rb, line 430
    def run_once(should_register = true, poller = nil)
      register if should_register
      poller = ActivityTaskPoller.new(
        @service,
        @domain,
        @task_list,
        @activity_definition_map,
        @executor,
        @options
      ) if poller.nil?

      Kernel.exit if @shutting_down
      poller.poll_and_process_single_task(@options.use_forking)
    end
Starts the `ActivityWorker`, which then polls for and processes activity tasks in an infinite loop.
@param [true, false] should_register
Set to `false` to skip registration, for example when the activity types are already registered.
    # File lib/aws/decider/worker.rb, line 401
    def start(should_register = true)
      register if should_register
      poller = ActivityTaskPoller.new(
        @service,
        @domain,
        @task_list,
        @activity_definition_map,
        @executor,
        @options
      )

      @logger.debug "Starting an infinite loop to poll and process activity tasks."
      loop do
        run_once(false, poller)
      end
    end
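The relationship between `start` and `run_once` (register once, create one poller, then call `run_once(false, poller)` repeatedly) can be sketched without the framework. `FakePoller` and `TinyWorker` are hypothetical classes written for this illustration. To let the example terminate, the fake poller raises `StopIteration` when its queue is empty, which `Kernel#loop` rescues; the real worker has no such exit and keeps polling.

```ruby
# Hypothetical poller that hands out tasks from an in-memory queue.
class FakePoller
  attr_reader :processed

  def initialize(tasks)
    @tasks = tasks.dup
    @processed = []
  end

  def poll_and_process_single_task
    # Sketch-only device: end the loop when there is nothing left.
    raise StopIteration if @tasks.empty?
    @processed << @tasks.shift
  end
end

# Hypothetical worker that follows the same start/run_once structure.
class TinyWorker
  attr_reader :register_count

  def initialize(tasks)
    @tasks = tasks
    @register_count = 0
  end

  def register
    @register_count += 1
  end

  def run_once(should_register = true, poller = nil)
    register if should_register
    poller ||= FakePoller.new(@tasks)
    poller.poll_and_process_single_task
  end

  def start(should_register = true)
    register if should_register
    poller = FakePoller.new(@tasks)
    # Registration is skipped on each iteration and the poller is shared.
    loop { run_once(false, poller) }
    poller
  end
end

worker = TinyWorker.new(%w[task-a task-b task-c])
poller = worker.start
puts poller.processed.length
puts worker.register_count
```

Passing `false` and the existing poller on every iteration means registration happens once per `start` call and no new poller is built per task.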