class Kuroko2::Workflow::Engine
Constants
- DEFAULT_EXPECTED_TIME
- EXPECTED_TIME_NOTIFY_REMIND_TERM
Public Instance Methods
failure(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 59
# Marks the token as failed and reports it.
#
# Writes an error log entry on the token's job instance, touches the
# instance's +error_at+ column, marks the token as failure, logs to the
# Kuroko2 logger and sends a +:failure+ notification. When the token context
# has a truthy 'AUTO_SKIP_ERROR' entry, the failed node is skipped right away.
#
# token - the workflow token whose current node failed.
#
# Returns the result of #skip when auto-skip applies, otherwise nil.
def failure(token)
  note = "(token #{token.uuid}) Mark as failure."
  job_instance = token.job_instance

  job_instance.logs.error(note)
  job_instance.touch(:error_at)
  token.mark_as_failure

  Kuroko2.logger.info(note)
  Notifier.notify(:failure, job_instance)

  skip(token) if token.context['AUTO_SKIP_ERROR']
end
process(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 13
# Processes one token, provided it is currently working or waiting.
#
# Tokens in any other status are not processed; only an informational line
# is logged (lazily, through the logger block) and nil is returned.
# Eligible tokens are handed to #process_with_lock inside +token.with_lock+.
#
# token - the workflow token to advance.
def process(token)
  if token.working? || token.waiting?
    token.with_lock { process_with_lock(token) }
  else
    Kuroko2.logger.info do
      "(token #{token.uuid}) Skip since current status marked as '#{token.status_name}'."
    end
    nil
  end
end
process_all()
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 7
# Runs #process once for every token yielded by +Token.processable+.
def process_all
  Token.processable.each { |token| process(token) }
end
retry(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 23
# Puts the token back to work on its current node.
#
# Inside +token.with_lock+: clears +error_at+ and sets +retrying+ on the job
# instance, records the retry in the job log, marks the token as working,
# saves it, then logs and sends a +:retrying+ notification.
#
# token - the workflow token to retry.
def retry(token)
  token.with_lock do
    current = extract_node(token)
    note = "(token #{token.uuid}) Retry current node: '#{current.type}: #{current.option}'"
    job_instance = token.job_instance

    job_instance.update_columns(error_at: nil, retrying: true)
    job_instance.logs.info(note)

    token.mark_as_working
    token.save!

    Kuroko2.logger.info(note)
    Notifier.notify(:retrying, job_instance)
  end
end
skip(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 40
# Skips the token's current node and moves on to the node after it.
#
# Inside +token.with_lock+: clears +error_at+ on the job instance, records
# the skip in the job log, marks the token as working and advances it with
# #process_next. The token is saved afterwards unless advancing destroyed it.
# Finally the skip is logged and a +:skipping+ notification is sent.
#
# token - the workflow token whose current node should be skipped.
def skip(token)
  token.with_lock do
    current = extract_node(token)
    note = "(token #{token.uuid}) Skip current node: '#{current.type}: #{current.option}'"
    job_instance = token.job_instance

    job_instance.update_column(:error_at, nil)
    job_instance.logs.info(note)

    token.mark_as_working
    process_next(current.next, token)
    # process_next destroys the token when the workflow has finished.
    token.save! unless token.destroyed?

    Kuroko2.logger.info(note)
    Notifier.notify(:skipping, job_instance)
  end
end
Private Instance Methods
auto_retry(node, token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 184
# Registers one more automatic retry for the node and schedules the pause
# that precedes it.
#
# Increments the node's 'retried_count' in the token's 'RETRY' context,
# logs the retry and the retry counter, stores the wake-up time through
# #set_sleep_context_before_retrying, and logs the sleep duration. Every
# message goes to both the job instance log and the Kuroko2 logger.
#
# node  - the node being retried; its +path+ keys the 'RETRY' context.
# token - the workflow token that owns the context.
def auto_retry(node, token)
  retry_state = token.context['RETRY'][node.path]
  retry_state['retried_count'] += 1

  # Emits one message to the job log first, then to the application logger.
  announce = lambda do |text|
    token.job_instance.logs.info(text)
    Kuroko2.logger.info(text)
  end

  announce.call("(token #{token.uuid}) Retry current node: '#{node.type}: #{node.option}'")
  announce.call(
    "(token #{token.uuid}) The number of retries: #{retry_state['retried_count']} / #{retry_state['count']}"
  )

  set_sleep_context_before_retrying(node, token)

  announce.call("(token #{token.uuid}) Sleep for #{retry_state['sleep_time']} seconds")
end
auto_retryable?(node, token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 207
# Tells whether the node still has automatic retries left.
#
# node  - the node that failed; its +path+ keys the 'RETRY' context.
# token - the workflow token that owns the context.
#
# Returns false when the token has no 'RETRY' context or no entry for this
# node; otherwise true only while 'count' exceeds 'retried_count'.
def auto_retryable?(node, token)
  retry_table = token.context['RETRY']
  return false unless retry_table.present?

  settings = retry_table[node.path]
  return false unless settings.present?

  settings['count'] > settings['retried_count']
end
available_notify_long_elapsed_time?(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 164
# Tells whether a "long elapsed time" notification may be sent for the token.
#
# A token that has a parent with the same expected time never notifies.
# Otherwise notification is allowed when none has been recorded yet, or when
# the recorded 'EXPECTED_TIME_NOTIFIED_AT' is older than
# EXPECTED_TIME_NOTIFY_REMIND_TERM.
#
# token - the workflow token to check.
def available_notify_long_elapsed_time?(token)
  parent = token.parent
  return false if parent && expected_time(token) == expected_time(parent)

  notified_at = token.context['EXPECTED_TIME_NOTIFIED_AT']
  return true if notified_at.nil?

  Time.zone.parse(notified_at) < EXPECTED_TIME_NOTIFY_REMIND_TERM.ago
end
elapsed_expected_time?(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 169
# Tells whether the token has outlived its expected running time.
#
# The deadline is the token's +created_at+ plus #expected_time, which is
# interpreted as minutes.
#
# token - the workflow token to check.
def elapsed_expected_time?(token)
  deadline = token.created_at + expected_time(token).minutes
  deadline.past?
end
execute_task(node, token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 77
# Executes the node for the token and acts on the outcome.
#
# While the token is sleeping the node is not executed and the outcome is
# treated as +:pass+. Outcomes:
#   :next         - advance to +node.next+
#   :next_sibling - advance to +node.next_sibling+
#   :pass         - leave the token where it is
#   :failure      - auto-retry when retries remain, otherwise mark as failure
# Any other outcome is ignored.
#
# node  - the node to execute.
# token - the workflow token being processed.
#
# Raises EngineError (carrying the original message) when a KeyError escapes
# from anything called here.
def execute_task(node, token)
  outcome = sleeping?(token) ? :pass : node.execute(token)

  case outcome
  when :next         then process_next(node.next, token)
  when :next_sibling then process_next(node.next_sibling, token)
  when :pass         then nil # Do nothing
  when :failure
    auto_retryable?(node, token) ? auto_retry(node, token) : failure(token)
  end
rescue KeyError => error
  raise EngineError, error.message
end
expected_time(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 158
# Returns the token's expected running time.
#
# Uses the 'EXPECTED_TIME' context entry converted with +to_i+ when it is
# present, and DEFAULT_EXPECTED_TIME otherwise.
#
# token - the workflow token to read the context from.
def expected_time(token)
  configured = token.context['EXPECTED_TIME']
  return DEFAULT_EXPECTED_TIME unless configured.present?

  configured.to_i
end
extract_node(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 153
# Parses the token's script (with validation disabled) and looks up the node
# at the token's current path.
#
# token - the workflow token providing +script+ and +path+.
#
# Returns whatever the parsed root's +find+ returns for that path.
def extract_node(token)
  ScriptParser.new(token.script).parse(validate: false).find(token.path)
end
notify_long_elapsed_time_if_needed(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 173
# Sends a "long elapsed time" notification when the token has run past its
# expected time and notifying is currently allowed.
#
# On notification the current time is stored in the token context under
# 'EXPECTED_TIME_NOTIFIED_AT', a +:long_elapsed_time+ notification is sent,
# and the event is written to the job log and the Kuroko2 logger.
#
# token - the workflow token to check.
#
# Returns nil when no notification is due.
def notify_long_elapsed_time_if_needed(token)
  return unless available_notify_long_elapsed_time?(token)
  return unless elapsed_expected_time?(token)

  token.context['EXPECTED_TIME_NOTIFIED_AT'] = Time.current
  Notifier.notify(:long_elapsed_time, token.job_instance)

  warning = "(token #{token.uuid}) The running time is longer than #{expected_time(token)} minutes!"
  token.job_instance.logs.info(warning)
  Kuroko2.logger.info(warning)
end
process_next(node, token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 106
# Moves the token to the given node, or finishes the token when there is no
# further node.
#
# With a node: the token's path is set to the node's path and the move is
# logged. Without a node: the token is marked as finished; if it has no
# parent, the job instance's +finished_at+ is touched, a +:back_to_normal+
# (when the instance was retrying) or +:finished+ notification is sent, and
# the token is destroyed.
#
# node  - the node to move to, or nil/false when the sequence has ended.
# token - the workflow token being advanced.
def process_next(node, token)
  job_instance = token.job_instance

  if node
    # NOTE(review): the text is built before the path is reassigned, so it
    # reports the path the token is leaving - confirm this is intended.
    moved = "(token #{token.uuid}) Current node is '#{token.path}'."
    token.path = node.path
    job_instance.logs.info(moved)
    return Kuroko2.logger.info(moved)
  end

  done = "(token #{token.uuid}) Marked as 'finished'."
  job_instance.logs.info(done)
  Kuroko2.logger.info(done)
  token.mark_as_finished

  # Tokens with a parent stop here; only parentless tokens close the job.
  return if token.parent

  job_instance.touch(:finished_at)
  event = job_instance.retrying? ? :back_to_normal : :finished
  Notifier.notify(event, job_instance)
  token.destroy!
end
process_with_lock(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 132
# Performs one processing step for the token; called by #process from inside
# +token.with_lock+.
#
# Normal path: resolve the current node, execute it, then check whether a
# long-running notification is due.
#
# On EngineError: the token is marked as critical, the error and its
# backtrace are written to the job log, the job instance's +canceled_at+ is
# touched, all tokens of the token's job definition are deleted, and a
# +:critical+ notification is sent.
#
# In every case the token is saved afterwards unless it has been destroyed.
#
# token - the workflow token to process.
def process_with_lock(token)
  execute_task(extract_node(token), token)
  notify_long_elapsed_time_if_needed(token)
rescue EngineError => error
  trace_lines = error.backtrace.map { |frame| " #{frame}" }
  report = "#{error.message}\n#{trace_lines.join("\n")}"
  job_instance = token.job_instance

  token.mark_as_critical(error)
  job_instance.logs.error("(token #{token.uuid}) #{report}")
  job_instance.touch(:canceled_at)

  Token.where(job_definition: token.job_definition).delete_all
  job_instance.logs.warn("(token #{token.uuid}) This job is canceled.")

  Kuroko2.logger.error(report)
  Notifier.notify(:critical, job_instance)
ensure
  token.save! unless token.destroyed?
end
set_sleep_context_before_retrying(node, token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 203
# Stores in the token context, under 'SLEEP', the epoch second until which
# the token should sleep before the node is retried: the current time plus
# the node's configured 'sleep_time'.
#
# node  - the node being retried; its +path+ keys the 'RETRY' context.
# token - the workflow token that owns the context.
#
# Returns the stored wake-up value.
def set_sleep_context_before_retrying(node, token)
  now = Time.current.to_i
  pause = token.context['RETRY'][node.path]['sleep_time']

  token.context['SLEEP'] = now + pause
end
sleeping?(token)
click to toggle source
# File lib/autoload/kuroko2/workflow/engine.rb, line 102
# Tells whether the token is still inside a sleep period.
#
# token - the workflow token to check.
#
# Returns false when the context has no 'SLEEP' entry; otherwise true while
# the stored value is greater than the current epoch second.
def sleeping?(token)
  wake_at = token.context['SLEEP']
  return false unless wake_at.present?

  wake_at > Time.current.to_i
end