class WorkflowManager::Server
Public Class Methods
config()
click to toggle source
# File lib/workflow_manager/server.rb, line 64
# Class-level reader for the configuration shared by all server instances.
#
# @return [Object] whatever was last stored in @@config (by configure or config=)
def self.config
  @@config
end
config=(config)
click to toggle source
# File lib/workflow_manager/server.rb, line 61
# Replaces the configuration shared by all server instances.
#
# @param config [Object] new configuration; presumably a Config-like object
#   (log_dir, db_dir, interval, resubmit, cluster) — TODO confirm
# @return [Object] the stored configuration
def self.config=(config)
  @@config = config
end
configure() { |config| ... }
click to toggle source
# File lib/workflow_manager/server.rb, line 70
# Builds a fresh Config filled with the built-in defaults, lets the caller
# override them in the (optional) block, and stores the result in @@config.
#
# If a cluster object is configured, its log_dir is set to the absolute form
# of the configured log_dir.
#
# @yieldparam config [Config] the configuration being built
# @return [Config] the stored configuration
def self.configure
  @@config = Config.new
  # default values
  @@config.log_dir  = LOG_DIR
  @@config.db_dir   = DB_DIR
  @@config.interval = INTERVAL # interval to check jobs, [s]
  @@config.resubmit = RESUBMIT # how many times at maximum to resubmit when job fails
  # The block is optional: without one the defaults are kept instead of
  # raising LocalJumpError ("no block given (yield)").
  yield(@@config) if block_given?
  @@config.cluster.log_dir = File.expand_path(@@config.log_dir) if @@config.cluster
  @@config
end
new()
click to toggle source
# File lib/workflow_manager/server.rb, line 130
# Creates a server from the shared configuration.
#
# Chooses the status/log store backend from DB_MODE, makes sure the log and
# DB directories exist, prints and logs a short startup summary, and then
# calls recovery_job_checker to re-queue unfinished jobs.
#
# @raise [ArgumentError] if DB_MODE is not "PStore", "KyotoCabinet" or "Redis"
def initialize
  @interval = config.interval
  @resubmit = config.resubmit
  extension = case DB_MODE
              when "PStore"       then '.pstore'
              when "KyotoCabinet" then '.kch'
              when "Redis"
                @redis_conf = config.redis_conf
                '.rdb'
              else
                # Fail fast with a clear message; an unknown mode used to
                # surface only as a TypeError from `'statuses' + nil` below.
                raise ArgumentError, "unsupported DB_MODE: #{DB_MODE.inspect}"
              end
  @db_stat = File.join(config.db_dir, 'statuses' + extension)
  @db_logs = File.join(config.db_dir, 'logs' + extension)
  @log_dir = File.expand_path(config.log_dir)
  @db_dir = File.expand_path(config.db_dir)
  FileUtils.mkdir_p(@log_dir) unless File.exist?(@log_dir)
  FileUtils.mkdir_p(@db_dir) unless File.exist?(@db_dir)
  # Stores are opened only after the directories exist (statuses, then logs).
  # With Redis, DB index 0 holds statuses and index 1 holds log paths.
  @statuses, @logs = case DB_MODE
                     when "PStore"       then [PStoreDB.new(@db_stat), PStoreDB.new(@db_logs)]
                     when "KyotoCabinet" then [KyotoDB.new(@db_stat), KyotoDB.new(@db_logs)]
                     when "Redis"        then [RedisDB.new(0, @redis_conf), RedisDB.new(1, @redis_conf)]
                     end
  # NOTE(review): the job index always uses Redis DB 2, even when DB_MODE is
  # not "Redis" (then @redis_conf is nil) — confirm this is intended.
  @jobs = RedisDB.new(2, @redis_conf)
  @system_log = File.join(@log_dir, "system.log")
  @mutex = Mutex.new
  @cluster = config.cluster
  puts("DB = #{DB_MODE}")
  if DB_MODE == "Redis"
    puts("Redis conf = #{config.redis_conf}")
    puts("Redis port = #{@logs.port}")
  end
  puts("Cluster = #{@cluster.name}")
  log_puts("DB = #{DB_MODE}")
  log_puts("Cluster = #{@cluster.name}")
  log_puts("Server starts")
  log_puts("Recovery check")
  # Short pause before the recovery scan (reason not visible here).
  sleep 2
  recovery_job_checker
