class Orchestrator::Task
Public Class Methods
new(options)
click to toggle source
# File lib/orchestrator/task.rb, line 12
#
# Builds a task from parsed command line options: loads the settings file,
# resolves the state file, takes the state file lock and prepares the
# notification flags.
#
# options - object responding to the accessors read below: +config+, +name+,
#           +statefile+, +kill+, +wait+, +reset+, +resume+, +verbose+,
#           +email+, +email_on_success+, +sms+ and +sms_on_success+
#           (several of them are also assigned here).
#
# Terminates the process through +bail+ / +exit 1+ when the settings file or
# the task definition is missing, when the state file is locked by another
# process and +wait+ is not set, or when a state file already exists and
# neither +reset+ nor +resume+ was requested.
def initialize(options)
  @mutex = Mutex.new
  Thread.abort_on_exception = true
  @log = ''
  @children = {}
  @options = options
  @defaults = {}
  @defaults[:envs] = Object.new
  @defaults[:args] = Object.new

  # The path lives on @options; the bare name `config` is not defined in this
  # method and raised NameError instead of reporting the missing file.
  bail("config file #{@options.config} does not exists") unless File.exist?(@options.config)

  @settings = YAML.load_file(@options.config)
  bail("no task job #{@options.name} is defined in settings file") unless @settings['orchestrator'].has_key?(@options.name)
  bail("no statedir is defined in settings file") if @settings['orchestrator'][@options.name]['save'] && !@settings['orchestrator'].has_key?('statedir')

  # Derive the state file from statedir when the task asks to be saved and no
  # explicit state file was given.
  unless @options.statefile
    if @settings['orchestrator'][@options.name]['save']
      @options.statefile = @settings['orchestrator']['statedir'] + "/" + @options.name
      FileUtils.mkdir_p(@settings['orchestrator']['statedir'])
    end
  end

  if @options.statefile
    if File.exist?(@options.statefile)
      # Lock probe: the file is closed when the block ends, so this only
      # tells us whether another process currently holds the lock.
      File.open(@options.statefile, "r") do |file|
        @got_lock = file.flock( File::LOCK_NB | File::LOCK_EX )
      end

      unless @got_lock
        if @options.kill
          # Ask the process recorded in the state file to terminate, then probe again.
          @state = YAML.load_file(@options.statefile)
          begin
            Process.kill("TERM", @state['pid']) if @state.has_key?('pid')
          rescue Errno::ESRCH
            # the recorded process is already gone
          end
          sleep 0.1
          File.open(@options.statefile, "r") do |file|
            @got_lock = file.flock( File::LOCK_NB | File::LOCK_EX )
          end
        end

        unless @got_lock
          unless @options.wait
            bail("The state file #{@options.statefile} is already locked by other process")
          else
            Formatador.display_line("[yellow]WARN[/]: Blocking until already running process ends")
            # Blocking lock request: returns once the other process releases the lock.
            File.open(@options.statefile, "r") do |file|
              @got_lock = file.flock( File::LOCK_EX )
            end
          end
        end
      end
    end

    if File.exist?(@options.statefile)
      if @options.reset
        @state = @settings['orchestrator'][@options.name]
      elsif @options.resume
        @state = YAML.load_file(@options.statefile)
      else
        Formatador.display_line("[red]ERROR[/]: statefile #{@options.statefile} already exists, use --resume or --reset")
        exit 1
      end
    else
      @state = @settings['orchestrator'][@options.name]
    end

    @state['pid'] = Process.pid
    # Opening with "w" truncates the file; the state to resume from has
    # already been loaded above.
    @statefile_handle = File.open(@options.statefile, "w")
    @statefile_handle.flock( File::LOCK_NB | File::LOCK_EX )
    @statefile_handle.sync = true
    ### @statefile_handle.close_on_exec = true

    # On TERM: forward the signal to every tracked child, record it in the
    # log and go through the regular failure path.
    Signal.trap("TERM") do
      @children.each do |pid, command|
        begin
          Process.kill("TERM", pid)
          puts "KILLED #{pid}: #{command}\n" if @options.verbose
          @log += "KILLED #{pid}: #{command}\n"
        rescue Errno::ESRCH
          # child already exited
        end
      end
      puts "KILLED ORCHESTRATOR\n" if @options.verbose
      @log += "KILLED ORCHESTRATOR\n"
      fail
    end
  else
    @state = @settings['orchestrator'][@options.name]
  end

  # Notifications are only possible when the task defines the matching section.
  @options.email = false unless @state.has_key?('email')
  @options.email_on_success = (@options.email and @state['email'].has_key?('on_success')) ? @state['email']['on_success'] : true
  @options.sms = false unless @state.has_key?('sms')
  @options.sms_on_success = (@options.sms and @state['sms'].has_key?('on_success')) ? @state['sms']['on_success'] : false
