class AWS::Flow::GenericActivityClient

A generic activity client that can be used to perform standard activity actions.

Attributes

data_converter[RW]

The data converter used for serializing the input sent to activities scheduled by this client and for deserializing the results they return. By default, this is {YAMLDataConverter}.

decision_helper[RW]

The decision helper used by the activity client.

options[RW]

A hash of {ActivityRuntimeOptions} for the activity client.

Public Class Methods

default_option_class() click to toggle source

Returns the default option class for the activity client, which is {ActivityRuntimeOptions}.

# File lib/aws/decider/activity.rb, line 54
# Reports which option class this client uses when none is specified.
#
# @return [Class] the {ActivityRuntimeOptions} class itself (not an instance).
def self.default_option_class
  ActivityRuntimeOptions
end
new(decision_helper, options) click to toggle source

Creates a new GenericActivityClient instance.

@param [DecisionHelper] decision_helper

The decision helper to use for the activity client.

@param [ActivityOptions] options

The activity options to set for the activity client.
Calls superclass method
# File lib/aws/decider/activity.rb, line 74
# Sets up the client's state and then defers to the superclass initializer.
#
# @param [DecisionHelper] decision_helper
#   Helper that is stored on the client and queried for its activity options.
#
# @param [ActivityOptions] options
#   Options stored on the client.
def initialize(decision_helper, options)
  @decision_helper = decision_helper
  @options = options
  # Per-activity option map comes from the decision helper.
  @activity_option_map = decision_helper.activity_options
  # Failures recorded by schedule_activity, keyed by decision id.
  @failure_map = {}
  # Keep an already-assigned converter; otherwise fall back to YAML.
  @data_converter ||= YAMLDataConverter.new
  # Bare `super` forwards both arguments unchanged.
  super
end

Public Instance Methods

activity_name_from_activity_type(name) click to toggle source

Separates the activity name from the activity type at the point of the last period.

@param [String] name

The name of the activity type.

@api private

# File lib/aws/decider/activity.rb, line 62
# Extracts the trailing segment (everything after the last period) of an
# activity type name and returns it as a symbol.
#
# @param [String, Symbol] name
#   The activity type name, e.g. "Prefix.do_work".
#
# @return [Symbol] the segment after the last period, e.g. :do_work.
#
# @api private
def activity_name_from_activity_type(name)
  segments = name.to_s.split(".")
  segments.last.to_sym
end
handle_activity_task_canceled(event) click to toggle source

A handler for the `ActivityTaskCanceled` event.

@param [AWS::SimpleWorkflow::HistoryEvent] event

The event data.
# File lib/aws/decider/activity.rb, line 147
# Processes an activity-task-canceled history event: feeds the cancellation
# to the activity's decision state machine and, once that machine reports
# done, fails the pending request with a CancellationException.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event; its attributes must expose :scheduled_event_id.
def handle_activity_task_canceled(event)
  scheduled_event_id = event.attributes[:scheduled_event_id]
  activity_id = @decision_helper.get_activity_id(scheduled_event_id)
  @decision_helper[activity_id].consume(:handle_cancellation_event)
  return unless @decision_helper[activity_id].done?

  # The request is removed from the open set whether or not one was found.
  pending = @decision_helper.scheduled_activities.delete(activity_id)
  cancellation = CancellationException.new("Cancelled from ActivityTaskCanceledEvent", nil)
  pending.completion_handle.fail(cancellation) unless pending.nil?
end
handle_activity_task_completed(event) click to toggle source

A handler for the `ActivityTaskCompleted` event.

@param [AWS::SimpleWorkflow::HistoryEvent] event

The event data.
# File lib/aws/decider/activity.rb, line 222
# Processes an activity-task-completed history event: feeds the completion
# to the activity's decision state machine and, once that machine reports
# done, stores the event's result on the pending request and completes its
# handle.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event; its attributes must expose :scheduled_event_id and :result.
def handle_activity_task_completed(event)
  scheduled_id = event.attributes[:scheduled_event_id]
  activity_id = @decision_helper.activity_scheduling_event_id_to_activity_id[scheduled_id]
  state_machine = @decision_helper[activity_id]
  state_machine.consume(:handle_completion_event)
  return unless state_machine.done?

  open_request = @decision_helper.scheduled_activities.delete(activity_id)
  # The request may already have been removed (the canceled and timed-out
  # handlers guard against this too); without the check a late completion
  # would raise NoMethodError on nil.
  return if open_request.nil?

  open_request.result = event.attributes[:result]
  open_request.completion_handle.complete