end
Public Instance Methods
cluster_node_list()
click to toggle source
# File lib/workflow_manager/server.rb, line 555
# Returns the node list as reported by the configured cluster backend.
#
# @return [Object] the value of @cluster.node_list, unchanged
def cluster_node_list
  @cluster.node_list
end
cluster_nodes()
click to toggle source
# File lib/workflow_manager/server.rb, line 213
# Returns the cluster nodes as reported by the configured cluster backend.
#
# @return [Object] the value of @cluster.cluster_nodes, unchanged
def cluster_nodes
  @cluster.cluster_nodes
end
config()
click to toggle source
# File lib/workflow_manager/server.rb, line 67
# Instance-level access to the shared configuration.
#
# The first call with no (truthy) @@config builds one via
# WorkflowManager.configure with an empty block and memoizes it.
# NOTE(review): this calls WorkflowManager.configure, not
# WorkflowManager::Server.configure — confirm which one is intended.
#
# @return [Object] the shared configuration
def config
  return @@config if defined?(@@config) && @@config

  @@config = WorkflowManager.configure {}
end
copy_commands(org_dir, dest_parent_dir, now=nil)
click to toggle source
# File lib/workflow_manager/server.rb, line 203
# Asks the cluster backend for the shell commands that copy org_dir into
# dest_parent_dir.
#
# @param org_dir [String] source path
# @param dest_parent_dir [String] destination parent directory
# @param now [Object, nil] forwarded unchanged to the backend
# @return [Object] the backend's result (iterated as a command list by callers)
def copy_commands(org_dir, dest_parent_dir, now=nil)
  backend = @cluster
  backend.copy_commands(org_dir, dest_parent_dir, now)
end
default_node()
click to toggle source
# File lib/workflow_manager/server.rb, line 216
# Returns the cluster backend's default node as a String.
#
# @return [String] @cluster.default_node converted with to_s
def default_node
  node = @cluster.default_node
  node.to_s
end
delete_command(target)
click to toggle source
# File lib/workflow_manager/server.rb, line 210
# Asks the cluster backend for the command that deletes target.
#
# @param target [String] path to delete
# @return [Object] the backend's result, unchanged
def delete_command(target)
  backend = @cluster
  backend.delete_command(target)
end
finalize_monitoring(current_status, log_file, log_dir)
click to toggle source
# File lib/workflow_manager/server.rb, line 283
# Ends a polling thread once its job has finished.
#
# For 'success' or 'fail', copies the stdout log and the matching stderr log
# ("_o.log" -> "_e.log") into log_dir (skipped when log_dir is empty),
# logging and running each copy command. It then kills the *current* thread,
# so it must be called from the monitoring thread. Any other status is a no-op.
#
# @param current_status [String] status from check_status
# @param log_file [String] stdout log path
# @param log_dir [String] copy destination; '' disables copying
# @return [nil] when the status is not terminal
def finalize_monitoring(current_status, log_file, log_dir)
  return unless ['success', 'fail'].include?(current_status)

  unless log_dir.empty?
    [log_file, log_file.gsub('_o.log', '_e.log')].each do |source|
      copy_commands(source, log_dir).each do |cmd|
        log_puts(cmd)
        system cmd
      end
    end
  end
  Thread.current.kill