end
Public Instance Methods
bail(reason)
click to toggle source
# File lib/orchestrator/task.rb, line 111
#
# Prints +reason+ as an error line through Formatador and terminates the
# process with exit status 1.
#
# reason - text appended to the "ERROR" prefix.
def bail(reason)
  message = "[red]ERROR[/]: #{reason}"
  Formatador.display_line(message)
  exit(1)
end
fail()
click to toggle source
# File lib/orchestrator/task.rb, line 281
#
# Failure path of the task: runs the current step's failure handler (when one
# is set) and the task level failure handler (when the state defines one),
# sends the "[FAILED]" e-mail and SMS notifications that are enabled in the
# options, then exits with status 1.
#
# NOTE: this method shares its name with Kernel#fail, so a bare `fail` inside
# this class dispatches here.
def fail
  run_script(@failure_handler) if @failure_handler
  run_script(@state['failure_handler']) if @state.has_key?('failure_handler')

  if @options.email
    mail = @state['email']
    Pony.mail(
      :to      => mail['recipients'],
      :from    => mail['from'],
      :subject => "#{@state['description']} - [FAILED]",
      :body    => @log
    )
  end

  if @options.sms
    sms = @state['sms']
    Pony.mail(
      :to      => sms['recipients'],
      :from    => sms['from'],
      :subject => "#{@state['description']} - [FAILED]",
      :body    => sms['auth']
    )
  end

  exit(1)
end
interpolate_command(command,pattern=/^(ENV|ARG|EXEC)\./)
click to toggle source
# File lib/orchestrator/task.rb, line 138
#
# Replaces every :::TOKEN::: placeholder found in +command+.
#
# Supported tokens:
#   LOOP.<digit> - element <digit> of the current nested loop tuple
#   LOOP         - current loop item
#   ENV.<name>   - process environment variable, falling back to the
#                  configured default envs
#   ARG.<name>   - command line argument, falling back to the configured
#                  default args
#   EXEC.<cmd>   - first output line of <cmd> (run through IO.popen)
#
# command - String (an Integer is converted to its String form first).
# pattern - Regexp naming which of the ENV/ARG/EXEC token kinds are allowed
#           in the calling context.
#
# Returns the interpolated String. Every problem (token kind not allowed,
# unknown token, missing loop/env/arg value, EXEC that cannot be started or
# exits with a non zero status) is reported through +invalid+.
def interpolate_command(command,pattern=/^(ENV|ARG|EXEC)\./)
  command = command.to_s if command.is_a?(Integer)
  command.gsub(/:::([^:]*):::/) do
    match = $1
    invalid("interpolation type is not valid in this context - :::#{match}:::") if match !~ pattern and match =~ /^(ENV|ARG|EXEC)\./
    case match
    when /^LOOP\.\d$/
      invalid("command interpolation failed not nested loop type step") unless @nested_loop_param
      i = match.match(/^LOOP\.(\d)$/)[1].to_i
      @nested_loop_param[i] ? @nested_loop_param[i] : invalid("command interpolation failed unexisting nesting #{match}")
    when /^LOOP$/
      invalid("command interpolation failed not loop type step") unless @loop_param
      @loop_param
    when /^ENV\./
      env = match["ENV.".length..-1]
      invalid("command interpolation failed no such env variable - #{env}") if !ENV[env] and !@defaults[:envs].instance_variable_defined?("@#{env}")
      ENV[env] ? ENV[env] : @defaults[:envs].instance_variable_get("@#{env}")
    when /^ARG\./
      arg = match["ARG.".length..-1]
      invalid("command interpolation failed no such arg - #{arg}") if !@options.args.instance_variable_defined?("@#{arg}") and !@defaults[:args].instance_variable_defined?("@#{arg}")
      @options.args.instance_variable_defined?("@#{arg}") ? @options.args.instance_variable_get("@#{arg}") : @defaults[:args].instance_variable_get("@#{arg}")
    when /^EXEC\./
      exec = match["EXEC.".length..-1]
      output = nil
      begin
        # Block form: the pipe is closed and the child reaped when the block
        # returns, which is what makes $? describe *this* command. Checking
        # $? on a still open pipe reads the status of some earlier child.
        output = IO.popen(exec) { |io| io.read }
      rescue
        invalid("command interpolation failed to exec - #{exec}")
      end
      invalid("command interpolation exec exit with non zero status - #{exec}") unless $?.to_i == 0
      # Only the first output line is used; a command that prints nothing
      # interpolates to an empty string.
      output.to_s.lines.first.to_s.delete("\n")
    else
      invalid("command interpolation failed not valid parameter - :::#{match}:::")
    end
  end
