class QueueManager

Public Class Methods

available?() click to toggle source
# File lib/autoflow/queue_manager.rb, line 322
def self.available?
        return FALSE
end
descendants() click to toggle source

SELECT AND PREPARE MANAGER

# File lib/autoflow/queue_manager.rb, line 25
def self.descendants
        ObjectSpace.each_object(Class).select { |klass| klass < self }
end
new(exec_folder, options, commands, persist_variables) click to toggle source
# File lib/autoflow/queue_manager.rb, line 5
def initialize(exec_folder, options, commands, persist_variables)
        @exec_folder = exec_folder
        @commands = commands
        @persist_variables = persist_variables
        @verbose = options[:verbose]
        @show_submit = options[:show_submit_command]
        @job_identifier = options[:identifier]
        @files = {}
        @remote = options[:remote]
        @ssh = options[:ssh]
        @write_sh = options[:write_sh]
        @external_dependencies = options[:external_dependencies]
        @active_jobs = []
        @extended_logging = options[:extended_logging]
end
priority() click to toggle source
# File lib/autoflow/queue_manager.rb, line 326
def self.priority
        return -1
end
select_manager(options) click to toggle source
# File lib/autoflow/queue_manager.rb, line 42
def self.select_manager(options)
        queue_manager = nil
        priority = 0
        descendants.each do |descendant|
                if descendant.available?(options) && priority <= descendant.priority
                        queue_manager = descendant
                        priority = descendant.priority
                end
        end
        return queue_manager
end
select_queue_manager(exec_folder, options, jobs, persist_variables) click to toggle source
# File lib/autoflow/queue_manager.rb, line 29
def self.select_queue_manager(exec_folder, options, jobs, persist_variables)
        path_managers = File.join(File.dirname(__FILE__),'queue_managers')
        Dir.glob(path_managers+'/*').each do |manager|
                require manager
        end
        if options[:batch]
                queue_manager = BashManager
        else
                queue_manager = select_manager(options)
        end
        return queue_manager.new(exec_folder, options, jobs, persist_variables)