end
get_log(job_id, with_err=false)
click to toggle source
# File lib/workflow_manager/server.rb, line 500
# Returns the stdout log of a job, optionally followed by its stderr log.
#
# The stdout log path is looked up in @logs by the stringified job id. The
# stderr path is derived from it by replacing "_o.log" with "_e.log".
#
# @param job_id [#to_s] job identifier
# @param with_err [Boolean] also append the stderr log when it exists
# @return [String] "__STDOUT LOG__\n\n<log>" (or 'no log file'), followed by
#   "\n\n__STDERR LOG__\n\n<err>" when requested and available
def get_log(job_id, with_err=false)
  log_file = nil
  @logs.transaction do |logs|
    log_file = logs[job_id.to_s]
  end
  log_data = if log_file && File.exist?(log_file)
               "__STDOUT LOG__\n\n" + File.read(log_file)
             else
               # dup: the result is appended to below; never mutate a literal.
               'no log file'.dup
             end
  # Guard on log_file: for an unknown job id this used to raise
  # NoMethodError (nil.gsub) when with_err was true.
  if with_err && log_file
    err_file = log_file.gsub(/_o\.log/, '_e.log')
    if File.exist?(err_file)
      log_data << "\n\n__STDERR LOG__\n\n"
      log_data << File.read(err_file)
    end
  end
  log_data
end
get_script(job_id)
click to toggle source
# File lib/workflow_manager/server.rb, line 519
# Returns the content of the script that produced a job.
#
# The script path is the job's stored stdout log path with "_o.log" removed.
#
# @param job_id [#to_s] job identifier
# @return [String] the script content, or 'no script file' when the job is
#   unknown or the script is missing
def get_script(job_id)
  log_path = nil
  @logs.transaction { |logs| log_path = logs[job_id.to_s] }
  script_path = log_path && log_path.gsub(/_o\.log/, '')
  return 'no script file' unless script_path && File.exist?(script_path)

  File.read(script_path)
end
get_script_path(job_id)
click to toggle source
# File lib/workflow_manager/server.rb, line 534
# Returns the script path for a job: its stored stdout log path with
# "_o.log" removed.
#
# NOTE(review): existence is checked on the *log* file, not on the derived
# script path (get_script checks the script itself) — confirm intended.
#
# @param job_id [#to_s] job identifier
# @return [String, nil] the script path, or nil when the job is unknown or
#   its log file does not exist
def get_script_path(job_id)
  log_path = nil
  @logs.transaction { |logs| log_path = logs[job_id.to_s] }
  return nil unless log_path && File.exist?(log_path)

  log_path.gsub(/_o\.log/, '')
end
hello()
click to toggle source
# File lib/workflow_manager/server.rb, line 200
# Liveness check: returns a fixed greeting followed by the cluster name.
#
# @return [String]
def hello
  greeting = 'hello hoge hoge bar boo bundle, '
  greeting + @cluster.name
end
input_dataset_exist?(file_list)
click to toggle source
# File lib/workflow_manager/server.rb, line 251
# Tells whether every path in file_list exists. Stops at the first missing
# path.
#
# @param file_list [Array<String>] paths to check
# @return [Boolean] true when all exist (also true for an empty list)
def input_dataset_exist?(file_list)
  file_list.all? { |path| File.exist?(path) }
end
input_dataset_file_list(dataset_tsv_path)
click to toggle source
# File lib/workflow_manager/server.rb, line 240
# Collects every value from the "[File]" columns of a dataset TSV.
#
# The first row is treated as the header. Values are returned row by row,
# left to right.
#
# @param dataset_tsv_path [String] path to a tab-separated dataset file
# @return [Array<String>] the file paths as written in the TSV
def input_dataset_file_list(dataset_tsv_path)
  file_column = /\[File\]/
  files = []
  CSV.foreach(dataset_tsv_path, headers: true, col_sep: "\t") do |row|
    row.each { |header, value| files << value if file_column.match?(header) }
  end
  files
