class AWS::Flow::GenericActivityClient
A generic activity client that can be used to perform standard activity actions.
Attributes
The data converter used for serializing/deserializing data when sending input to and receiving results from activities scheduled by this client. By default, this is {YAMLDataConverter}.
The decision helper used by the activity client.
A hash of {ActivityRuntimeOptions} for the activity client.
Public Class Methods
Returns the default option class for the activity client, which is {ActivityRuntimeOptions}.
# File lib/aws/decider/activity.rb, line 54
# Names the option class associated with this client.
#
# @return [Class] the {ActivityRuntimeOptions} class itself (not an instance).
def self.default_option_class
  ActivityRuntimeOptions
end
Creates a new GenericActivityClient instance.
@param [DecisionHelper] decision_helper
The decision helper to use for the activity client.
@param [ActivityOptions] options
The activity options to set for the activity client.
# File lib/aws/decider/activity.rb, line 74
# Sets up the client's state and then defers to the superclass initializer.
#
# @param [DecisionHelper] decision_helper
#   Stored as-is; its `activity_options` are read once and kept as the
#   per-activity option map.
# @param [ActivityOptions] options
#   Stored as-is as the client-level options.
def initialize(decision_helper, options)
  # Errors recorded by schedule_activity, keyed by decision id.
  @failure_map = {}
  @decision_helper, @options = decision_helper, options
  @activity_option_map = decision_helper.activity_options
  # Only falls back to YAML when no converter has been assigned yet.
  @data_converter ||= YAMLDataConverter.new
  # Bare super: forwards decision_helper and options unchanged.
  super
end
Public Instance Methods
Separates the activity name from the activity type at the point of the last period.
@param [String] name
The name of the activity type.
@api private
# File lib/aws/decider/activity.rb, line 62
# Extracts the bare activity name from a qualified activity type name, i.e.
# the text after the last period.
#
# @param [String, Symbol] name
#   The activity type name, e.g. "MyActivities.do_work". It is converted with
#   `to_s` before being split.
# @return [Symbol] the last period-separated segment of *name*.
# @raise [ArgumentError] if *name* contains no segment at all (nil, "", ".").
# @api private
def activity_name_from_activity_type(name)
  segment = name.to_s.split(".").last
  # String#split drops trailing empty fields, so nil, "" and "." leave nothing
  # to pick. Report that explicitly instead of failing with a NoMethodError
  # on nil.
  if segment.nil?
    raise ArgumentError, "cannot derive an activity name from #{name.inspect}"
  end
  segment.to_sym
end
A handler for the `ActivityTaskCanceled` event.
@param [AWS::SimpleWorkflow::HistoryEvent] event
The event data.
# File lib/aws/decider/activity.rb, line 147
# Reacts to an activity task being canceled: advances the activity's state
# machine and, once it reports done, fails the pending request with a
# CancellationException.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event data; only `attributes[:scheduled_event_id]` is read.
def handle_activity_task_canceled(event)
  scheduled_event_id = event.attributes[:scheduled_event_id]
  activity_id = @decision_helper.get_activity_id(scheduled_event_id)
  @decision_helper[activity_id].consume(:handle_cancellation_event)
  return unless @decision_helper[activity_id].done?

  pending = @decision_helper.scheduled_activities.delete(activity_id)
  cancellation = CancellationException.new("Cancelled from ActivityTaskCanceledEvent", nil)
  # The request may already have been removed; in that case nobody is waiting.
  pending.completion_handle.fail(cancellation) unless pending.nil?
end
A handler for the `ActivityTaskCompleted` event.
@param [AWS::SimpleWorkflow::HistoryEvent] event
The event data.
# File lib/aws/decider/activity.rb, line 222
# Reacts to an activity task completing: advances the activity's state machine
# and, once it reports done, stores the result on the pending request and
# completes its handle.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event data; `attributes[:scheduled_event_id]` and `attributes[:result]`
#   are read.
# @return [nil] when the state machine is not done or no request is pending;
#   otherwise whatever `completion_handle.complete` returns.
def handle_activity_task_completed(event)
  attributes = event.attributes
  scheduled_id = attributes[:scheduled_event_id]
  activity_id = @decision_helper.activity_scheduling_event_id_to_activity_id[scheduled_id]
  state_machine = @decision_helper[activity_id]
  state_machine.consume(:handle_completion_event)
  return unless state_machine.done?

  open_request = @decision_helper.scheduled_activities.delete(activity_id)
  # Same guard as the canceled/timed-out handlers: if the request was already
  # removed there is nothing to complete, so do not crash on nil.
  return if open_request.nil?

  open_request.result = attributes[:result]
  open_request.completion_handle.complete