end
invalid(reason)
click to toggle source
# File lib/orchestrator/task.rb, line 116
#
# Reports a configuration error: prints +reason+ as an error line, removes
# the state file when one is configured and exits with status 1.
#
# reason - text appended to the "ERROR" prefix.
def invalid(reason)
  Formatador.display_line("[red]ERROR[/]: #{reason}")
  statefile = @options.statefile
  FileUtils.rm_f(statefile) if statefile
  exit(1)
end
notify()
click to toggle source
# File lib/orchestrator/task.rb, line 302
#
# Success path of the task: runs the task level ok handler (when the state
# defines one) and sends the "[OK]" e-mail / SMS notifications when the
# channel is enabled and its on-success flag is set.
def notify
  run_script(@state['ok_handler']) if @state.has_key?('ok_handler')

  if @options.email and @options.email_on_success
    mail = @state['email']
    Pony.mail(
      :to      => mail['recipients'],
      :from    => mail['from'],
      :subject => "#{@state['description']} - [OK]",
      :body    => @log
    )
  end

  if @options.sms and @options.sms_on_success
    sms = @state['sms']
    Pony.mail(
      :to      => sms['recipients'],
      :from    => sms['from'],
      :subject => "#{@state['description']} - [OK]",
      :body    => sms['auth']
    )
  end