end
input_dataset_tsv_path(script_content)
click to toggle source
# File lib/workflow_manager/server.rb, line 227
# Extracts the GSTORE_DIR and INPUT_DATASET values from a job script.
#
# Lines are scanned top to bottom, and scanning stops at the first
# INPUT_DATASET= line. A GSTORE_DIR= after that line is therefore ignored.
# The patterns are unanchored, so they also match inside longer names.
#
# @param script_content [String] full script text
# @return [Array(String, String)] [gstore_dir, input_dataset_path]; either
#   element may be nil
def input_dataset_tsv_path(script_content)
  gstore_dir = input_dataset_path = nil
  script_content.each_line do |line|
    if (found = line.match(/GSTORE_DIR=(.+)/))
      gstore_dir = found[1].chomp
    elsif (found = line.match(/INPUT_DATASET=(.+)/))
      input_dataset_path = found[1].chomp
      break
    end
  end
  [gstore_dir, input_dataset_path]
end
job_list(with_results=false, project_number=nil, job_ids:nil)
click to toggle source
# File lib/workflow_manager/server.rb, line 471
# Lists jobs as "job_id,<status record>" lines, newest first (ordered by the
# third field of the status record, the time).
#
# @param with_results [Boolean] not used by this method (kept for API
#   compatibility)
# @param project_number [Object, nil] when given, only the jobs registered
#   under this key in @jobs are listed
# @param job_ids [String, nil] comma-separated job ids to keep
# @return [String] one line per job joined with "\n"; a job without a status
#   record is listed as "job_id,"
def job_list(with_results=false, project_number=nil, job_ids:nil)
  rows = []
  if project_number
    project_jobs_hash = {}
    @jobs.transaction do |jobs|
      if (project_jobs = jobs[project_number])
        # SECURITY(review): eval runs arbitrary Ruby stored in the job index.
        # The stored format is not visible here; consider a safe serializer
        # (e.g. JSON) on both the write and read side.
        project_jobs_hash = Hash[*eval(project_jobs)]
      end
    end
    @statuses.transaction do |statuses|
      project_jobs_hash.each_key { |job_id| rows << [job_id, statuses[job_id]] }
    end
  else
    @statuses.transaction do |statuses|
      statuses.each { |job_id, stat| rows << [job_id, stat] }
    end
  end
  if job_ids
    wanted = job_ids.split(',').each_with_object({}) { |id, h| h[id] = true }
    rows = rows.select { |job_id, _stat| wanted[job_id] }
  end
  # A job with no status record (or fewer than three fields) used to crash
  # the sort with nil.split / nil <=> String; its time key is now "".
  rows.sort_by { |_job_id, stat| stat.to_s.split(',')[2].to_s }
      .reverse
      .map { |row| row.join(',') }
      .join("\n")
end
kill_job(job_id)
click to toggle source
# File lib/workflow_manager/server.rb, line 206
# Marks a job as 'FAIL' in the status store, then runs the cluster's kill
# command through the shell.
#
# @param job_id [Object] job identifier
# @return [String] the kill command's standard output
def kill_job(job_id)
  status(job_id, 'FAIL')
  kill_cmd = @cluster.kill_command(job_id)
  `#{kill_cmd}`
end
log_puts(str)
click to toggle source
# File lib/workflow_manager/server.rb, line 219
# Appends a timestamped line "[YYYY.MM.DD HH:MM:SS] str" to the system log.
#
# Writes are serialized through @mutex.
#
# @param str [String] message (must be a String; it is joined with +)
# @return [nil]
def log_puts(str)
  stamp = Time.now.strftime("[%Y.%m.%d %H:%M:%S]")
  @mutex.synchronize do
    File.open(@system_log, "a") { |log| log.print(stamp + " " + str + "\n") }
  end
end
recovery_job_checker()
click to toggle source
# File lib/workflow_manager/server.rb, line 183
# Re-queues monitoring for every job whose stored status is still RUNNING or
# PENDING (called at server start-up).
#
# Each status record is split on "," into
#   status,script_basename,time,user,project_number,next_dataset_id
# and JobChecker.perform_async receives the job id, script basename, the log
# path stored in @logs, user, project number and next dataset id.
def recovery_job_checker
  unfinished = ["RUNNING", "PENDING"]
  @logs.transaction do |logs|
    @statuses.transaction do |statuses|
      statuses.each do |job_id, record|
        # example [job_id, record]:
        #   120249,RUNNING,QC_ventricles_100k.sh,2021-07-30 09:47:04/2021-07-30 09:47:04,masaomi,1535
        stat, script_basename, _time, user, project_number, next_dataset_id = record.split(",")
        next unless unfinished.include?(stat)

        log_file = logs[job_id]
        message = "JobID (in recovery check): #{job_id}"
        log_puts(message)
        puts message
        JobChecker.perform_async(job_id, script_basename, log_file, user, project_number, next_dataset_id)
      end
    end
  end
