class OodCore::Job::Adapters::PBSPro
An adapter object that describes the communication with a PBS Pro resource manager for job management.
Constants
- STATE_MAP
Mapping of PBS Pro job state codes to OodCore job states
Attributes
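qstat_factor — Threshold ratio of the owner's jobs to all jobs, used by info_where_owner to decide between two strategies. When the owner's jobs exceed this fraction of all jobs, we `qstat` all jobs and filter out the owner's; otherwise we call `qstat` on each of the owner's individual jobs. Defaults to 0.10 (10%). @return [Float] threshold ratio of owner's jobs to all jobs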
Public Class Methods
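@api private @param opts [#to_h] the options defining this adapter (nil-valued options are ignored) @option opts [Batch] :pbspro The PBS Pro batch object @option opts [#to_f] :qstat_factor (0.10) The fraction of all jobs that the owner's jobs must exceed before info_where_owner queries all jobs instead of each of the owner's jobs @raise [ArgumentError] if no :pbspro object is given @see Factory.build_pbspro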
# File lib/ood_core/job/adapters/pbspro.rb, line 219
# Nil-valued options are dropped before lookup, so an explicit nil
# :qstat_factor falls back to the 0.10 default and a nil :pbspro is an error.
def initialize(opts = {})
  options = opts.to_h.compact.symbolize_keys

  @pbspro = options.fetch(:pbspro) do
    raise ArgumentError, "No pbspro object specified. Missing argument: pbspro"
  end
  @qstat_factor = options.fetch(:qstat_factor, 0.10).to_f
end
Public Instance Methods
Delete the submitted job @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong deleting a job @return [void] @see Adapter#delete
# File lib/ood_core/job/adapters/pbspro.rb, line 414
def delete(id)
  @pbspro.delete_job(id.to_s)
rescue Batch::Error => e
  # a job PBS Pro no longer knows about (or that already finished) counts as
  # successfully deleted; any other failure is surfaced to the caller
  case e.message
  when /Unknown Job Id/, /Job has finished/
    nil
  else
    raise JobAdapterError, e.message
  end
end
# File lib/ood_core/job/adapters/pbspro.rb, line 421
# Marker that introduces scheduler directives inside a PBS Pro batch script.
def directive_prefix
  '#PBS'
end
Put the submitted job on hold @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong holding a job @return [void] @see Adapter#hold
# File lib/ood_core/job/adapters/pbspro.rb, line 390
def hold(id)
  @pbspro.hold_job(id.to_s)
rescue Batch::Error => e
  # a job PBS Pro no longer knows about (or that already finished) counts as
  # successfully held; any other failure is surfaced to the caller
  case e.message
  when /Unknown Job Id/, /Job has finished/
    nil
  else
    raise JobAdapterError, e.message
  end
end
Retrieve job info from the resource manager @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong getting job info @return [Info] information describing submitted job @see Adapter#info
# File lib/ood_core/job/adapters/pbspro.rb, line 350
# No records means the job is gone (reported as completed); one record is
# a plain job; several records are a job array plus its sub-jobs.
def info(id)
  id = id.to_s
  job_infos = @pbspro.get_jobs(id: id).map { |job_hash| parse_job_info(job_hash) }

  case job_infos.length
  when 0 then Info.new(id: id, status: :completed)
  when 1 then job_infos.first
  else process_job_array(id, job_infos)
  end
rescue Batch::Error => e
  # set completed status if can't find job id
  case e.message
  when /Unknown Job Id/, /Job has finished/
    Info.new(id: id, status: :completed)
  else
    raise JobAdapterError, e.message
  end
end
Retrieve info for all jobs from the resource manager @raise [JobAdapterError] if something goes wrong getting job info @return [Array<Info>] information describing submitted jobs @see Adapter#info_all
# File lib/ood_core/job/adapters/pbspro.rb, line 305
# `attrs` is accepted for interface compatibility but not used here.
def info_all(attrs: nil)
  raw_jobs = @pbspro.get_jobs
  raw_jobs.map { |job_hash| parse_job_info(job_hash) }
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
Retrieve info for all jobs for a given owner or owners from the resource manager @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs @raise [JobAdapterError] if something goes wrong getting job info @return [Array<Info>] information describing submitted jobs
Calls superclass method OodCore::Job::Adapter#info_where_owner
# File lib/ood_core/job/adapters/pbspro.rb, line 318
# Retrieve info for all jobs owned by the given owner(s).
#
# @param owner [#to_s, Array<#to_s>] the owner(s) of the jobs
# @param attrs [Array<Symbol>, nil] passed through to the superclass path
# @raise [JobAdapterError] if something goes wrong getting job info,
#   including failures of the initial job-id selection queries
# @return [Array<Info>] each owned job followed by its child (task) infos
def info_where_owner(owner, attrs: nil)
  owner = Array.wrap(owner).map(&:to_s)

  usr_jobs = @pbspro.select_jobs(args: ["-u", owner.join(",")])
  all_jobs = @pbspro.select_jobs(args: ["-T"])

  # `qstat` all jobs if user has too many jobs, otherwise `qstat` each
  # individual job (default factor is 10%)
  return super if usr_jobs.size > (qstat_factor * all_jobs.size)

  usr_jobs.flat_map do |id|
    job = info(id)
    [job] + job.tasks.map { |task| job.build_child_info(task) }
  end
rescue Batch::Error => e
  # method-level rescue so select_jobs failures are wrapped as well, matching
  # the documented contract
  raise JobAdapterError, e.message
end
Release the job that is on hold @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong releasing a job @return [void] @see Adapter#release
# File lib/ood_core/job/adapters/pbspro.rb, line 402
def release(id)
  @pbspro.release_job(id.to_s)
rescue Batch::Error => e
  # a job PBS Pro no longer knows about (or that already finished) counts as
  # successfully released; any other failure is surfaced to the caller
  case e.message
  when /Unknown Job Id/, /Job has finished/
    nil
  else
    raise JobAdapterError, e.message
  end
end
Retrieve job status from resource manager @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong getting job status @return [Status] status of job @see Adapter#status
# File lib/ood_core/job/adapters/pbspro.rb, line 381
# Thin wrapper: look the job up via #info and report only its status.
def status(id)
  job_info = info(id.to_s)
  job_info.status
end
Submit a job with the attributes defined in the job template instance
@param script [Script] script object that describes the script and attributes for the submitted job
@param after [#to_s, Array<#to_s>] this job may be scheduled for execution at any point after dependent jobs have started execution
@param afterok [#to_s, Array<#to_s>] this job may be scheduled for execution only after dependent jobs have terminated with no errors
@param afternotok [#to_s, Array<#to_s>] this job may be scheduled for execution only after dependent jobs have terminated with errors
@param afterany [#to_s, Array<#to_s>] this job may be scheduled for execution after dependent jobs have terminated
@raise [JobAdapterError] if something goes wrong submitting a job
@return [String] the job id returned after successfully submitting a job
@see Adapter#submit
# File lib/ood_core/job/adapters/pbspro.rb, line 241
# Translates the script's attributes into qsub arguments (in a fixed order)
# and submits the script content through the batch object.
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  # Normalize every dependency list to an array of job id strings
  dependencies = {
    after: after,
    afterok: afterok,
    afternotok: afternotok,
    afterany: afterany
  }.transform_values { |ids| Array(ids).map(&:to_s) }

  # Build the qsub option list; `add_opt` appends a flag/value pair only
  # when the value was actually provided (non-nil)
  args = []
  add_opt = ->(flag, value) { args.push(flag, value) unless value.nil? }

  # ignore args, can't use these if submitting from STDIN
  args.push("-h") if script.submit_as_hold
  args.push("-r", script.rerunnable ? "y" : "n") unless script.rerunnable.nil?
  args.push("-M", script.email.join(",")) unless script.email.nil?

  # Mail points: "b" when the job begins, "e" when it ends
  mail_points = []
  mail_points << "b" if script.email_on_started
  mail_points << "e" if script.email_on_terminated
  args.push("-m", mail_points.join) unless mail_points.empty?

  add_opt["-N", script.job_name]
  add_opt["-S", script.shell_path]
  # ignore input_path (not defined in PBS Pro)
  add_opt["-o", script.output_path]
  add_opt["-e", script.error_path]
  # Reservations are actually just queues in PBS Pro; an explicit queue wins
  add_opt["-q", script.queue_name.nil? ? script.reservation_id : script.queue_name]
  add_opt["-p", script.priority]
  add_opt["-a", script.start_time&.localtime&.strftime("%C%y%m%d%H%M.%S")]
  add_opt["-A", script.accounting_id]
  args.push("-l", "walltime=#{seconds_to_duration(script.wall_time)}") unless script.wall_time.nil?

  # Set dependencies, e.g. "afterok:1:2,afterany:3"
  depend = dependencies.reject { |_type, ids| ids.empty? }
                       .map { |type, ids| "#{type}:#{ids.join(":")}" }
  args.push("-W", "depend=#{depend.join(",")}") unless depend.empty?

  # Set environment variables
  envvars = script.job_environment.to_h
  args.push("-v", envvars.map { |key, val| "#{key}=#{val}" }.join(",")) unless envvars.empty?
  args.push("-V") if script.copy_environment?

  # If error_path is not specified we join stdout & stderr (as this
  # mimics what the other resource managers do)
  args.push("-j", "oe") if script.error_path.nil?

  add_opt["-J", script.job_array_request]

  # Set native options
  args.concat(script.native) if script.native

  # Submit job
  @pbspro.submit_string(script.content, args: args, chdir: script.workdir)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
Private Instance Methods
Convert a colon-separated duration string (e.g. “HH:MM:SS”) to a number of seconds; returns nil when given nil
# File lib/ood_core/job/adapters/pbspro.rb, line 427
# Each colon-separated field is worth 60x the field to its right, so
# "01:02:03" => 3723 and "05:00" => 300.
def duration_in_seconds(time)
  return nil if time.nil?

  time.split(':').reduce(0) { |seconds, field| seconds * 60 + field.to_i }
end
Determine state from PBS Pro state code
# File lib/ood_core/job/adapters/pbspro.rb, line 450
# Unrecognized (or nil) state codes map to :undetermined.
def get_state(st)
  STATE_MAP.fetch(st) { :undetermined }
end
Parse hash describing PBS Pro job status
# File lib/ood_core/job/adapters/pbspro.rb, line 455
# Build an Info object from one PBS Pro job status hash.
#
# @param v [Hash] job status hash keyed by PBS attribute symbols
#   (:job_id, :job_state, :Job_Owner, :exec_host, :Resource_List, ...)
# @return [Info] the parsed job information
def parse_job_info(v)
  # Job_Owner has the form "user@submit_host". Usernames may contain dots
  # (e.g. "first.last"); the owner pattern must accept them, otherwise both
  # job_owner and submit_host silently end up nil.
  /^(?<job_owner>[\w.-]+)@(?<submit_host>.+)$/ =~ v[:Job_Owner]

  # treat a missing *or nil* resource hash the same as an empty one
  resource_list  = v[:Resource_List] || {}
  resources_used = v[:resources_used] || {}

  allocated_nodes = parse_nodes(v[:exec_host] || "")
  procs = allocated_nodes.sum { |node| node[:procs] }
  if allocated_nodes.empty? # fill in with requested resources
    # one independent hash per requested node rather than one shared object
    allocated_nodes = Array.new(resource_list[:nodect].to_i) { { name: nil } }
    procs = resource_list[:ncpus].to_i
  end

  Info.new(
    id: v[:job_id],
    status: get_state(v[:job_state]),
    allocated_nodes: allocated_nodes,
    submit_host: submit_host,
    job_name: v[:Job_Name],
    job_owner: job_owner,
    accounting_id: v[:Account_Name],
    procs: procs,
    queue_name: v[:queue],
    wallclock_time: duration_in_seconds(resources_used[:walltime]),
    wallclock_limit: duration_in_seconds(resource_list[:walltime]),
    cpu_time: duration_in_seconds(resources_used[:cput]),
    submission_time: v[:ctime] ? Time.parse(v[:ctime]) : nil,
    dispatch_time: v[:stime] ? Time.parse(v[:stime]) : nil,
    native: v
  )
end
Convert a host list string of the form “hosta/J1+hostb/J2*P+…” to individual nodes, where J1 and J2 are the index of the job on the named host and P is the number of processors allocated from that host to this job. P does not appear if it is 1. Example: “i5n14/2*7” uses 7 procs on node “i5n14”.
# File lib/ood_core/job/adapters/pbspro.rb, line 441
# Split an exec_host string into per-node hashes.
#
# @param node_list [String] e.g. "hosta/0*2+hostb/1"
# @return [Array<Hash>] one {name:, procs:} hash per "+"-separated entry;
#   an empty string yields []
def parse_nodes(node_list)
  node_list.split('+').map do |node|
    name, slot = node.split('/')
    # `slot` is "J" or "J*P". Guard against an entry with no "/J" part at
    # all (slot is nil), which previously raised NoMethodError; such an
    # entry, like one without "*P", counts as a single processor.
    procs = (slot.to_s.split('*')[1] || 1).to_i
    { name: name, procs: procs }
  end
end
Combine the array parent with the states of its children
# File lib/ood_core/job/adapters/pbspro.rb, line 483
# The array parent is the entry whose id contains "[]"; every other entry
# becomes a task hash on the parent. Without a parent entry, a placeholder
# with :undetermined status is used.
def process_job_array(id, jobs)
  parents, children = jobs.partition { |job| /\[\]/ =~ job.id }
  parent_job = parents.first

  parent = if parent_job
             parent_job.to_h
           else
             { id: id, status: :undetermined }
           end

  # create task hashes from children
  parent[:tasks] = children.map do |child|
    { id: child.id, status: child.status.to_sym, wallclock_time: child.wallclock_time }
  end

  Info.new(**parent)
end
Convert a number of seconds to a zero-padded “HH:MM:SS” duration string
# File lib/ood_core/job/adapters/pbspro.rb, line 432
# Hours are not capped at 24, so 360000 => "100:00:00".
def seconds_to_duration(time)
  hours   = time / 3600
  minutes = time / 60 % 60
  seconds = time % 60
  "%02d:%02d:%02d" % [hours, minutes, seconds]
end