end
A handler for the `ActivityTaskFailed` event.
@param [AWS::SimpleWorkflow::HistoryEvent] event
The event data.
# File lib/aws/decider/activity.rb, line 183
# Reacts to an activity task failing: advances the activity's state machine,
# removes the pending request and fails its handle with an
# ActivityTaskFailedException built from the event's reason and details.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event data; `id`, `attributes[:scheduled_event_id]` and, when present,
#   `attributes[:reason]` / `attributes[:details]` are read.
# @return [nil] when no request is pending for the activity; otherwise
#   whatever `completion_handle.fail` returns.
def handle_activity_task_failed(event)
  attributes = event.attributes
  activity_id = @decision_helper.get_activity_id(attributes[:scheduled_event_id])
  @decision_helper[activity_id].consume(:handle_completion_event)
  open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
  # Same guard as the canceled/timed-out handlers: if the request was already
  # removed there is nobody to notify, so do not crash on nil.
  return if open_request_info.nil?

  # The keys are probed before indexing, as in the original code.
  # NOTE(review): presumably because indexing a missing attribute raises - confirm.
  reason = attributes[:reason] if attributes.keys.include?(:reason)
  reason ||= "The activity which failed did not provide a reason"
  details = attributes[:details] if attributes.keys.include?(:details)
  details ||= "The activity which failed did not provide details"
  # TODO consider adding user_context to open request, and adding it here
  # @decision_helper[@decision_helper.activity_scheduling_event_id_to_activity_id[event.attributes.scheduled_event_id]].attributes[:options].data_converter
  failure = ActivityTaskFailedException.new(event.id, activity_id, reason, details)
  open_request_info.completion_handle.fail(failure)
end
A handler for the `ActivityTaskTimedOut` event.
@param [AWS::SimpleWorkflow::HistoryEvent] event
The event data.
# File lib/aws/decider/activity.rb, line 164
# Reacts to an activity task timing out: advances the activity's state machine
# and, once it reports done, fails the pending request with an
# ActivityTaskTimedOutException carrying the event's timeout type.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event data; `id`, `attributes[:scheduled_event_id]` and
#   `attributes[:timeout_type]` are read.
def handle_activity_task_timed_out(event)
  activity_id = @decision_helper.get_activity_id(event.attributes[:scheduled_event_id])
  machine = @decision_helper[activity_id]
  machine.consume(:handle_completion_event)
  return unless machine.done?

  pending = @decision_helper.scheduled_activities.delete(activity_id)
  # Nothing to notify if the request has already been removed.
  return if pending.nil?

  timed_out = ActivityTaskTimedOutException.new(event.id, activity_id, event.attributes[:timeout_type], "Time out")
  pending.completion_handle.fail(timed_out)
end
A handler for the `ScheduleActivityTaskFailed` event.
@param [AWS::SimpleWorkflow::HistoryEvent] event
The event data.
# File lib/aws/decider/activity.rb, line 204
# Reacts to a failed attempt to schedule an activity task: removes the pending
# request, advances the activity's state machine and, once it reports done,
# fails the request's handle with a ScheduleActivityTaskFailedException.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event data; `id` and the attributes `activity_id`, `activity_type`
#   and `cause` are read.
# @return [nil] when the state machine is not done or no request is pending;
#   otherwise whatever `completion_handle.fail` returns.
def handle_schedule_activity_task_failed(event)
  attributes = event.attributes
  activity_id = attributes[:activity_id]
  # The request is removed before the state machine is consulted, as in the
  # original code.
  open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
  activity_state_machine = @decision_helper[activity_id]
  activity_state_machine.consume(:handle_initiation_failed_event)
  return unless activity_state_machine.done?
  # Same guard as the canceled/timed-out handlers: with no pending request
  # there is nobody to notify, so do not crash on nil.
  return if open_request_info.nil?

  # TODO Fail task correctly
  failure = ScheduleActivityTaskFailedException.new(event.id, attributes.activity_type, activity_id, attributes.cause)
  open_request_info.completion_handle.fail(failure)