end
start_monitoring(submit_command, user = 'sushi lover', resubmit = 0, script = '', project_number = 0, sge_options='', log_dir = '')
click to toggle source
# File lib/workflow_manager/server.rb, line 360
# Submits a job through the cluster backend. If the backend returns both a
# job id and a log file, starts a background thread that polls the job every
# @interval seconds.
#
# On every poll the thread writes "status,script_name,time,user,project" to
# @statuses and the log path to @logs. On 'success' it copies the stdout and
# stderr logs into log_dir (unless empty) and stops. On 'fail' it resubmits
# the job through this method while fewer than the configured number of
# resubmissions have happened, copies the logs, and stops.
#
# @param submit_command [String] path of the submitted script
# @param user [String]
# @param resubmit [Integer] resubmissions already made for this job
# @param script [String] forwarded to @cluster.submit_job
# @param project_number [Integer]
# @param sge_options [String] forwarded to @cluster.submit_job
# @param log_dir [String] where to copy the logs; '' disables copying
# @return [Integer, nil] the job id as an Integer, or nil when submission did
#   not return both a job id and a log file
def start_monitoring(submit_command, user = 'sushi lover', resubmit = 0, script = '', project_number = 0, sge_options='', log_dir = '')
  log_puts("monitoring: script=" + submit_command + " user=" + user + " resubmit=" + resubmit.to_s + " project=" + project_number.to_s + " sge option=" + sge_options + " log dir=" + log_dir.to_s)
  job_id, log_file, command = @cluster.submit_job(submit_command, script, sge_options)
  # Interpolation: a failed submission yields job_id == nil, and the former
  # `"submit: " + job_id` raised TypeError before the check below.
  log_puts("submit: #{job_id} #{command}")
  return nil unless job_id && log_file

  # Honour the configured limit (config.resubmit). Fall back to the constant
  # when the instance carries none.
  max_resubmit = @resubmit || RESUBMIT
  timestamp = -> { Time.now.strftime("%Y-%m-%d %H:%M:%S") }
  # Copies a stdout log and its matching stderr log ("_o.log" -> "_e.log").
  copy_logs = lambda do |out_log, dest_dir|
    [out_log, out_log.gsub('_o.log', '_e.log')].each do |file|
      copy_commands(file, dest_dir).each do |command_line|
        log_puts(command_line)
        system command_line
      end
    end
  end

  Thread.new(job_id, log_file, submit_command, user, resubmit, script, project_number, sge_options, log_dir) do |t_job_id, t_log_file, t_submit_command, t_user, t_resubmit, t_script, t_project_number, t_sge_options, t_log_dir|
    script_name = File.basename(t_submit_command).split(/-/).first
    loop do
      current = success_or_fail(t_job_id, t_log_file)
      finished = current == 'success' || current == 'fail'
      @statuses.transaction do |statuses|
        stat = statuses[t_job_id]
        start_time = stat && stat.split(/,/)[2]
        # Keep the first-seen start time; append "/end" once the job finishes.
        time = if start_time
                 finished ? start_time + '/' + timestamp.call : start_time
               else
                 timestamp.call
               end
        statuses[t_job_id] = [current, script_name, time, t_user, t_project_number].join(',')
      end
      @logs.transaction do |logs|
        logs[t_job_id] = t_log_file
      end
      case current
      when 'success'
        log_puts(current + ": " + t_job_id)
        copy_logs.call(t_log_file, t_log_dir) unless t_log_dir.empty?
        Thread.current.kill
      when 'fail'
        log_puts(current + ": " + t_job_id)
        if t_resubmit < max_resubmit
          log_puts("resubmit: " + t_job_id)
          # Forward t_log_dir so the resubmitted job's logs are copied too;
          # it used to be dropped (falling back to '').
          resubmit_job_id = start_monitoring(t_submit_command, t_user, t_resubmit + 1, t_script, t_project_number, t_sge_options, t_log_dir)
          @statuses.transaction do |statuses|
            statuses[t_job_id] = ["resubmit: " + resubmit_job_id.to_s, script_name, timestamp.call, t_user, t_project_number].join(',')
          end
        else
          log_puts("fail: " + t_job_id)
        end
        copy_logs.call(t_log_file, t_log_dir) unless t_log_dir.empty?
        Thread.current.kill
      end
      sleep @interval
    end
  end
  job_id.to_i
