class Kuroko2::Workflow::Engine

Constants

DEFAULT_EXPECTED_TIME
EXPECTED_TIME_NOTIFY_REMIND_TERM

Public Instance Methods

failure(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 59
def failure(token)
  message = "(token #{token.uuid}) Mark as failure."

  token.job_instance.logs.error(message)
  token.job_instance.touch(:error_at)
  token.mark_as_failure

  Kuroko2.logger.info(message)

  Notifier.notify(:failure, token.job_instance)

  if token.context['AUTO_SKIP_ERROR']
    skip(token)
  end
end
process(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 13
def process(token)
  unless token.working? || token.waiting?
    Kuroko2.logger.info { "(token #{token.uuid}) Skip since current status marked as '#{token.status_name}'." }

    return
  end

  token.with_lock { process_with_lock(token) }
end
process_all() click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 7
def process_all
  Token.processable.each do |token|
    process(token)
  end
end
retry(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 23
def retry(token)
  token.with_lock do
    node = extract_node(token)

    message = "(token #{token.uuid}) Retry current node: '#{node.type}: #{node.option}'"
    token.job_instance.update_columns(error_at: nil, retrying: true)
    token.job_instance.logs.info(message)

    token.mark_as_working
    token.save!

    Kuroko2.logger.info(message)

    Notifier.notify(:retrying, token.job_instance)
  end
end
skip(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 40
def skip(token)
  token.with_lock do
    node = extract_node(token)

    message = "(token #{token.uuid}) Skip current node: '#{node.type}: #{node.option}'"
    token.job_instance.update_column(:error_at, nil)
    token.job_instance.logs.info(message)

    token.mark_as_working
    process_next(node.next, token)

    token.save! unless token.destroyed?

    Kuroko2.logger.info(message)

    Notifier.notify(:skipping, token.job_instance)
  end
end

Private Instance Methods

auto_retry(node, token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 184
def auto_retry(node, token)
  token.context['RETRY'][node.path]['retried_count'] += 1

  message = "(token #{token.uuid}) Retry current node: '#{node.type}: #{node.option}'"
  token.job_instance.logs.info(message)
  Kuroko2.logger.info(message)

  message = "(token #{token.uuid}) The number of retries: " << 
    "#{token.context['RETRY'][node.path]['retried_count']} / #{token.context['RETRY'][node.path]['count']}"
  token.job_instance.logs.info(message)
  Kuroko2.logger.info(message)

  set_sleep_context_before_retrying(node, token)
  
  message = "(token #{token.uuid}) Sleep for #{token.context['RETRY'][node.path]['sleep_time']} seconds"
  token.job_instance.logs.info(message)
  Kuroko2.logger.info(message)
end
auto_retryable?(node, token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 207
def auto_retryable?(node, token)
  token.context['RETRY'].present? &&
    token.context['RETRY'][node.path].present? &&
    token.context['RETRY'][node.path]['count'] > token.context['RETRY'][node.path]['retried_count']
end
available_notify_long_elapsed_time?(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 164
def available_notify_long_elapsed_time?(token)
  return false if token.parent && expected_time(token) == expected_time(token.parent)
  token.context['EXPECTED_TIME_NOTIFIED_AT'].nil? || Time.zone.parse(token.context['EXPECTED_TIME_NOTIFIED_AT']) < EXPECTED_TIME_NOTIFY_REMIND_TERM.ago
end
elapsed_expected_time?(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 169
def elapsed_expected_time?(token)
  (token.created_at + expected_time(token).minutes).past?
end
execute_task(node, token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 77
def execute_task(node, token)
  result = if sleeping?(token)
      :pass
    else
      node.execute(token)
    end

  case result
  when :next
    process_next(node.next, token)
  when :next_sibling
    process_next(node.next_sibling, token)
  when :pass
    # Do nothing
  when :failure
    if auto_retryable?(node, token)
      auto_retry(node, token)
    else
      failure(token)
    end
  end
rescue KeyError => e
  raise EngineError.new(e.message)
end
expected_time(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 158
def expected_time(token)
  token.context['EXPECTED_TIME'].present? ?
    token.context['EXPECTED_TIME'].to_i :
    DEFAULT_EXPECTED_TIME
end
extract_node(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 153
def extract_node(token)
  root = ScriptParser.new(token.script).parse(validate: false)
  root.find(token.path)
end
notify_long_elapsed_time_if_needed(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 173
def notify_long_elapsed_time_if_needed(token)
  if available_notify_long_elapsed_time?(token) && elapsed_expected_time?(token)
    token.context['EXPECTED_TIME_NOTIFIED_AT'] = Time.current
    Notifier.notify(:long_elapsed_time, token.job_instance)

    message = "(token #{token.uuid}) The running time is longer than #{expected_time(token)} minutes!"
    token.job_instance.logs.info(message)
    Kuroko2.logger.info(message)
  end
end
process_next(node, token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 106
def process_next(node, token)
  if node
    message = "(token #{token.uuid}) Current node is '#{token.path}'."

    token.path = node.path
    token.job_instance.logs.info(message)

    Kuroko2.logger.info(message)
  else
    message = "(token #{token.uuid}) Marked as 'finished'."

    token.job_instance.logs.info(message)
    Kuroko2.logger.info(message)
    token.mark_as_finished
    unless token.parent
      token.job_instance.touch(:finished_at)
      if token.job_instance.retrying?
        Notifier.notify(:back_to_normal, token.job_instance)
      else
        Notifier.notify(:finished, token.job_instance)
      end
      token.destroy!
    end
  end
end
process_with_lock(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 132
def process_with_lock(token)
  node = extract_node(token)

  execute_task(node, token)
  notify_long_elapsed_time_if_needed(token)
rescue EngineError => e
  message = "#{e.message}\n" + e.backtrace.map { |trace| "    #{trace}" }.join("\n")

  token.mark_as_critical(e)
  token.job_instance.logs.error("(token #{token.uuid}) #{message}")
  token.job_instance.touch(:canceled_at)

  Token.where(job_definition: token.job_definition).delete_all
  token.job_instance.logs.warn("(token #{token.uuid}) This job is canceled.")

  Kuroko2.logger.error(message)
  Notifier.notify(:critical, token.job_instance)
ensure
  token.save! unless token.destroyed?
end
set_sleep_context_before_retrying(node, token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 203
def set_sleep_context_before_retrying(node, token)
  token.context['SLEEP'] = Time.current.to_i + token.context['RETRY'][node.path]['sleep_time']
end
sleeping?(token) click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 102
def sleeping?(token)
  token.context['SLEEP'].present? && token.context['SLEEP'] > Time.current.to_i
end