end
handle_activity_task_failed(event) click to toggle source

A handler for the `ActivityTaskFailed` event.

@param [AWS::SimpleWorkflow::HistoryEvent] event

The event data.
# File lib/aws/decider/activity.rb, line 183
# Processes an activity-task-failed history event: feeds a completion event
# to the activity's decision state machine, removes the pending request and
# fails its handle with an ActivityTaskFailedException built from the event's
# reason and details (placeholder text is used when either is absent).
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event; its attributes must expose :scheduled_event_id and may expose
#   :reason and :details.
def handle_activity_task_failed(event)
  attributes = event.attributes
  activity_id = @decision_helper.get_activity_id(attributes[:scheduled_event_id])
  @decision_helper[activity_id].consume(:handle_completion_event)
  open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
  # Nothing to fail if the request was already removed; the canceled and
  # timed-out handlers apply the same guard. Without it this would raise
  # NoMethodError on nil.
  return if open_request_info.nil?

  # Key presence is checked before indexing, as the original code does.
  reason = attributes[:reason] if attributes.keys.include?(:reason)
  reason ||= "The activity which failed did not provide a reason"
  details = attributes[:details] if attributes.keys.include?(:details)
  details ||= "The activity which failed did not provide details"

  # TODO consider adding user_context to open request, and adding it here
  # @decision_helper[@decision_helper.activity_scheduling_event_id_to_activity_id[event.attributes.scheduled_event_id]].attributes[:options].data_converter
  failure = ActivityTaskFailedException.new(event.id, activity_id, reason, details)
  open_request_info.completion_handle.fail(failure)
end
handle_activity_task_timed_out(event) click to toggle source

A handler for the `ActivityTaskTimedOut` event.

@param [AWS::SimpleWorkflow::HistoryEvent] event

The event data.
# File lib/aws/decider/activity.rb, line 164
# Processes an activity-task-timed-out history event: feeds a completion
# event to the activity's decision state machine and, once that machine
# reports done, fails the pending request (if any) with an
# ActivityTaskTimedOutException carrying the event's timeout type.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event; its attributes must expose :scheduled_event_id and
#   :timeout_type.
def handle_activity_task_timed_out(event)
  scheduled_event_id = event.attributes[:scheduled_event_id]
  activity_id = @decision_helper.get_activity_id(scheduled_event_id)
  machine = @decision_helper[activity_id]
  machine.consume(:handle_completion_event)
  return unless machine.done?

  pending = @decision_helper.scheduled_activities.delete(activity_id)
  return if pending.nil?

  timeout_type = event.attributes[:timeout_type]
  timeout_error = ActivityTaskTimedOutException.new(event.id, activity_id, timeout_type, "Time out")
  pending.completion_handle.fail(timeout_error)
end
handle_schedule_activity_task_failed(event) click to toggle source

A handler for the `ScheduleActivityTaskFailed` event.

@param [AWS::SimpleWorkflow::HistoryEvent] event

The event data.
# File lib/aws/decider/activity.rb, line 204
# Processes a schedule-activity-task-failed history event: removes the
# pending request for the activity, feeds an initiation-failed event to its
# decision state machine and, once that machine reports done, fails the
# request's handle with a ScheduleActivityTaskFailedException.
#
# @param [AWS::SimpleWorkflow::HistoryEvent] event
#   The event; its attributes must expose :activity_id via `[]` and respond
#   to `activity_type` and `cause`.
def handle_schedule_activity_task_failed(event)
  attributes = event.attributes
  activity_id = attributes[:activity_id]
  open_request_info = @decision_helper.scheduled_activities.delete(activity_id)
  activity_state_machine = @decision_helper[activity_id]
  activity_state_machine.consume(:handle_initiation_failed_event)
  return unless activity_state_machine.done?
  # Nothing to fail if no request was registered under this id; without the
  # check this would raise NoMethodError on nil (the canceled and timed-out
  # handlers guard the same way).
  return if open_request_info.nil?

  # TODO Fail task correctly
  failure = ScheduleActivityTaskFailedException.new(event.id, attributes.activity_type, activity_id, attributes.cause)
  open_request_info.completion_handle.fail(failure)
end
method_missing(method_name, *args, &block) click to toggle source

Registers and schedules a new activity type, provided a name and block of options.

@param method_name

*Required*. The name of the activity type to define.

@param args

*Required*. Arguments for the method provided in *method_name*.