end
start_monitoring2(script_path, script_content, user='sushi_lover', project_number=0, sge_options='', log_dir='')
click to toggle source
# File lib/workflow_manager/server.rb, line 310
# Waits until a script's input dataset is available, submits the script, and
# polls the job in a background thread until it succeeds or fails.
#
# The script content must define GSTORE_DIR=... and INPUT_DATASET=.... The
# method polls every @interval seconds, for at most 8 hours. It waits until
# the dataset TSV exists and every path from its "[File]" columns (joined
# onto GSTORE_DIR) exists.
#
# @param script_path [String] only used to name the log file; the script
#   content itself comes from script_content
# @param script_content [String] full script text to submit
# @param user [String]
# @param project_number [Integer]
# @param sge_options [String] forwarded to @cluster.submit_job
# @param log_dir [String] where finalize_monitoring copies logs ('' = none)
# @return [Object, nil] the job id, or nil when the backend returned no job
#   id or log file
# @raise [RuntimeError] when the dataset is not declared in the script, or
#   does not become available within the waiting limit
def start_monitoring2(script_path, script_content, user='sushi_lover', project_number=0, sge_options='', log_dir='')
  waiting_max = 60*60*8 # 8h
  waiting_time = 0
  go_submit = false
  gstore_dir, input_dataset_path = input_dataset_tsv_path(script_content)
  unless gstore_dir && input_dataset_path
    # Previously this case raised the misleading "waiting_time 0 > " message.
    raise "stop submitting #{File.basename(script_path)}, since GSTORE_DIR or INPUT_DATASET is not defined in the script"
  end

  # Explicit loop: the former `until a or b and c and d` parsed as
  # `((a or b) and c) and d` (equal precedence), so once the timeout passed
  # with files still missing it never terminated.
  until waiting_time > waiting_max
    if File.exist?(input_dataset_path)
      file_list = input_dataset_file_list(input_dataset_path).map { |file| File.join(gstore_dir, file) }
      go_submit = input_dataset_exist?(file_list)
      break if go_submit
    end
    sleep @interval
    waiting_time += @interval
  end
  unless go_submit
    raise "stop submitting #{File.basename(script_path)}, since waiting_time #{waiting_time} > #{waiting_max}"
  end

  job_id, log_file, _command = @cluster.submit_job(script_path, script_content, sge_options)
  return nil unless job_id && log_file

  # save log_file in logs
  @logs.transaction do |logs|
    logs[job_id] = log_file
  end
  # Poll until finalize_monitoring kills the thread on success or fail.
  Thread.new(log_dir, script_path) do |t_log_dir, t_script_path|
    loop do
      current_status = check_status(job_id, log_file)
      update_time_status(job_id, current_status, t_script_path, user, project_number)
      finalize_monitoring(current_status, log_file, t_log_dir)
      sleep @interval
    end
  end
  job_id
