module Jongleur::Implementation
This module encapsulates methods that are not meant to be called by the gem's clients. They are used by the API
module to implement its functionality. @see API
@api private
Public Class Methods
# File lib/jongleur/implementation.rb, line 81
def self.all_predecessors_finished_successfully?(task_name)
  get_predecessors(task_name).reduce(0) { |sum, t| sum + get_exit_status(t) }.zero?
end

# File lib/jongleur/implementation.rb, line 77
def self.are_predecessors_running?(task_name)
  !get_predecessors(task_name).select(&:running).empty?
end
Creates a list of tasks and their current state
@param [Hash] task_graph @see API.task_graph
@return [Array] task_matrix a list of Tasks
# File lib/jongleur/implementation.rb, line 18
def self.build_task_matrix(task_graph)
  return [] if task_graph.empty?

  # create it as a Set so we can easily ensure unique entries
  task_matrix = Set.new
  task_graph.keys.each { |t| task_matrix << Task.new(t, StatusCodes::PROCESS_NOT_YET_RAN, false) }
  task_graph.values.each do |val|
    val.each { |t| task_matrix << Task.new(t, StatusCodes::PROCESS_NOT_YET_RAN, false) }
  end
  task_matrix.to_a
end
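The deduplication step can be tried outside the gem. The following is a minimal sketch, not the gem's code: `SketchTask` and `NOT_YET_RAN` are hypothetical stand-ins for `Jongleur::Task` and `StatusCodes::PROCESS_NOT_YET_RAN`, and the graph is invented for illustration.

```ruby
require 'set'

# Hypothetical stand-ins; the gem's real Task and status code are defined elsewhere.
SketchTask = Struct.new(:name, :pid, :running)
NOT_YET_RAN = -1

# Collect every task named in the graph, whether it appears as a key or as a value.
def build_matrix(task_graph)
  return [] if task_graph.empty?

  names = task_graph.keys + task_graph.values.flatten
  # Struct instances with equal members are eql?, so the Set drops duplicates.
  names.each_with_object(Set.new) { |n, set| set << SketchTask.new(n, NOT_YET_RAN, false) }.to_a
end

graph = { A: [:B, :C], B: [:D], C: [:D] }
matrix = build_matrix(graph)
puts matrix.map(&:name).inspect # → [:A, :B, :C, :D]
```

Task `:D` appears twice among the values but only once in the result, because the Set treats the two equal structs as one entry.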
# File lib/jongleur/implementation.rb, line 155
def self.each_descendant(task)
  API.task_graph[task]&.each do |desc_task|
    # check desc_task isn't already running and that its predecessors are finished
    yield find_task_by(:name, desc_task) if !task_running?(desc_task) && finished_tasks.contains_array?(get_predecessors(desc_task))
  end
end
Find task based on an attribute's value
@note the method finds the first matching task. If more than one task matches, only the first one, in sequence order, is returned
@param [Symbol] attr_name
@param [Object] attr_value could be a String, Integer, Boolean, etc.
@yield [Jongleur::Task] the first task that matches the arguments
@return [Jongleur::Task, nil] the first task that matches the arguments, nil if no matches are found
# File lib/jongleur/implementation.rb, line 149
def self.find_task_by(attr_name, attr_value)
  idx = API.task_matrix.index { |t| t.send(attr_name.to_s) == attr_value }
  yield API.task_matrix[idx] if block_given? && idx
  idx ? API.task_matrix[idx] : nil
end
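The first-match behaviour is easy to see with a small standalone sketch. `DemoTask`, `DEMO_MATRIX` and `find_by` are hypothetical names used only here; the sketch takes the matrix as an argument instead of reading `API.task_matrix`.

```ruby
# Hypothetical task records; the gem's Task struct may carry more fields.
DemoTask = Struct.new(:name, :pid, :running)
DEMO_MATRIX = [
  DemoTask.new(:A, 101, false),
  DemoTask.new(:B, 102, true),
  DemoTask.new(:C, 103, true)
].freeze

# Return the first task whose attribute equals the value, yielding it if a block is given.
def find_by(matrix, attr_name, attr_value)
  task = matrix.find { |t| t.public_send(attr_name) == attr_value }
  yield task if block_given? && task
  task
end

puts find_by(DEMO_MATRIX, :running, true).name.inspect # → :B
puts find_by(DEMO_MATRIX, :name, :Z).inspect           # → nil
```

Both `:B` and `:C` are running, but only `:B` is returned because it comes first in the matrix.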
# File lib/jongleur/implementation.rb, line 128
def self.finished_tasks
  API.task_matrix.map { |t| t.name if t.running == false }.compact.extend(Helper)
end
Gets a task's exit status @see ruby-doc.org/core-2.4.3/Process/Status.html
@param [Symbol] task_name @return [Integer] the task's exit status or StatusCodes::TASK_NOT_IN_TASK_MATRIX
# File lib/jongleur/implementation.rb, line 72
def self.get_exit_status(task_name)
  idx = API.task_matrix.index { |t| t.name == task_name }
  idx ? API.task_matrix[idx].exit_status : StatusCodes::TASK_NOT_IN_TASK_MATRIX
end
Lists a task's predecessors, i.e. the tasks that the given task depends on
@param [Symbol] task @return [Array] a list of the names of the tasks that the given task depends on
# File lib/jongleur/implementation.rb, line 33
def self.get_predecessors(task)
  return [] if API.task_graph.empty?

  API.task_graph.select { |_k, v| v.include?(task) }.keys
end
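A short sketch of the lookup on a sample graph follows. `SAMPLE_GRAPH` and `predecessors` are hypothetical; the sketch takes the graph as an argument instead of reading `API.task_graph`.

```ruby
# Hypothetical task graph: each key lists the tasks that may start after it.
SAMPLE_GRAPH = { A: [:B, :C], B: [:D], C: [:D] }.freeze

# A task's predecessors are the keys whose successor list includes it.
def predecessors(graph, task)
  graph.select { |_k, v| v.include?(task) }.keys
end

puts predecessors(SAMPLE_GRAPH, :D).inspect # → [:B, :C]
puts predecessors(SAMPLE_GRAPH, :A).inspect # → []
```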
Gets the process id of a task.
@param [Symbol] task_name @return [Integer] the pid of the task, Jongleur::StatusCodes::PROCESS_NOT_YET_RAN if the task hasn't been run yet, or StatusCodes::TASK_NOT_IN_TASK_GRAPH if the task is not valid
# File lib/jongleur/implementation.rb, line 57
def self.get_process_id(task_name)
  if valid_tasks?([].push(task_name))
    idx = API.task_matrix.index { |t| t.name == task_name }
    # STDOUT.puts ">>>>> #{task_name} >>>>>> #{API.task_matrix[idx].pid}", "\n"
    API.task_matrix[idx].pid
  else
    StatusCodes::TASK_NOT_IN_TASK_GRAPH
  end
end

# File lib/jongleur/implementation.rb, line 136
def self.get_task_list
  API.task_matrix.select(&:running)
end
Parses a line of program output
@param [String] line a line of program output @return [Hash] the output line in a key-value format
# File lib/jongleur/implementation.rb, line 167
def self.parse_line(line)
  res = {}
  msg_arr = []
  msg_arr = line.split(',') if line&.match(/^finished task/)
  msg_arr.each do |x|
    h = {}
    s = x.split(':')
    h[s.at(0).strip] = s.at(1).strip
    res.merge!(h)
  end
  res
end
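To see what the key-value format looks like, here is a standalone sketch with similar behaviour. The sample message is hypothetical: this section does not show the exact fields the gem writes, only that a parsed line must start with `finished task`.

```ruby
# Standalone sketch of the parsing logic, for experimenting outside the gem.
def parse_line(line)
  # Only lines that start with "finished task" are parsed; anything else gives {}.
  return {} unless line&.match?(/^finished task/)

  line.split(',').each_with_object({}) do |pair, res|
    key, value = pair.split(':')
    res[key.strip] = value.strip
  end
end

# Hypothetical message in "key: value, key: value" form.
parsed = parse_line('finished task: A, process: 1234, exit_status: 0')
puts parsed['finished task'] # → A
puts parsed['process']       # → 1234
puts parse_line('starting task A').empty? # → true
```

All keys and values come back as strings, so a caller that needs the exit status as a number has to convert it. A segment without a colon raises NoMethodError, in this sketch as in the listing above, because the missing value is nil.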
Parses a multi-line string of program output
@param [StringIO] string_io the standard output as a string @param [Boolean] print_to_stdout whether to print each line to stdout @return [Array<Hash>] a list of hashes representing the std output
# File lib/jongleur/implementation.rb, line 186
def self.parse_output(string_io, print_to_stdout = false)
  parsed = []
  string_io.each_line do |line|
    STDOUT.puts ">>> #{line}" if print_to_stdout
    line_as_hash = parse_line(line)
    parsed << line_as_hash unless line_as_hash.empty?
  end
  parsed
end

# File lib/jongleur/implementation.rb, line 85
def self.predecessors_which_failed(task_name)
  get_predecessors(task_name).select { |t| task_failed?(t) }
end

# File lib/jongleur/implementation.rb, line 89
def self.predecessors_which_havent_finished(task_name)
  get_predecessors(task_name).reject { |t| task_finished?(t) }
end

# File lib/jongleur/implementation.rb, line 214
def self.process_message(a_msg)
  STDOUT.puts(a_msg)
  STDOUT.sync
end
Runs all descendant tasks of the given task
# File lib/jongleur/implementation.rb, line 197
def self.run_descendants(task_name)
  each_descendant(task_name) do |t|
    waiting = predecessors_which_havent_finished(t.name)
    failed = predecessors_which_failed(t.name)
    if waiting.empty? && failed.empty?
      t.running = true
      Implementation.process_message "starting task #{t.name}"
      t.pid = fork { API.const_get(t.name).new(predecessors: get_predecessors(t.name)).execute }
    elsif !failed.empty?
      process_message "cannot start #{t.name} because its predecessor #{failed.first} failed to finish"
    elsif !waiting.empty?
      process_message "cannot start #{t.name} because its predecessor #{waiting.first} hasn't finished yet"
    end
  end
end

# File lib/jongleur/implementation.rb, line 132
def self.running_tasks
  API.task_matrix.select(&:running)
end
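The branch order decides which message a blocked task gets. The sketch below isolates that decision in a hypothetical helper, `start_decision`, which returns the message instead of forking a process. It is an illustration of the branching only, not part of the gem.

```ruby
# Mirrors the branch order of run_descendants without forking anything.
# waiting and failed are lists of predecessor names, as in the original method.
def start_decision(name, waiting, failed)
  if waiting.empty? && failed.empty?
    "starting task #{name}"
  elsif !failed.empty?
    "cannot start #{name} because its predecessor #{failed.first} failed to finish"
  else
    "cannot start #{name} because its predecessor #{waiting.first} hasn't finished yet"
  end
end

puts start_decision(:D, [], [])
puts start_decision(:D, [:C], [:B])
puts start_decision(:D, [:C], [])
```

When a task has both a failed and an unfinished predecessor, the failure is reported, because that branch is checked first.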
Check if a task has failed status
@return [Boolean, Integer] true if task has a failed status, false if not,
StatusCodes::TASK_NOT_IN_TASK_MATRIX if task not found
# File lib/jongleur/implementation.rb, line 105
def self.task_failed?(task)
  idx = API.task_matrix.index { |t| t.name == task }
  idx ? (API.task_matrix[idx].success_status == false) : StatusCodes::TASK_NOT_IN_TASK_MATRIX
end
Check if a task has finished running
@return [Boolean, Integer] true if task has finished, false if not,
StatusCodes::TASK_NOT_IN_TASK_MATRIX if task not found
# File lib/jongleur/implementation.rb, line 123
def self.task_finished?(task)
  idx = API.task_matrix.index { |t| t.name == task }
  idx ? API.task_matrix[idx].exit_status : StatusCodes::TASK_NOT_IN_TASK_MATRIX
end

Check if a task is still running, at the time of checking
@return [Boolean, Integer] true if task is running, false if not,
StatusCodes::TASK_NOT_IN_TASK_MATRIX if task not found
# File lib/jongleur/implementation.rb, line 114
def self.task_running?(task)
  idx = API.task_matrix.index { |t| t.name == task }
  idx ? API.task_matrix[idx].running : StatusCodes::TASK_NOT_IN_TASK_MATRIX
end

Lists all tasks without predecessors, i.e. tasks that do not depend on any other task
@return [Array] a list of all tasks without predecessors
# File lib/jongleur/implementation.rb, line 96
def self.tasks_without_predecessors
  list = API.task_graph.keys - API.task_graph.values.flatten
  API.task_matrix.select { |t| list.include?(t.name) }
end
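The set difference at the heart of this method can be checked on a small, hypothetical graph. A task has no predecessors when it is a key but never appears in any successor list.

```ruby
# Hypothetical task graph; E is a standalone task with no successors.
graph = { A: [:B, :C], B: [:D], C: [:D], E: [] }

# Keys that never show up as somebody's successor have no predecessors.
roots = graph.keys - graph.values.flatten
puts roots.inspect # → [:A, :E]
```

Task `:D` has no successors of its own, but it is not a root: it appears only as a value, so it is never among the keys being filtered.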
Ensures a task, or list of tasks, are defined in the task_diagram and are loaded in Ruby. If const_get can't find the class it raises NameError. The method catches it and returns false
@note this method exists for the scenario where the user adds a task X to the Task Diagram but fails to provide an implementation of the Task's class, i.e. class X < WorkerTask
@param [Array<Symbol>] task_list the tasks to be validated @return [Boolean] true if all tasks are valid, false if one or more tasks are invalid
# File lib/jongleur/implementation.rb, line 45
def self.valid_tasks?(task_list)
  task_list.each { |task| API.const_get(task.to_s) }
  true
rescue NameError
  false
end
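The constant lookup can be reproduced in isolation. In this sketch `SketchRegistry` is a hypothetical namespace standing in for `Jongleur::API`, and the task classes are empty placeholders rather than real `WorkerTask` subclasses.

```ruby
# Hypothetical namespace holding the task classes that have been implemented.
module SketchRegistry
  class A; end
  class B; end
end

# True only if every name resolves to a constant; const_get raises NameError otherwise.
def valid_tasks?(task_list)
  task_list.each { |task| SketchRegistry.const_get(task.to_s) }
  true
rescue NameError
  false
end

puts valid_tasks?(%i[A B])          # → true
puts valid_tasks?(%i[A NoSuchTask]) # → false
```

A single unresolved name makes the whole list invalid, since the NameError aborts the iteration.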