end
run()
click to toggle source
# File lib/orchestrator/task.rb, line 435
#
# Executes the task: validates the configuration, persists the initial state
# and then walks through the steps in order.
#
# Per step the timeout, retry count, retry delay, allowed week days / month
# days and failure handler are read into instance variables that the script
# helpers use. A step only runs when today matches both day filters.
#
#   parallel   - scripts run in threads, at most 'parallel' at a time;
#                'on_failure' defaults to :finish.
#   sequential - scripts run one after another with retries;
#                'on_failure' defaults to :die.
#
# Scripts already marked OK or SKIPED are not run again, and a script whose
# 'condition' command does not return OK is marked SKIPED. After the last
# step the success notifications are sent and the state file is removed.
def run
  validate_config
  save_state

  @state['steps'].each do |step|
    @statuses = []
    @timeout = step.has_key?('timeout') ? step['timeout'].to_i : 0
    @retries = step.has_key?('retries') ? step['retries'].to_i : 0
    @retry_delay = step.has_key?('retry_delay') ? step['retry_delay'] : 0
    @on_week_days =
      if step.has_key?('on_week_days')
        step['on_week_days'].map { |day| "#{day}?".downcase.to_sym }
      else
        [ :sunday?, :monday?, :tuesday?, :wednesday?, :thursday?, :friday?, :saturday? ]
      end
    @on_month_days = step.has_key?('on_month_days') ? step['on_month_days'] : (1..31).to_a
    @failure_handler = step.has_key?('failure_handler') ? step['failure_handler'] : nil

    scripts = step['scripts']
    # Evaluated lazily, and only after the step type matched, like the
    # conditions it replaces.
    due_today = lambda do
      @on_week_days.map { |predicate| Time.now.send(predicate) }.include?(true) &&
        @on_month_days.include?(Time.now.mday)
    end

    if step['type'].to_sym == :parallel && due_today.call
      # Parallel
      interval = step.has_key?('sleep') ? step['sleep'] : 1
      parallel_spawn_delay = step.has_key?('parallel_spawn_delay') ? step['parallel_spawn_delay'] : 0.1
      parallel_factor = step.has_key?('parallel') ? step['parallel'] : 1
      @on_failure = step.has_key?('on_failure') ? step['on_failure'].to_sym : :finish
      @threads = {}
      invoked = 0
      running_threads = 0

      scripts.each_index do |index|
        script = scripts[index]
        next if script.has_key?('status') && ['OK','SKIPED'].include?(script['status'])

        if script.has_key?('condition') && run_command(script['condition'], 0) != 'OK'
          script['status'] = "SKIPED"
          save_state
          next
        end

        # Wait for a free slot, then start the script in its own thread.
        loop do
          @mutex.synchronize { running_threads = @threads.length }
          break if @on_failure == :wait && @statuses.include?(false)

          if parallel_factor > running_threads
            @threads[index] = Thread.new { thread_wrapper(index, scripts[index]) }
            invoked += 1
            sleep parallel_spawn_delay if invoked < parallel_factor
            break
          end

          sleep interval
        end
      end

      # Wait until every worker has removed itself from @threads.
      loop do
        @mutex.synchronize { running_threads = @threads.length }
        break if running_threads == 0
        sleep interval
      end

      step_failed = @statuses.include?(false)
      run_script(step['ok_handler']) if step.has_key?('ok_handler') && !step_failed
      fail if @on_failure != :ignore && @statuses.include?(false)
    elsif step['type'].to_sym == :sequential && due_today.call
      # Sequential
      @on_failure = step.has_key?('on_failure') ? step['on_failure'].to_sym : :die

      scripts.each_index do |index|
        script = scripts[index]
        failures = 0
        next if script.has_key?('status') && ['OK','SKIPED'].include?(script['status'])

        if script.has_key?('condition') && run_command(script['condition'], 0) != 'OK'
          script['status'] = "SKIPED"
          save_state
          next
        end

        # Run the script until it succeeds or the retries are used up.
        loop do
          @statuses[index] = run_script(script)
          break if @statuses[index]

          failures += 1
          break if failures > @retries

          if script.has_key?('retry_handler')
            timeout = script.has_key?('timeout') ? script['timeout'].to_i : @timeout
            run_command(script['retry_handler'], timeout)
          end

          sleep @retry_delay
        end

        run_post_script_handlers(script, @statuses[index])
        fail if !@statuses[index] && @on_failure == :die
      end

      step_failed = @statuses.include?(false)
      run_script(step['ok_handler']) if step.has_key?('ok_handler') && !step_failed
      fail if @on_failure != :ignore && @statuses.include?(false)
    end
  end

  notify

  if @options.statefile
    @statefile_handle.close unless @statefile_handle.closed?
    FileUtils.rm_f(@options.statefile)
  end