end
Registers and schedules a new activity type, provided a name and block of options.
@param method_name
*Required*. The name of the activity type to define.
@param args
*Required*. Arguments for the method provided in *method_name*.
@param block
*Required*. A block of {ActivityOptions} to use when registering the new {ActivityType}. This can be set to an empty block, in which case, the default activity options will be used.
# File lib/aws/decider/activity.rb, line 95
# Treats any otherwise-undefined method call as a request to run the activity
# of that name: builds the activity's options and type, then schedules it,
# going through the retry helper when exponential retry is configured.
#
# Options are merged from four sources, passed to
# Utilities::merge_all_options in this order: client options derived from the
# method name, the decision helper's activity options for the name, this
# client's option map entry for the name, and the options from *block*.
# NOTE(review): @option_map is not assigned in the visible initializer;
# presumably the superclass provides it - confirm.
#
# @param method_name
#   The name of the activity to run.
# @param args
#   Arguments forwarded to the activity as its input.
# @param block
#   A block of {ActivityOptions} overrides.
# @return the future from the retry helper when retrying with
#   `return_on_start` set; otherwise the drilled-down retry result, or the
#   return value of schedule_activity.
def method_missing(method_name, *args, &block)
  block_options = Utilities::interpret_block_for_options(ActivityOptions, block)
  client_options = Utilities::client_options_from_method_name(method_name, @options)
  merged = Utilities::merge_all_options(
    client_options,
    @activity_option_map[method_name.to_sym],
    @option_map[method_name.to_sym],
    block_options
  )
  new_options = ActivityOptions.new(merged)
  activity_type = ActivityType.new(
    "#{new_options.prefix_name}.#{method_name}",
    new_options.version,
    new_options.get_registration_options
  )

  # Plain path: no retry configuration, schedule once.
  unless new_options._exponential_retry
    return schedule_activity(activity_type.name, activity_type, args, new_options)
  end

  retry_function = new_options._exponential_retry.retry_function || FlowConstants.exponential_retry_function
  new_options._exponential_retry.return_on_start ||= new_options.return_on_start
  attempt = lambda { schedule_activity(activity_type.name, activity_type, args, new_options) }
  future = _retry_with_options(attempt, retry_function, new_options._exponential_retry, args)
  return future if new_options.return_on_start

  Utilities::drill_on_future(future)
end
@api private
# File lib/aws/decider/activity.rb, line 125
# Builds the retry alias for a method name, in the `__<name>_retry` form that
# retry_alias_to_method parses back.
#
# The previous body referenced an undefined `__method_name` instead of the
# *method_name* parameter, so it never produced an alias for its argument.
#
# @param [Symbol, String] method_name
#   The method to alias.
# @return [Symbol] the alias, e.g. `:__do_work_retry` for `:do_work`.
# @api private
def method_to_retry_alias(method_name)
  "__#{method_name}_retry".to_sym
end
Requests that the activity is canceled.
@param [WorkflowFuture] to_cancel
The {WorkflowFuture} for the task to be canceled.
# File lib/aws/decider/activity.rb, line 134
# Asks for an activity task to be canceled by sending `:cancel` to the state
# machine registered under the future's activity id.
#
# @param [WorkflowFuture] to_cancel
#   The future for the task to cancel; its metadata must respond to
#   `activity_id`.
# @raise [RuntimeError] if the future's metadata has no `activity_id`.
def request_cancel_activity_task(to_cancel)
  metadata = to_cancel.metadata
  raise "You need to use a future obtained from an activity" unless metadata.respond_to?(:activity_id)

  @decision_helper[metadata.activity_id].consume(:cancel)