end
system_call(cmd, path = nil, remote = FALSE, ssh = nil) click to toggle source
# File lib/autoflow/queue_manager.rb, line 259
def self.system_call(cmd, path = nil, remote = FALSE, ssh = nil)
        cmd = "cd #{path}; " + cmd if !path.nil?
        if remote
                call = ssh.exec!(cmd)
        else
                call = %x[#{cmd}] 
        end
        return call
end

Public Instance Methods

asign_queue_id(ar_jobs, id) click to toggle source
# File lib/autoflow/queue_manager.rb, line 286
def asign_queue_id(ar_jobs, id)
        ar_jobs.each do |id_job, job|
                job.queue_id=id
        end
end
close_file(file_name, permissions = nil) click to toggle source
# File lib/autoflow/queue_manager.rb, line 224
def close_file(file_name, permissions = nil) #SSH
        path, content = @files.delete(file_name)
        file_path = File.join(path, file_name)
        if @remote
                @ssh.exec!("echo '#{content}' > #{file_path}")
                @ssh.exec!("chmod #{permissions} #{file_path}") if !permissions.nil?
        else
                local_file = File.open(file_path,'w')
                local_file.chmod(permissions) if !permissions.nil?
                local_file.print content
                local_file.close
        end
end
create_file(file_name, path) click to toggle source
# File lib/autoflow/queue_manager.rb, line 216
def create_file(file_name, path) 
        @files[file_name] = [path, '']
end
create_folder(folder_name) click to toggle source
# File lib/autoflow/queue_manager.rb, line 208
def create_folder(folder_name)
        if @remote
                @ssh.exec!("if ! [ -d  #{folder_name} ]; then mkdir -p #{folder_name}; fi")
        else
                Dir.mkdir(folder_name) if !File.exists?(folder_name)
        end
end
exec() click to toggle source

EXECUTING WORKFLOW WITH MANAGER

# File lib/autoflow/queue_manager.rb, line 58
def exec
        create_folder(@exec_folder)
        make_environment_file if !@persist_variables.empty?
        create_file('versions', @exec_folder)
        write_file('versions',"autoflow\t#{Autoflow::VERSION}")
        close_file('versions')
        create_file('index_execution', @exec_folder)
        launch_all_jobs
        close_file('index_execution')
end
get_all_deps(ar_dependencies) click to toggle source
# File lib/autoflow/queue_manager.rb, line 300
def get_all_deps(ar_dependencies)
        final_dep = []
        final_dep.concat(get_queue_system_dependencies(ar_dependencies)) if !ar_dependencies.empty?
        final_dep.concat(@external_dependencies)
        return final_dep
end
get_dependencies(job, id = nil) click to toggle source
# File lib/autoflow/queue_manager.rb, line 279
def get_dependencies(job, id = nil)
        ar_dependencies = []
        ar_dependencies += job.dependencies
        ar_dependencies.delete(id) if !id.nil? #Delete autodependency
        return ar_dependencies
end
get_queue_system_dependencies(ar_dependencies) click to toggle source
# File lib/autoflow/queue_manager.rb, line 292
def get_queue_system_dependencies(ar_dependencies)
        queue_system_ids=[]
        ar_dependencies.each do |dependency|
                queue_system_ids << @commands[dependency].queue_id
        end
        return queue_system_ids
end
get_queue_system_id(shell_output) click to toggle source
# File lib/autoflow/queue_manager.rb, line 318
def get_queue_system_id(shell_output)

end
get_relations_and_folders() click to toggle source
# File lib/autoflow/queue_manager.rb, line 89
def get_relations_and_folders
        relations = {}
        @commands.each do |name, job|
                relations[name] = [job.attrib[:exec_folder], job.dependencies]
        end
        return relations
end
init_log() click to toggle source
# File lib/autoflow/queue_manager.rb, line 69
def init_log #TODO adapt to remote execution
        log_path = [@exec_folder, '.wf_log'].join('/') #Join must assume linux systems so File.join canot be used for windows hosts
        log = parse_log(log_path) #TODO modify to folder
        job_relations_with_folders = get_relations_and_folders
        if @write_sh
                create_file('wf.json', @exec_folder)
                write_file('wf.json', job_relations_with_folders.to_json)
                close_file('wf.json')
        end
        @active_jobs.each do |task|
                query = log[task]
                if query.nil?
                        log[task] = {'set' => [Time.now.to_i]}
                else
                        log[task]['set'] << Time.now.to_i
                end
        end
        write_log(log, log_path, job_relations_with_folders)
end
launch2queue_system(job, id, buffered_jobs) click to toggle source
# File lib/autoflow/queue_manager.rb, line 154
def launch2queue_system(job, id, buffered_jobs)
        sh_name = job.name+'.sh'
        if @write_sh
                # Write sh file
                #--------------------------------
                create_file(sh_name, job.attrib[:exec_folder])
                write_file(sh_name, '#!/usr/bin/env bash')
                write_file(sh_name, '##JOB_GROUP_ID='+@job_identifier)
                write_header(id, job, sh_name)
        end

        #Get dependencies
        #------------------------------------
        ar_dependencies = get_dependencies(job, id)
        buffered_jobs.each do |id_buff_job, buff_job|
                ar_dependencies += get_dependencies(buff_job, id_buff_job)
                if @write_sh
                        write_job(buff_job, sh_name)
                        buff_job.attrib[:exec_folder] = job.attrib[:exec_folder]
                end
        end
        ar_dependencies.uniq!

        if @write_sh
                #Write sh body
                #--------------------------------
                write_file(sh_name, 'hostname')
                log_file_path = [@exec_folder, '.wf_log', File.basename(job.attrib[:exec_folder])].join('/')
                write_file(sh_name, "flow_logger -e #{log_file_path} -s #{job.name}")
                write_file(sh_name, "source #{File.join(@exec_folder, 'env_file')}") if !@persist_variables.empty?
                write_job(job, sh_name)
                write_file(sh_name, "flow_logger -e #{log_file_path} -f #{job.name}")
                write_file(sh_name, "echo 'General time'")
                write_file(sh_name, "times")
                close_file(sh_name, 0755)
        end

        #Submit node
        #-----------------------------------
        if !@verbose
                queue_id = submit_job(job, ar_dependencies)
                job.queue_id = queue_id # Returns id of running tag on queue system
                asign_queue_id(buffered_jobs, queue_id)
        end
end
launch_all_jobs() click to toggle source
# File lib/autoflow/queue_manager.rb, line 97
def launch_all_jobs
        buffered_jobs = []
        sorted_jobs = sort_jobs_by_dependencies
        sorted_jobs.each do |name, job|
                @active_jobs << job.name if !job.attrib[:done]
        end
        init_log
        sorted_jobs.each do |name, job|
                write_file('index_execution', "#{name}\t#{job.attrib[:exec_folder]}") 
                if job.attrib[:done]
                        next
                else
                        rm_done_dependencies(job)
                end  
                buffered_jobs = launch_job_in_folder(job, name, buffered_jobs)
        end
end
launch_job_in_folder(job, id, buffered_jobs) click to toggle source
# File lib/autoflow/queue_manager.rb, line 142
def launch_job_in_folder(job, id, buffered_jobs)
        create_folder(job.attrib[:exec_folder])
        if !job.attrib[:buffer]  # Launch with queue_system the job and all buffered jobs
                launch2queue_system(job, id, buffered_jobs)
                buffered_jobs = []#Clean buffer
        else # Buffer job
                buffered_jobs << [id, job]
        end
        return buffered_jobs  
end
make_environment_file() click to toggle source
# File lib/autoflow/queue_manager.rb, line 200
def make_environment_file
        create_file('env_file', @exec_folder)
        @persist_variables.each do |var, value|
                write_file('env_file', "export #{var}=#{value}")
        end
        close_file('env_file')
end
read_file(file_path) click to toggle source
# File lib/autoflow/queue_manager.rb, line 238
def read_file(file_path)
        content = nil
        if @remote
                res = @ssh.exec!("[ ! -f #{file_path} ] && echo 'Autoflow:File Not Found' || cat #{file_path}")
                content = res if !content.include?('Autoflow:File Not Found')
        else
                content = File.open(file_path).read if File.exists?(file_path)
        end
        return content
end
rm_done_dependencies(job) click to toggle source
# File lib/autoflow/queue_manager.rb, line 132
def rm_done_dependencies(job)
        remove=[]
        job.dependencies.each do |dependency|                 
                remove << dependency if @commands[dependency].attrib[:done]
        end
        remove.each do |rm|
                job.dependencies.delete(rm)
        end
end
sort_jobs_by_dependencies() click to toggle source
# File lib/autoflow/queue_manager.rb, line 115
def sort_jobs_by_dependencies # We need job ids from queue system so we ask for each job and we give the previous queue system ids as dependencies if necessary
        ar_jobs = @commands.to_a
        sorted_jobs = []
        jobs_without_dep = ar_jobs.select{|job| job.last.dependencies.empty?}
        sorted_jobs.concat(jobs_without_dep)
        while ar_jobs.length != sorted_jobs.length
                ids = sorted_jobs.map{|job| job.first}
                ar_jobs.each do |job|
                        if !sorted_jobs.include?(job) 
                                deps = job.last.dependencies - ids
                                sorted_jobs << job if deps.empty?
                        end
                end
        end
        return sorted_jobs
end
submit_job(job, ar_dependencies) click to toggle source
# File lib/autoflow/queue_manager.rb, line 314
def submit_job(job, ar_dependencies)

end
system_call(cmd, path = nil) click to toggle source
# File lib/autoflow/queue_manager.rb, line 249
def system_call(cmd, path = nil)
        cmd = "cd #{path}; " + cmd if !path.nil?
        if @remote
                call = @ssh.exec!(cmd)
        else
                call = %x[#{cmd}] 
        end
        return call
end
write_file(file_name, content) click to toggle source
# File lib/autoflow/queue_manager.rb, line 220
def write_file(file_name, content)
        @files[file_name].last << content+"\n"
end
write_header(id, node, sh) click to toggle source

QUEUE DEPENDANT METHODS

# File lib/autoflow/queue_manager.rb, line 310
def write_header(id, node, sh)

end
write_job(job, sh_name) click to toggle source
# File lib/autoflow/queue_manager.rb, line 269
def write_job(job, sh_name)    
        write_file(sh_name, job.initialization) if !job.initialization.nil?
        if @extended_logging
                log_command = '/usr/bin/time -o process_data -v '
        else
                log_command = 'time '
        end
        write_file(sh_name, log_command + job.parameters)
end