end
run_command(command,timeout)
click to toggle source
# File lib/orchestrator/task.rb, line 319
#
# Runs +command+ through POpen4, captures its stdout / stderr and appends a
# report to the task log (also printed when the verbose option is set).
#
# command - the command line handed to POpen4.popen4.
# timeout - value handed to Timeout.timeout.
#
# Returns 'OK' when the command exits with status 0, 'FAILED' when it exits
# with any other status or no status is available, and 'TIMEOUT' when the
# timeout fires. On timeout the command and its direct children (as listed by
# `ps --ppid`) are sent TERM.
def run_command(command,timeout)
  result = ""
  error = ""
  status = 'STARTED'
  child = nil

  begin
    Timeout::timeout(timeout) do
      status = POpen4::popen4(command) do |stdout, stderr, stdin, pid|
        # Tracked so a TERM received by the orchestrator can be forwarded.
        @children[pid] = command
        child = pid
        result = stdout.read.strip
        error = stderr.read.strip
      end
      status = (status.nil? or status.exitstatus != 0) ? 'FAILED' : 'OK'
    end
  rescue Timeout::Error
    status = 'TIMEOUT'
    if child
      # A blank line must never turn into pid 0, which would signal the
      # whole process group.
      descendants = `/bin/ps --ppid #{child} -o pid=`.split(/\n/).map { |line| line.to_i }.select { |pid| pid > 0 }
      terminated = false
      # Each kill is rescued on its own: a descendant that already exited
      # must not prevent the command itself from being terminated.
      (descendants + [child]).each do |pid|
        begin
          Process.kill("TERM", pid)
          terminated = true if pid == child
        rescue Errno::ESRCH
          # process already gone
        end
      end
      result += "\nKILLED #{child}: #{command}\n" if terminated
    end
  end

  @mutex.synchronize do
    @children.delete(child) if child
    output = <<-EOF

Running: #{command} - #{status}
============ STDOUT ============
#{result}
============ STDERR ============
#{error}
================================

    EOF
    @log += output
    puts output if @options.verbose
  end

  return status
end
run_post_script_handlers(script,status)
click to toggle source
# File lib/orchestrator/task.rb, line 372
#
# Runs the handler that matches the outcome of a script: its 'ok_handler'
# when +status+ is truthy, its 'failure_handler' otherwise. Nothing happens
# when the script does not define that handler.
#
# script - Hash describing the script; 'timeout' overrides the step timeout.
# status - truthy when the script succeeded.
def run_post_script_handlers(script,status)
  handler = status ? 'ok_handler' : 'failure_handler'
  return unless script.has_key?(handler)

  timeout =
    if script.has_key?('timeout')
      script['timeout'].to_i
    else
      @timeout
    end
  run_command(script[handler], timeout)
end
run_script(script)
click to toggle source
# File lib/orchestrator/task.rb, line 385
#
# Runs the 'command' of +script+ and records its progress in the script hash:
# the status is set to 'STARTED' before the run and to the result of
# +run_command+ afterwards; the state is saved after each change.
#
# script - Hash with 'command' and an optional 'timeout' that overrides the
#          step timeout.
#
# Returns true when the command finished with status 'OK'.
def run_script(script)
  timeout =
    if script.has_key?('timeout')
      script['timeout'].to_i
    else
      @timeout
    end

  script['status'] = 'STARTED'
  save_state

  outcome = run_command(script['command'], timeout)
  script['status'] = outcome
  save_state

  script['status'] == 'OK'
end
save_state()
click to toggle source
# File lib/orchestrator/task.rb, line 122
#
# Writes the current state as YAML to the state file. Does nothing when no
# state file is configured. The write happens under the task mutex; when the
# file has disappeared from disk it is re-created and locked again first.
def save_state
  return unless @options.statefile

  @mutex.synchronize do
    unless File.exist?(@options.statefile)
      @statefile_handle.close
      @statefile_handle = File.open(@options.statefile, "w")
      @statefile_handle.flock( File::LOCK_NB | File::LOCK_EX )
      @statefile_handle.sync = true
    end

    handle = @statefile_handle
    # Overwrite from the start and cut off whatever is left of a longer
    # previous dump.
    handle.rewind
    YAML.dump(@state, handle)
    handle.truncate(handle.pos)
  end