@param block

*Required*. A block of {ActivityOptions} to use when registering the new {ActivityType}. This can be set to an
empty block, in which case, the default activity options will be used.
# File lib/aws/decider/activity.rb, line 95
# Treats a call to an undefined method as a request to run the activity of
# that name: merges options from every source, builds the ActivityType and
# schedules it, routing through the retry helper when exponential retry
# options are present.
#
# @param method_name
#   Name of the activity to schedule.
#
# @param args
#   Arguments forwarded to the activity as its input.
#
# @param block
#   Block interpreted as {ActivityOptions} for this call.
#
# @return the value returned by schedule_activity, or, on the retry path, the
#   future itself when return_on_start is set and otherwise the value
#   produced by Utilities::drill_on_future.
def method_missing(method_name, *args, &block)
  block_options = Utilities::interpret_block_for_options(ActivityOptions, block)
  client_options = Utilities::client_options_from_method_name(method_name, @options)
  activity_key = method_name.to_sym

  merged_options = Utilities::merge_all_options(
    client_options,
    @activity_option_map[activity_key],
    @option_map[activity_key],
    block_options
  )
  new_options = ActivityOptions.new(merged_options)

  type_name = "#{new_options.prefix_name}.#{method_name}"
  activity_type = ActivityType.new(type_name, new_options.version, new_options.get_registration_options)

  # Plain path: no exponential retry configured, schedule once.
  unless new_options._exponential_retry
    return schedule_activity(activity_type.name, activity_type, args, new_options)
  end

  retry_function = new_options._exponential_retry.retry_function || FlowConstants.exponential_retry_function
  # The retry options inherit return_on_start unless they already set it.
  new_options._exponential_retry.return_on_start ||= new_options.return_on_start
  attempt = lambda { self.schedule_activity(activity_type.name, activity_type, args, new_options) }
  future = _retry_with_options(attempt, retry_function, new_options._exponential_retry, args)
  return future if new_options.return_on_start

  Utilities::drill_on_future(future)
end
method_to_retry_alias(method_name) click to toggle source

@api private

# File lib/aws/decider/activity.rb, line 125
# Builds the retry alias for a method name by wrapping it as
# `__<method_name>_retry`, the form that retry_alias_to_method parses back.
#
# The previous body interpolated an undefined identifier (`__method_name`)
# and never used its parameter, so it could not produce the alias.
#
# @param [Symbol, String] method_name
#   The method name to wrap.
#
# @return [Symbol] the alias, e.g. :__process_retry for :process.
#
# @api private
def method_to_retry_alias(method_name)
  "__#{method_name}_retry".to_sym
end
request_cancel_activity_task(to_cancel) click to toggle source

Requests that the activity is canceled.

@param [WorkflowFuture] to_cancel

The {WorkflowFuture} for the task to be canceled.
# File lib/aws/decider/activity.rb, line 134
# Asks the decision state machine behind an activity future to cancel.
#
# @param [WorkflowFuture] to_cancel
#   A future whose metadata responds to `activity_id`.
#
# @raise [RuntimeError] when the future's metadata has no `activity_id`,
#   i.e. the future did not come from an activity.
def request_cancel_activity_task(to_cancel)
  metadata = to_cancel.metadata
  unless metadata.respond_to?(:activity_id)
    raise "You need to use a future obtained from an activity"
  end

  state_machine = @decision_helper[metadata.activity_id]
  state_machine.consume(:cancel)
end
retry_alias_to_method(retry_alias) click to toggle source

@api private

# File lib/aws/decider/activity.rb, line 120
# Recovers the method name embedded in a retry alias of the form
# `__<method_name>_retry`.
#
# @param [Symbol, String] retry_alias
#   The alias to unwrap.
#
# @return [Symbol] the captured method name.
#
# @raise [NoMethodError] when the alias does not match the expected pattern
#   (the capture is nil and cannot be converted to a symbol).
#
# @api private
def retry_alias_to_method(retry_alias)
  captured = retry_alias.to_s.slice(/__(.*)_retry/, 1)
  captured.to_sym
end
schedule_activity(name, activity_type, input, options) click to toggle source

Schedules a named activity.

@param [String] name

*Required*. The name of the activity to schedule.

@param [ActivityType] activity_type

*Required*. The activity type for this scheduled activity.

@param [Object] input

*Required*. Additional data passed to the activity.

@param [ActivityOptions] options

