class AWS::Flow::WorkflowTaskPoller
Public Class Methods
new(service, domain, handler, task_list, options=nil)
click to toggle source
Creates a new ‘WorkflowTaskPoller`.
@param service
The Amazon SWF service object on which this task poller will operate.
@param [String] domain
The name of the domain containing the task lists to poll.
@param [DecisionTaskHandler] handler
A {DecisionTaskHandler} to handle polled tasks. The poller will call the {DecisionTaskHandler#handle_decision_task} method.
@param [Array] task_list
Specifies the task list to poll for decision tasks.
@param [Object] options
Options to use for the logger.
# File lib/aws/decider/task_poller.rb, line 40 def initialize(service, domain, handler, task_list, options=nil) @service = service @handler = handler @domain = domain @task_list = task_list @logger = options.logger if options @logger ||= Utilities::LogFactory.make_logger(self) end
Public Instance Methods
get_decision_task()
click to toggle source
@api private Retrieves any decision tasks that are ready.
# File lib/aws/decider/task_poller.rb, line 51 def get_decision_task @domain.decision_tasks.poll_for_single_task(@task_list) end
poll_and_process_single_task()
click to toggle source
# File lib/aws/decider/task_poller.rb, line 55 def poll_and_process_single_task # TODO waitIfSuspended begin @logger.debug "Polling for a new decision task of type #{@handler.workflow_definition_map.keys.map{ |x| "#{x.name} #{x.version}"} } on task_list: #{@task_list}" task = get_decision_task if task.nil? @logger.debug "Didn't get a task on task_list: #{@task_list}" return false end @logger.info Utilities.workflow_task_to_debug_string("Got decision task", task, @task_list) task_completed_request = @handler.handle_decision_task(task) @logger.debug "Response to the task will be #{task_completed_request.inspect}" if !task_completed_request[:decisions].empty? && (task_completed_request[:decisions].first.keys.include?(:fail_workflow_execution_decision_attributes)) fail_hash = task_completed_request[:decisions].first[:fail_workflow_execution_decision_attributes] reason = fail_hash[:reason] details = fail_hash[:details] end begin @service.respond_decision_task_completed(task_completed_request) rescue AWS::SimpleWorkflow::Errors::ValidationException => e if e.message.include? "failed to satisfy constraint: Member must have length less than or equal to" # We want to ensure that the WorkflowWorker doesn't just sit around and # time the workflow out. If there is a validation failure possibly # because of large inputs to child workflows/activities or large custom # exceptions we should fail the workflow with some minimal details. task_completed_request[:decisions] = [ { decision_type: "FailWorkflowExecution", fail_workflow_execution_decision_attributes: { reason: Utilities.validation_error_string("Workflow"), details: "AWS::SimpleWorkflow::Errors::ValidationException" } } ] @service.respond_decision_task_completed(task_completed_request) end @logger.error "#{task.workflow_type.inspect} failed with exception: #{e.inspect}" end @logger.info Utilities.workflow_task_to_debug_string("Finished executing task", task, @task_list) rescue AWS::SimpleWorkflow::Errors::UnknownResourceFault => e @logger.error "Error in the poller, #{e.inspect}" rescue Exception => e @logger.error "Error in the poller, #{e.inspect}" end end