end
thread_wrapper(i,script)
click to toggle source
# File lib/orchestrator/task.rb, line 396
#
# Body of a worker thread of a parallel step: runs +script+ until it succeeds
# or the step's retries are used up, running the script's 'retry_handler'
# and sleeping for the retry delay between attempts. Afterwards the thread
# removes itself from @threads, runs the matching post script handler and
# triggers the failure path when the step's on_failure policy is :die.
#
# i      - index of the script inside the step (key in @threads / @statuses).
# script - Hash describing the script.
def thread_wrapper(i,script)
  failed_attempts = 0

  loop do
    begin
      @statuses[i] = run_script(script)
    # NOTE(review): rescuing Exception also catches non StandardError
    # exceptions raised while the script runs; kept as in the original.
    rescue Exception => e
      script['status'] = 'EXCEPTION'
      save_state
      @statuses[i] = false
      @mutex.synchronize do
        report = <<-EOF
Thread - (#{script['command']})
Died due to following exception:
#{e.inspect}
#{e.backtrace}
        EOF
        @log += report
        puts report if @options.verbose
      end
    end

    break if @statuses[i]

    failed_attempts += 1
    break if failed_attempts > @retries

    if script.has_key?('retry_handler')
      retry_timeout = script.has_key?('timeout') ? script['timeout'].to_i : @timeout
      run_command(script['retry_handler'], retry_timeout)
    end

    sleep @retry_delay
  end

  @threads.delete(i)
  run_post_script_handlers(script, @statuses[i])
  fail if @on_failure == :die && !@statuses[i]
end
validate_command(command,error_prefix)
click to toggle source
# File lib/orchestrator/task.rb, line 177
#
# Normalizes a command definition and interpolates its placeholders.
#
# command      - either a String (wrapped into a Hash under 'command') or a
#                Hash that must contain 'command'; its 'command',
#                'ok_handler', 'failure_handler', 'retry_handler' and
#                'condition' entries must be Strings when present.
# error_prefix - text that starts every error message.
#
# Returns the command Hash; every violation is reported through +invalid+.
def validate_command(command,error_prefix)
  case command
  when String
    command = { 'command' => interpolate_command(command) }
  when Hash
    invalid(error_prefix + " is missing command attribute") unless command.has_key?('command')

    ['command', 'ok_handler', 'failure_handler', 'retry_handler', 'condition'].each do |attribute|
      next unless command.has_key?(attribute)

      value = command[attribute]
      invalid(error_prefix + " #{attribute} attribute is invalid") unless value.is_a?(String)
      command[attribute] = interpolate_command(command[attribute])
    end
  else
    invalid(error_prefix + " has invalid format")
  end

  command
end
validate_config()
click to toggle source
# File lib/orchestrator/task.rb, line 194
#
# Validates and normalizes the task definition held in @state.
#
# Checks the task level handlers, the email / sms sections, the defaults
# (stored as instance variables on the @defaults[:envs] / @defaults[:args]
# holder objects), the description, the save flag and every step. Steps with
# a 'loop' or 'nested_loop' are expanded: their scripts are deep-copied and
# validated once per loop item (or per combination of the nested loop
# arrays) and the loop key is removed from the step.
#
# Every violation is reported through +invalid+.
def validate_config
  @state['ok_handler'] = validate_command(@state['ok_handler'], 'task success handler') if @state.has_key?('ok_handler')
  @state['failure_handler'] = validate_command(@state['failure_handler'], 'task failure handler') if @state.has_key?('failure_handler')

  if @state.has_key?('email')
    invalid("config email recipients is missing or invalid") unless @state['email'].has_key?('recipients') && @state['email']['recipients'].is_a?(String) || @state['email']['recipients'].is_a?(Array)
    invalid("config email from is missing or invalid") unless @state['email'].has_key?('from') && @state['email']['from'].is_a?(String)
    @state['email']['from'] = interpolate_command(@state['email']['from'])
  end

  if @state.has_key?('sms')
    invalid("task sms recipients is missing") unless @state['sms'].has_key?('recipients') && @state['sms']['recipients'].is_a?(String) || @state['sms']['recipients'].is_a?(Array)
    invalid("task sms from is missing") unless @state['sms'].has_key?('from') && @state['sms']['from'].is_a?(String)
    @state['sms']['from'] = interpolate_command(@state['sms']['from'])
  end

  if @state.has_key?('defaults')
    invalid("defaults must be hash") unless @state['defaults'].is_a?(Hash)
    [:envs, :args].each do |type|
      if @state['defaults'].has_key?(type.to_s)
        # Name the section that is actually wrong; the message used to say
        # "envs" for the args section as well.
        invalid("default #{type} must be hash") unless @state['defaults'][type.to_s].is_a?(Hash)
        @state['defaults'][type.to_s].each do |key,value|
          value = '' unless value
          if not value.is_a?(String) and not value.is_a?(Integer)
            invalid("default value for #{type} #{key} is invalid")
          end
          # Defaults may only use EXEC interpolation.
          @defaults[type].instance_variable_set("@#{key}",interpolate_command(value,/^EXEC\./))
        end
      end
    end
  end

  invalid("task description is missing or invalid") unless @state.has_key?('description') && @state['description'].is_a?(String)
  @state['description'] = interpolate_command(@state['description'])

  invalid("task save must be boolean") if @state.has_key?('save') && !!@state['save'] != @state['save']
  @state['save'] = false unless @state.has_key?('save')

  invalid("task steps is missing") unless @state.has_key?('steps')
  invalid("task steps must be array") unless @state['steps'].is_a?(Array)

  @state['steps'].each do |step|
    invalid("task step is not hash") unless step.is_a?(Hash)
    invalid("task step has no type") unless step.has_key?('type') && step['type'].is_a?(String)
    invalid("task step type #{step['type']} is invalid") unless [:parallel,:sequential].find_index(step['type'].to_sym)
    invalid("task step scripts is missing or invalid") unless step.has_key?('scripts') && step['scripts'].is_a?(Array)

    if step.has_key?('loop')
      invalid("task step loop is not array") unless step['loop'].is_a?(Array)
      scripts = []
      step['loop'].each do |item|
        @loop_param = item
        step['scripts'].each_index do |index|
          # Deep copy: every loop item gets its own interpolated script.
          script = Marshal.load(Marshal.dump(step['scripts'][index]))
          scripts << validate_command(script, 'task step looped script')
        end
        @loop_param = nil
      end
      step['scripts'] = scripts
      step.delete('loop')
    elsif step.has_key?('nested_loop')
      invalid("task step nested loop is not array") unless step['nested_loop'].is_a?(Array)
      step['nested_loop'].each do |values|
        invalid("task step nested loop is not array of arrays") unless values.is_a?(Array)
      end
      scripts = []
      # Every combination of one element per nested array, in order.
      combinations = step['nested_loop'].inject([[]]) do |tuples, values|
        tuples.flat_map { |tuple| values.map { |x| [*tuple, x] } }
      end
      combinations.each do |item|
        @nested_loop_param = item
        step['scripts'].each_index do |index|
          script = Marshal.load(Marshal.dump(step['scripts'][index]))
          scripts << validate_command(script, 'task step nested loop script')
        end
        @nested_loop_param = nil
      end
      step['scripts'] = scripts
      step.delete('nested_loop')
    else
      step['scripts'].each_index do |index|
        step['scripts'][index] = validate_command(step['scripts'][index], 'task step script')
      end
    end

    step['ok_handler'] = validate_command(step['ok_handler'], 'task ok handler') if step.has_key?('ok_handler')
    step['failure_handler'] = validate_command(step['failure_handler'], 'task failure handler') if step.has_key?('failure_handler')
    step['retry_handler'] = validate_command(step['retry_handler'], 'task retry handler') if step.has_key?('retry_handler')
  end
end