end
start_monitoring3(script_path, script_content, user='sushi_lover', project_number=0, sge_options='', log_dir='', next_dataset_id='')
click to toggle source
# File lib/workflow_manager/server.rb, line 299
# Submits a script and hands monitoring off to JobChecker.perform_async.
#
# @param script_path [String] used for the log name and the script basename
# @param script_content [String] script text forwarded to the backend
# @param user [String]
# @param project_number [Integer]
# @param sge_options [String] forwarded to @cluster.submit_job
# @param log_dir [String] not used by this method
# @param next_dataset_id [String] forwarded to JobChecker
# @return [Object] the job id returned by the backend
def start_monitoring3(script_path, script_content, user='sushi_lover', project_number=0, sge_options='', log_dir='', next_dataset_id='')
  basename = File.basename(script_path)
  job_id, log_file, _command = @cluster.submit_job(script_path, script_content, sge_options)
  puts "JobID (in WorkflowManager): #{job_id}"
  # Short pause before queueing the checker (reason not visible here).
  sleep 1
  JobChecker.perform_async(job_id, basename, log_file, user, project_number, next_dataset_id)
  job_id
end
status(job_id, new_status=nil)
click to toggle source
# File lib/workflow_manager/server.rb, line 451
# Reads a job's status record and optionally overwrites its first field.
#
# Records are comma-separated strings whose first field is the status. When
# new_status is an accepted value and a record exists, the first field is
# replaced and the record is written back. Otherwise nothing is written.
#
# @param job_id [#to_s] job identifier
# @param new_status [String, nil] 'COMPLETED', 'RUNNING', 'PENDING' or 'FAIL'
# @return [String, nil] the (possibly updated) record, or nil if none exists
def status(job_id, new_status=nil)
  # 'CONPLETED' was a misspelling of 'COMPLETED', which used to be silently
  # ignored. The misspelling stays accepted so existing callers keep working.
  accepted = %w[COMPLETED CONPLETED RUNNING PENDING FAIL]
  key = job_id.to_s
  stat = nil
  @statuses.transaction do |statuses|
    stat = statuses[key]
    if new_status && stat && accepted.include?(new_status)
      items = stat.split(/,/)
      items[0] = new_status
      stat = items.join(',')
      statuses[key] = stat
    end
  end
  stat
end
success_or_fail(job_id, log_file)
click to toggle source
# File lib/workflow_manager/server.rb, line 543
# Classifies a job by querying the cluster backend, checking in order:
# running -> 'running', log shows the end -> 'success', pending -> 'pending',
# anything else -> 'fail'.
#
# @param job_id [Object] job identifier
# @param log_file [String] stdout log path passed to job_ends?
# @return [String] 'running', 'success', 'pending' or 'fail'
def success_or_fail(job_id, log_file)
  return 'running' if @cluster.job_running?(job_id)
  return 'success' if @cluster.job_ends?(log_file)
  return 'pending' if @cluster.job_pending?(job_id)

  'fail'
end
Also aliased as: check_status
update_time_status(job_id, current_status, script_name, user, project_number)
click to toggle source
# File lib/workflow_manager/server.rb, line 261
# Records a job's status together with its timing.
#
# If a record already exists, its stored script name, user and project number
# take precedence over the arguments. The record is rewritten when:
# - there is no stored start time: time = now;
# - the job reached 'success'/'fail': time = "<start>/<now>";
# - the status differs from the stored one: time = now.
# Otherwise nothing is written.
#
# @param job_id [Object] key in @statuses
# @param current_status [String] latest status
# @param script_name [String] used only when no record exists yet
# @param user [String] used only when no record exists yet
# @param project_number [Object] used only when no record exists yet
def update_time_status(job_id, current_status, script_name, user, project_number)
  now = -> { Time.now.strftime("%Y-%m-%d %H:%M:%S") }
  @statuses.transaction do |statuses|
    last_status = start_time = nil
    if (record = statuses[job_id])
      last_status, script_name, start_time, user, project_number = record.split(/,/)
    end
    time = if !start_time
             now.call
           elsif current_status == 'success' || current_status == 'fail'
             start_time + '/' + now.call
           elsif current_status != last_status
             now.call
           end
    statuses[job_id] = [current_status, script_name, time, user, project_number].join(',') if time
  end
end