*Required*. {ActivityOptions} to set for the scheduled activity.
# File lib/aws/decider/activity.rb, line 247
# Schedules one activity: registers an open request and a decision state
# machine for it under a fresh decision id, and wires up cancellation,
# failure capture and result delivery.
#
# @param [String] name
#   Activity type name; its last dot-separated segment selects the entry of
#   @option_map that is merged with `options`.
#
# @param activity_type
#   Stored in the state machine's attributes under :activity_type.
#
# @param [Object] input
#   Activity input. Must respond to `empty?`; it is serialized with the
#   options' data converter unless empty.
#
# @param [ActivityOptions] options
#   Options merged with the per-activity entry of @option_map.
#   NOTE(review): merge precedence is decided by Utilities::merge_all_options,
#   which is not visible here.
#
# @return the future (`output`) when the merged options set return_on_start;
#   otherwise the value of `output.get`.
#
# @raise the error recorded in @failure_map for this decision id, if any.
def schedule_activity(name, activity_type, input, options)
  options = Utilities::merge_all_options(@option_map[activity_name_from_activity_type(name)], options)
  new_options = ActivityOptions.new(options)
  output = Utilities::AddressableFuture.new
  open_request = OpenRequestInfo.new
  decision_id = @decision_helper.get_next_id(:Activity)
  # Lets callers map the future back to this activity (request_cancel_activity_task
  # reads metadata.activity_id).
  output._metadata = ActivityMetadata.new(decision_id)
  error_handler do |t|
    t.begin do
      # The converter comes from the merged options and replaces the
      # client-level @data_converter for this and later calls.
      @data_converter = new_options.data_converter

      #input = input.map { |input_part| @data_converter.dump input_part } unless input.nil?
      # NOTE(review): a nil input raises NoMethodError on `empty?` here.
      input = @data_converter.dump input unless input.empty?
      attributes = {}
      # Only fill in the input when the options do not already carry one.
      new_options.input ||= input unless input.empty?
      attributes[:options] = new_options
      attributes[:activity_type] = activity_type
      attributes[:decision_id] = decision_id
      # NOTE(review): @completion_handle is not read anywhere in this method.
      @completion_handle = nil
      # NOTE(review): this block parameter `t` shadows the error_handler's `t`.
      external_task do |t|
        t.initiate_task do |handle|
          open_request.completion_handle = handle

          # Both registries are keyed by the decision id as a string.
          @decision_helper.scheduled_activities[decision_id.to_s] = open_request
          @decision_helper[decision_id.to_s] = ActivityDecisionStateMachine.new(decision_id, attributes)
        end
        t.cancellation_handler do |this_handle, cause|
          state_machine = @decision_helper[decision_id.to_s]
          # While the state machine is still in :created, the open request is
          # dropped and its handle completed right away.
          if state_machine.current_state == :created
            # NOTE(review): this reassigns the method-level `open_request`
            # that the ensure block below reads; it becomes nil if the entry
            # was already removed.
            open_request = @decision_helper.scheduled_activities.delete(decision_id.to_s)
            open_request.completion_handle.complete
          end
          state_machine.consume(:cancel)
        end
      end
    end
    t.rescue(Exception) do |error|
      @data_converter = new_options.data_converter
      # If we have an ActivityTaskFailedException, then we should figure
      # out what the cause was, and pull that out. If it's anything else,
      # we should serialize the error, and stuff that into details, so
      # that things above us can pull it out correctly. We don't have to
      # do this for ActivityTaskFailedException, as the details is
      # *already* serialized.
      if error.is_a? ActivityTaskFailedException
        details = @data_converter.load(error.details)
        error.cause = details
      else
        details = @data_converter.dump(error)
        error.details = details
      end
      # Remember the failure so the code after the error_handler can raise it.
      @failure_map[decision_id.to_s] = error
    end
    t.ensure do
      @data_converter = new_options.data_converter
      # Deserialize whatever result was recorded on the open request and
      # resolve the future with it.
      result = @data_converter.load open_request.result
      output.set(result)
      # In return_on_start mode the caller already holds the future, so a
      # recorded failure is raised from here instead of from the code below.
      raise @failure_map[decision_id.to_s] if @failure_map[decision_id.to_s] && new_options.return_on_start
    end
  end
  return output if new_options.return_on_start
  # First `get` is called only for its effect (presumably it waits for the
  # future to be set — TODO confirm); its value is discarded.
  output.get
  this_failure = @failure_map[decision_id.to_s]
  raise this_failure if this_failure
  return output.get
end