class OrgConverge::Engine
Constants
- RAINBOW
Attributes
babel[R]
logger[R]
Public Class Methods
new(options={})
click to toggle source
Calls superclass method
# File lib/org-converge/engine.rb, line 20
# Builds the engine on top of the superclass initializer and prepares the
# bookkeeping used for code blocks that run inside threads.
#
# Option keys read here:
#   :logger  - object used for engine output; a Logger on STDOUT when absent
#   :babel   - stored as-is in @babel
#   :runmode - stored as-is in @runmode
def initialize(options={})
  super(options)

  @babel   = options[:babel]
  @runmode = options[:runmode]
  @logger  = options[:logger] || Logger.new(STDOUT)

  # Code blocks whose start invocation is manipulated run inside a thread;
  # these two collections keep track of them.
  @threads         = []
  @running_threads = {}

  # Exit status codes from the code blocks that were run in parallel.
  # This hash is what the engine hands back at the end of a run.
  @procs_exit_status = {}
end
Public Instance Methods
output(name, data)
click to toggle source
# File lib/org-converge/engine.rb, line 97
# Logs every line of +data+ prefixed with a padded, colored process label.
#
# name - String of the form "<process name>.<pid or thread id>", or an empty
#        string when the source of the data is not known.
# data - converted with #to_s and split into lines; each line is logged
#        separately with its trailing newline removed.
#
# A broken pipe (Errno::EPIPE) raised while logging leads to
# terminate_gracefully being called.
def output(name, data)
  data.to_s.each_line do |raw_line|
    message = raw_line.chomp

    # FIXME: In case the process has finished before its lines were flushed
    if name.empty?
      ps, pid = '<defunct>', nil
    else
      ps, pid = name.split('.')
    end

    # A missing pid becomes 0 through nil.to_i before the color lookup.
    label = pad_process_name(ps).fg(get_color_for_pid(pid.to_i))

    # FIXME: When the process has stopped already, the name of the process/thread does not appear
    #        (which means that this approach is wrong from the beginning probably)
    logger.info(label + " -- " + message)
  end
rescue Errno::EPIPE
  terminate_gracefully
end
register(name, command, options={})
click to toggle source
# File lib/org-converge/engine.rb, line 87
# Wraps +command+ in an OrgConverge::CodeBlockProcess and adds it to the
# list of processes under +name+.
#
# name    - label stored in @names for the new process
# command - command line String; its first whitespace-separated word is
#           used to derive the default working directory
# options - Hash passed on to the process; missing :env, :cwd and :babel
#           entries are filled in here (the given hash itself is updated)
#
# Returns the result of appending the process to @processes.
def register(name, command, options={})
  options[:env]   ||= env
  options[:cwd]   ||= File.dirname(command.split(" ").first)
  options[:babel] ||= @babel

  code_block = OrgConverge::CodeBlockProcess.new(command, options)
  @names[code_block] = name
  @processes.push(code_block)
end
spawn_processes()
click to toggle source
Overridden: we do not consider process formations
# File lib/org-converge/engine.rb, line 57
# Starts every registered process and records how to follow it afterwards.
#
# For each process a pipe is created, the process is run with the write end
# as its output (or with a results file opened in append mode when the run
# mode is 'spec'), and its entry in @names is suffixed with the pid or the
# thread id it was started with. Readers are stored in @readers; processes
# that came back with a pid go into @running and those that came back with
# a thread go into @threads and @running_threads.
def spawn_processes
  spec_mode = (@runmode == 'spec')

  @processes.each do |process|
    reader, writer = create_pipe
    pid = thread = nil

    begin
      # In case of spec mode, we need to redirect the output to a results file instead
      writer = File.open(process.options[:results], 'a') if spec_mode

      pid, thread = process.run(:output => writer, :header => process.options[:header])
      @names[process] = "#{@names[process]}.#{pid || thread.__id__}"

      # NOTE: In spec mode we need to be more strict on what is flushed by the engine
      #       because we will be comparing the output
      unless spec_mode
        writer.puts "started with pid #{pid}" if pid
        writer.puts "started thread with tid #{thread.__id__}" if thread
      end
    rescue Errno::ENOENT
      writer.puts "unknown command: #{process.command}" unless spec_mode
    end

    @running[pid] = [process] if pid

    # When running the process raised Errno::ENOENT, pid and thread are both
    # still nil here, so the reader ends up stored under nil.__id__.
    @readers[pid || thread.__id__] = reader

    next unless thread

    @threads << thread
    @running_threads[thread.__id__] = [process]
  end