end
@api private
# File lib/aws/decider/activity.rb, line 120
# Recovers the method name embedded in a retry alias of the form
# `__<name>_retry`.
#
# @param [Symbol, String] retry_alias
#   The alias to parse; it is converted with `to_s` first.
# @return [Symbol] the text captured between `__` and `_retry`.
# @raise [ArgumentError] if *retry_alias* does not contain that pattern.
# @api private
def retry_alias_to_method(retry_alias)
  method_name = retry_alias.to_s[/__(.*)_retry/, 1]
  # A non-matching input yields nil; report it explicitly instead of failing
  # with a NoMethodError on nil.
  if method_name.nil?
    raise ArgumentError, "#{retry_alias.inspect} is not a retry alias"
  end
  method_name.to_sym
end
Schedules a named activity.
@param [String] name
*Required*. The name of the activity to schedule.
@param [ActivityType] activity_type
*Required*. The activity type for this scheduled activity.
@param [Object] input
*Required*. Additional data passed to the activity.
@param [ActivityOptions] options
*Required*. {ActivityOptions} to set for the scheduled activity.
# File lib/aws/decider/activity.rb, line 247
# Schedules a single activity task and reports its outcome.
#
# The client's option map entry for the activity's short name is merged with
# *options*. The input is serialized with the resulting options' data
# converter, and an ActivityDecisionStateMachine plus an open request are
# registered with the decision helper under the new decision id. Errors raised
# while the task runs are recorded in @failure_map under that id.
#
# @param [String] name
#   The activity type name; its last period-separated segment selects the
#   client-level options.
# @param [ActivityType] activity_type
#   The activity type stored in the decision attributes.
# @param [Object] input
#   Data passed to the activity; dumped with the data converter unless empty.
#   NOTE(review): must respond to `empty?` - nil is not handled here.
# @param [ActivityOptions] options
#   Options for this scheduling request.
# @return the future itself when `return_on_start` is set; otherwise the value
#   of `output.get`.
# @raise the error recorded for this decision, if any.
def schedule_activity(name, activity_type, input, options)
  client_level_options = @option_map[activity_name_from_activity_type(name)]
  merged = Utilities::merge_all_options(client_level_options, options)
  new_options = ActivityOptions.new(merged)
  output = Utilities::AddressableFuture.new
  open_request = OpenRequestInfo.new
  decision_id = @decision_helper.get_next_id(:Activity)
  output._metadata = ActivityMetadata.new(decision_id)

  error_handler do |handler|
    handler.begin do
      @data_converter = new_options.data_converter
      # Previously considered: dumping each input part separately.
      #input = input.map { |input_part| @data_converter.dump input_part } unless input.nil?
      input = @data_converter.dump(input) unless input.empty?
      new_options.input ||= input unless input.empty?
      attributes = {
        :options => new_options,
        :activity_type => activity_type,
        :decision_id => decision_id
      }
      @completion_handle = nil

      external_task do |task|
        task.initiate_task do |handle|
          open_request.completion_handle = handle
          @decision_helper.scheduled_activities[decision_id.to_s] = open_request
          @decision_helper[decision_id.to_s] = ActivityDecisionStateMachine.new(decision_id, attributes)
        end

        task.cancellation_handler do |_this_handle, _cause|
          state_machine = @decision_helper[decision_id.to_s]
          if state_machine.current_state == :created
            # Rebinds the method-level open_request local, which the ensure
            # block below reads.
            open_request = @decision_helper.scheduled_activities.delete(decision_id.to_s)
            open_request.completion_handle.complete
          end
          state_machine.consume(:cancel)
        end
      end
    end

    handler.rescue(Exception) do |error|
      @data_converter = new_options.data_converter
      # An ActivityTaskFailedException already carries serialized details, so
      # they are loaded to recover the cause. Any other error is serialized
      # itself and stored as its own details so callers higher up can pull it
      # out in the same way.
      if error.is_a?(ActivityTaskFailedException)
        error.cause = @data_converter.load(error.details)
      else
        error.details = @data_converter.dump(error)
      end
      @failure_map[decision_id.to_s] = error
    end

    handler.ensure do
      @data_converter = new_options.data_converter
      output.set(@data_converter.load(open_request.result))
      recorded = @failure_map[decision_id.to_s]
      raise recorded if recorded && new_options.return_on_start
    end
  end

  return output if new_options.return_on_start

  # NOTE(review): presumably this first get waits for the future to be set - confirm.
  output.get
  failure = @failure_map[decision_id.to_s]
  raise failure if failure
  output.get
end