class Cadence::Activity::TaskProcessor

Attributes

activity_class[R]
activity_name[R]
client[R]
domain[R]
middleware_chain[R]
task[R]
task_token[R]

Public Class Methods

new(task, domain, activity_lookup, client, middleware_chain) click to toggle source
# File lib/cadence/activity/task_processor.rb, line 8
def initialize(task, domain, activity_lookup, client, middleware_chain)
  @task = task
  @domain = domain
  @task_token = task.taskToken
  @activity_name = task.activityType.name
  @activity_class = activity_lookup.find(activity_name)
  @client = client
  @middleware_chain = middleware_chain
end

Public Instance Methods

process() click to toggle source
# File lib/cadence/activity/task_processor.rb, line 18
def process
  start_time = Time.now

  Cadence.logger.info("Processing activity task for #{activity_name}")
  Cadence.metrics.timing('activity_task.queue_time', queue_time_ms, activity: activity_name)

  if !activity_class
    respond_failed('ActivityNotRegistered', 'Activity is not registered with this worker')
    return
  end

  metadata = Metadata.generate(Metadata::ACTIVITY_TYPE, task, domain)
  context = Activity::Context.new(client, metadata)

  result = middleware_chain.invoke(metadata) do
    activity_class.execute_in_context(context, JSON.deserialize(task.input))
  end

  # Do not complete asynchronous activities, these should be completed manually
  respond_completed(result) unless context.async?
rescue StandardError, ScriptError => error
  respond_failed(error.class.name, error.message)
rescue Exception => error
  Cadence.logger.fatal("Activity #{activity_name} unexpectedly failed with: #{error.inspect}")
  Cadence.logger.debug(error.backtrace.join("\n"))
  raise
ensure
  time_diff_ms = ((Time.now - start_time) * 1000).round
  Cadence.metrics.timing('activity_task.latency', time_diff_ms, activity: activity_name)
  Cadence.logger.debug("Activity task processed in #{time_diff_ms}ms")
end

Private Instance Methods

queue_time_ms() click to toggle source
# File lib/cadence/activity/task_processor.rb, line 54
def queue_time_ms
  ((task.startedTimestamp - task.scheduledTimestampOfThisAttempt) / 1_000_000).round
end
respond_completed(result) click to toggle source
# File lib/cadence/activity/task_processor.rb, line 58
def respond_completed(result)
  Cadence.logger.info("Activity #{activity_name} completed")
  client.respond_activity_task_completed(task_token: task_token, result: result)
rescue StandardError => error
  Cadence.logger.error("Unable to complete Activity #{activity_name}: #{error.inspect}")
end
respond_failed(reason, details) click to toggle source
# File lib/cadence/activity/task_processor.rb, line 65
def respond_failed(reason, details)
  Cadence.logger.error("Activity #{activity_name} failed with: #{reason}")
  client.respond_activity_task_failed(task_token: task_token, reason: reason, details: details)
rescue StandardError => error
  Cadence.logger.error("Unable to fail Activity #{activity_name}: #{error.inspect}")
end