end
start()
click to toggle source
We allow other processes to exit with 0 status to continue with the runlist
# File lib/org-converge/engine.rb, line 37
# Runs the registered processes until none of them is left.
#
# Signal handlers are registered, the processes are spawned and output
# watching is started. After a short pause the method keeps waiting for
# terminations; on every pass, threads that are no longer alive are removed
# from @running_threads. The loop body runs at least once and stops when
# both @running and @running_threads are empty.
#
# Returns the @procs_exit_status hash.
def start
  register_signal_handlers
  spawn_processes
  watch_for_output
  sleep 0.1

  begin
    watch_for_termination do
      @threads.each do |thread|
        next if thread.alive?

        thread.exit
        @running_threads.delete(thread.__id__)
      end
    end
  end until @running.empty? && @running_threads.empty?

  @procs_exit_status
end
Private Instance Methods
get_color_for_pid(pid)
click to toggle source
# File lib/org-converge/engine.rb, line 126
# Picks a color from the RAINBOW palette for the given process id.
#
# pid - Integer (a pid or thread id); ids are mapped onto the palette by
#       taking them modulo the palette length, so the same id always gets
#       the same color and every palette entry can be selected.
#
# Returns the RAINBOW entry chosen for +pid+.
def get_color_for_pid(pid)
  # Use the palette's own length instead of a fixed 7 so the lookup never
  # falls outside the palette and never ignores part of it.
  RAINBOW[pid % RAINBOW.size]
end
name_for(pid)
click to toggle source
# File lib/org-converge/engine.rb, line 154
# Resolves the display name for a pid or thread id.
#
# pid - key looked up first in @running and, when nothing is found there,
#       in @running_threads
#
# The entry found (if any) is unpacked into a process and an index, and both
# are handed to name_for_index; they are nil when no entry exists.
def name_for(pid)
  process, index = @running[pid] || @running_threads[pid]
  name_for_index(process, index)
end
name_for_index(process, index)
click to toggle source
# File lib/org-converge/engine.rb, line 165
# Builds the dotted display name for a process.
#
# process - key into @names; an unknown process contributes nothing
# index   - optional suffix; it is appended (via #to_s) only when it is not
#           nil, so a missing index no longer leaves a trailing "."
#
# Returns a String such as "name", "name.2", or "" when neither part exists.
def name_for_index(process, index)
  parts = [@names[process]]
  # Converting nil with #to_s would yield "" and survive #compact, producing
  # names like "name." -- skip the index entirely when there is none.
  parts << index.to_s unless index.nil?
  parts.compact.join(".")
end
name_padding()
click to toggle source
# File lib/org-converge/engine.rb, line 115
# Width used to pad process names in the log output.
#
# The width is the length of the longest registered name (only the part
# before the first "." counts), but never less than 9. The value is
# computed once and cached in @name_padding.
#
# Returns an Integer.
def name_padding
  @name_padding ||= begin
    # #to_s guards against names whose first segment is missing (e.g. ""),
    # and seeding the list with 9 keeps #max well-defined when no names
    # have been registered yet.
    widths = @names.values.map { |n| n.split('.').first.to_s.length }
    ([9] + widths).max
  end
end
pad_process_name(name)
click to toggle source
# File lib/org-converge/engine.rb, line 122
# Left-justifies +name+ to the width given by name_padding, filling the
# remainder with spaces.
#
# name - String to pad
#
# Returns the padded String.
def pad_process_name(name)
  width = name_padding
  name.ljust(width)
end
termination_message_for(status)
click to toggle source
# File lib/org-converge/engine.rb, line 140
# Describes how a child process ended and records its exit code.
#
# status - object answering #pid, #exited?, #exitstatus, #signaled? and
#          #termsig (presumably a Process::Status from Process.wait2 --
#          confirm against callers)
#
# When the process exited, its exit code is stored in @procs_exit_status
# under the part of its name before the first ".".
#
# Returns a human-readable String describing the termination.
def termination_message_for(status)
  block_name = name_for(status.pid).split('.').first

  if status.exited?
    code = status.exitstatus
    @procs_exit_status[block_name] = code
    return "exited with code #{code}"
  end

  # TODO: How to handle exit by signals? Non-zero exit status so idempotency check fails?
  return "terminated by SIG#{Signal.list.invert[status.termsig]}" if status.signaled?

  "died a mysterious death"
end
watch_for_termination() { || ... }
click to toggle source
# File lib/org-converge/engine.rb, line 130
# Waits for any child process to finish and updates the bookkeeping.
#
# Outside of 'spec' run mode the termination message is written through
# output_with_mutex; in 'spec' mode termination_message_for is not called
# at all. The finished pid is removed from @running and the given block,
# if any, is called afterwards.
#
# When there are no child processes to wait for (Errno::ECHILD), only the
# block is called.
#
# Returns the pid of the finished process, or the block's result (nil
# without a block) in the Errno::ECHILD case.
def watch_for_termination
  pid, status = Process.wait2

  if @runmode != 'spec'
    output_with_mutex(name_for(pid), termination_message_for(status))
  end

  @running.delete(pid)
  yield if block_given?
  pid
rescue Errno::ECHILD
  yield if block_given?
end