class OodCore::Job::Adapters::PBSPro

An adapter object that describes the communication with a PBS Pro resource manager for job management.

Constants

STATE_MAP

Mapping of PBS Pro job state codes to adapter job states

Attributes

qstat_factor[R]

The fraction of all jobs that a user's jobs must exceed before the adapter runs `qstat` on all jobs and filters out that owner's jobs, instead of calling `qstat` on each of the owner's individual jobs.

@return [Float] ratio of owner's jobs to all jobs
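
The threshold test this factor drives can be sketched as below. This is a minimal illustration rather than the adapter's own code: `bulk_qstat?` is a hypothetical helper name and the job counts are invented for the example.

```ruby
# Hypothetical helper: true when a single `qstat` of all jobs is expected
# to be cheaper than one `qstat` call per job owned by the user.
def bulk_qstat?(user_job_count, all_job_count, qstat_factor = 0.10)
  user_job_count > qstat_factor * all_job_count
end

few  = bulk_qstat?(5, 1000)    # 5 is not above 10% of 1000: query each job
many = bulk_qstat?(250, 1000)  # 250 is above 10% of 1000: query all, then filter
```

A larger factor makes the adapter favor per-job queries; a smaller one makes it fall back to the bulk query sooner.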

Public Class Methods

new(opts = {})

@api private
@param opts [#to_h] the options defining this adapter
@option opts [Batch] :pbspro The PBS Pro batch object
@option opts [#to_f] :qstat_factor (0.10) The qstat deciding factor
@see Factory.build_pbspro

# File lib/ood_core/job/adapters/pbspro.rb, line 219
def initialize(opts = {})
  o = opts.to_h.compact.symbolize_keys

  @pbspro = o.fetch(:pbspro) { raise ArgumentError, "No pbspro object specified. Missing argument: pbspro" }
  @qstat_factor = o.fetch(:qstat_factor, 0.10).to_f
end

Public Instance Methods

delete(id)

Delete the submitted job

@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong deleting a job
@return [void]
@see Adapter#delete

# File lib/ood_core/job/adapters/pbspro.rb, line 414
def delete(id)
  @pbspro.delete_job(id.to_s)
rescue Batch::Error => e
  # assume successful job deletion if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end
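
`delete`, `hold`, and `release` share one error-handling pattern: a batch error that only says the job is unknown or already finished is treated as success, and anything else is re-raised. The sketch below isolates that pattern. `BatchError`, `ignore_missing_job`, and the sample messages are stand-ins invented for this example, not names from the library.

```ruby
# Stand-ins for the library's error classes (assumed for this sketch).
class BatchError < StandardError; end
class JobAdapterError < StandardError; end

# Messages that mean the job is already gone.
BENIGN_ERRORS = /Unknown Job Id|Job has finished/

# Runs the block, swallowing "job already gone" errors and re-raising
# every other batch error as a JobAdapterError.
def ignore_missing_job
  yield
rescue BatchError => e
  raise JobAdapterError, e.message unless BENIGN_ERRORS =~ e.message
end

# A made-up "unknown job" message is swallowed, so no exception escapes.
ignore_missing_job { raise BatchError, "Unknown Job Id 123.host" }
```

Because the benign case is swallowed, callers cannot tell a job that was deleted just now from one that had already finished.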

directive_prefix()

# File lib/ood_core/job/adapters/pbspro.rb, line 421
def directive_prefix
  '#PBS'
end

hold(id)

Put the submitted job on hold

@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong holding a job
@return [void]
@see Adapter#hold

# File lib/ood_core/job/adapters/pbspro.rb, line 390
def hold(id)
  @pbspro.hold_job(id.to_s)
rescue Batch::Error => e
  # assume successful job hold if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end

info(id)

Retrieve job info from the resource manager

@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong getting job info
@return [Info] information describing submitted job
@see Adapter#info

# File lib/ood_core/job/adapters/pbspro.rb, line 350
def info(id)
  id = id.to_s

  job_infos = @pbspro.get_jobs(id: id).map do |v|
    parse_job_info(v)
  end

  if job_infos.empty?
    Info.new(id: id, status: :completed)
  elsif job_infos.length == 1
    job_infos.first
  else
    process_job_array(id, job_infos)
  end
rescue Batch::Error => e
  # set completed status if can't find job id
  if /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
    Info.new(
      id: id,
      status: :completed
    )
  else
    raise JobAdapterError, e.message
  end
end

info_all(attrs: nil)

Retrieve info for all jobs from the resource manager

@raise [JobAdapterError] if something goes wrong getting job info
@return [Array<Info>] information describing submitted jobs
@see Adapter#info_all

# File lib/ood_core/job/adapters/pbspro.rb, line 305
def info_all(attrs: nil)
  @pbspro.get_jobs.map do |v|
    parse_job_info(v)
  end
rescue Batch::Error => e
  raise JobAdapterError, e.message
end

info_where_owner(owner, attrs: nil)

Retrieve info for all jobs for a given owner or owners from the resource manager

@param owner [#to_s, Array<#to_s>] the owner(s) of the jobs
@raise [JobAdapterError] if something goes wrong getting job info
@return [Array<Info>] information describing submitted jobs

Calls superclass method OodCore::Job::Adapter#info_where_owner

# File lib/ood_core/job/adapters/pbspro.rb, line 318
def info_where_owner(owner, attrs: nil)
  owner = Array.wrap(owner).map(&:to_s)

  usr_jobs = @pbspro.select_jobs(args: ["-u", owner.join(",")])
  all_jobs = @pbspro.select_jobs(args: ["-T"])

  # `qstat` all jobs if user has too many jobs, otherwise `qstat` each
  # individual job (default factor is 10%)
  if usr_jobs.size > (qstat_factor * all_jobs.size)
    super
  else
    begin
      user_job_infos = []
      usr_jobs.each do |id|
        job = info(id)
        user_job_infos << job

        job.tasks.each {|task| user_job_infos << job.build_child_info(task)}
      end

      user_job_infos
    rescue Batch::Error => e
      raise JobAdapterError, e.message
    end
  end
end

release(id)

Release the job that is on hold

@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong releasing a job
@return [void]
@see Adapter#release

# File lib/ood_core/job/adapters/pbspro.rb, line 402
def release(id)
  @pbspro.release_job(id.to_s)
rescue Batch::Error => e
  # assume successful job release if can't find job id
  raise JobAdapterError, e.message unless /Unknown Job Id/ =~ e.message || /Job has finished/ =~ e.message
end

status(id)

Retrieve job status from resource manager

@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong getting job status
@return [Status] status of job
@see Adapter#status

# File lib/ood_core/job/adapters/pbspro.rb, line 381
def status(id)
  info(id.to_s).status
end

submit(script, after: [], afterok: [], afternotok: [], afterany: [])

Submit a job with the attributes defined in the job template instance

@param script [Script] script object that describes the script and attributes for the submitted job
@param after [#to_s, Array<#to_s>] this job may be scheduled for execution at any point after dependent jobs have started execution
@param afterok [#to_s, Array<#to_s>] this job may be scheduled for execution only after dependent jobs have terminated with no errors
@param afternotok [#to_s, Array<#to_s>] this job may be scheduled for execution only after dependent jobs have terminated with errors
@param afterany [#to_s, Array<#to_s>] this job may be scheduled for execution after dependent jobs have terminated
@raise [JobAdapterError] if something goes wrong submitting a job
@return [String] the job id returned after successfully submitting a job
@see Adapter#submit

# File lib/ood_core/job/adapters/pbspro.rb, line 241
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  after      = Array(after).map(&:to_s)
  afterok    = Array(afterok).map(&:to_s)
  afternotok = Array(afternotok).map(&:to_s)
  afterany   = Array(afterany).map(&:to_s)

  # Set qsub options
  args = []
  # ignore args, can't use these if submitting from STDIN
  args.concat ["-h"] if script.submit_as_hold
  args.concat ["-r", script.rerunnable ? "y" : "n"] unless script.rerunnable.nil?
  args.concat ["-M", script.email.join(",")] unless script.email.nil?
  if script.email_on_started && script.email_on_terminated
    args.concat ["-m", "be"]
  elsif script.email_on_started
    args.concat ["-m", "b"]
  elsif script.email_on_terminated
    args.concat ["-m", "e"]
  end
  args.concat ["-N", script.job_name] unless script.job_name.nil?
  args.concat ["-S", script.shell_path] unless script.shell_path.nil?
  # ignore input_path (not defined in PBS Pro)
  args.concat ["-o", script.output_path] unless script.output_path.nil?
  args.concat ["-e", script.error_path] unless script.error_path.nil?
  # Reservations are actually just queues in PBS Pro
  args.concat ["-q", script.reservation_id] if !script.reservation_id.nil? && script.queue_name.nil?
  args.concat ["-q", script.queue_name] unless script.queue_name.nil?
  args.concat ["-p", script.priority] unless script.priority.nil?
  args.concat ["-a", script.start_time.localtime.strftime("%C%y%m%d%H%M.%S")] unless script.start_time.nil?
  args.concat ["-A", script.accounting_id] unless script.accounting_id.nil?
  args.concat ["-l", "walltime=#{seconds_to_duration(script.wall_time)}"] unless script.wall_time.nil?

  # Set dependencies
  depend = []
  depend << "after:#{after.join(":")}"           unless after.empty?
  depend << "afterok:#{afterok.join(":")}"       unless afterok.empty?
  depend << "afternotok:#{afternotok.join(":")}" unless afternotok.empty?
  depend << "afterany:#{afterany.join(":")}"     unless afterany.empty?
  args.concat ["-W", "depend=#{depend.join(",")}"]   unless depend.empty?

  # Set environment variables
  envvars = script.job_environment.to_h
  args.concat ["-v", envvars.map{|k,v| "#{k}=#{v}"}.join(",")] unless envvars.empty?
  args.concat ["-V"] if script.copy_environment?

  # If error_path is not specified we join stdout & stderr (as this
  # mimics what the other resource managers do)
  args.concat ["-j", "oe"] if script.error_path.nil?

  args.concat ["-J", script.job_array_request] unless script.job_array_request.nil?

  # Set native options
  args.concat script.native if script.native

  # Submit job
  @pbspro.submit_string(script.content, args: args, chdir: script.workdir)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
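
The dependency handling in `submit` reduces to building one `depend=...` string for `qsub -W`. The following is a minimal standalone sketch of that step. `depend_option` is a hypothetical helper name and the job ids are invented; the adapter itself builds the string inline as shown in the listing above.

```ruby
# Builds the value passed to `qsub -W` from lists of job ids.
# Returns nil when no dependencies were given.
def depend_option(after: [], afterok: [], afternotok: [], afterany: [])
  groups = {
    "after"      => after,
    "afterok"    => afterok,
    "afternotok" => afternotok,
    "afterany"   => afterany
  }
  depend = groups.map do |type, ids|
    ids = Array(ids).map(&:to_s)               # accept a single id or a list
    "#{type}:#{ids.join(':')}" unless ids.empty?
  end.compact
  depend.empty? ? nil : "depend=#{depend.join(',')}"
end

depend_option(afterok: ["101.host", "102.host"], afterany: "103.host")
# => "depend=afterok:101.host:102.host,afterany:103.host"
```

Ids within one dependency type are joined with `:`, and the dependency types themselves are joined with `,`.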

Private Instance Methods

duration_in_seconds(time)

Convert a colon-separated duration string (such as HH:MM:SS) to seconds; returns nil when given nil

# File lib/ood_core/job/adapters/pbspro.rb, line 427
def duration_in_seconds(time)
  time.nil? ? nil : time.split(':').map { |v| v.to_i }.inject(0) { |total, v| total * 60 + v }
end
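
A short worked example of the conversion, with the method copied from the listing above so the snippet runs on its own. Each colon-separated field is folded in base 60, so shorter strings such as `MM:SS` or a bare number of seconds also work. The input strings are illustrative.

```ruby
def duration_in_seconds(time)
  time.nil? ? nil : time.split(':').map { |v| v.to_i }.inject(0) { |total, v| total * 60 + v }
end

duration_in_seconds("01:30:00")  # => 5400  ((1 * 60 + 30) * 60 + 0)
duration_in_seconds("02:05")     # => 125   (2 * 60 + 5)
duration_in_seconds("90")        # => 90
duration_in_seconds(nil)         # => nil
```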

get_state(st)

Determine state from PBS Pro state code; codes missing from STATE_MAP resolve to :undetermined

# File lib/ood_core/job/adapters/pbspro.rb, line 450
def get_state(st)
  STATE_MAP.fetch(st, :undetermined)
end
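
The lookup-with-fallback behavior can be sketched as follows. The contents of the real `STATE_MAP` are not shown on this page, so the table below is an illustrative subset built on assumed mappings. `EXAMPLE_STATE_MAP` and `example_get_state` are hypothetical names.

```ruby
# Illustrative subset only; the adapter's actual STATE_MAP may map these
# or other PBS Pro state codes differently.
EXAMPLE_STATE_MAP = {
  "Q" => :queued,
  "R" => :running,
  "F" => :completed
}.freeze

# Same shape as get_state: unknown codes fall back to :undetermined.
def example_get_state(code)
  EXAMPLE_STATE_MAP.fetch(code, :undetermined)
end

example_get_state("R")  # => :running
example_get_state("?")  # => :undetermined
```

Using `fetch` with a default means an unrecognized code never raises `KeyError`.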

parse_job_info(v)

Parse hash describing PBS Pro job status

# File lib/ood_core/job/adapters/pbspro.rb, line 455
def parse_job_info(v)
  /^(?<job_owner>[\w-]+)@(?<submit_host>.+)$/ =~ v[:Job_Owner]
  allocated_nodes = parse_nodes(v[:exec_host] || "")
  procs = allocated_nodes.inject(0) { |sum, x| sum + x[:procs] }
  if allocated_nodes.empty? # fill in with requested resources
    allocated_nodes = [ { name: nil } ] * v.fetch(:Resource_List, {})[:nodect].to_i
    procs = v.fetch(:Resource_List, {})[:ncpus].to_i
  end
  Info.new(
    id: v[:job_id],
    status: get_state(v[:job_state]),
    allocated_nodes: allocated_nodes,
    submit_host: submit_host,
    job_name: v[:Job_Name],
    job_owner: job_owner,
    accounting_id: v[:Account_Name],
    procs: procs,
    queue_name: v[:queue],
    wallclock_time: duration_in_seconds(v.fetch(:resources_used, {})[:walltime]),
    wallclock_limit: duration_in_seconds(v.fetch(:Resource_List, {})[:walltime]),
    cpu_time: duration_in_seconds(v.fetch(:resources_used, {})[:cput]),
    submission_time: v[:ctime] ? Time.parse(v[:ctime]) : nil,
    dispatch_time: v[:stime] ? Time.parse(v[:stime]) : nil,
    native: v
  )
end

parse_nodes(node_list)

Convert a host list string to individual nodes. The string has the form “hosta/J1+hostb/J2*P+…”, where J1 and J2 are an index of the job on the named host and P is the number of processors allocated from that host to this job. P does not appear if it is 1. Example: “i5n14/2*7” uses 7 procs on node “i5n14”.

# File lib/ood_core/job/adapters/pbspro.rb, line 441
def parse_nodes(node_list)
  node_list.split('+').map do |n|
    name, procs_list = n.split('/')
    procs = (procs_list.split('*')[1] || 1).to_i
    {name: name, procs: procs}
  end
end
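
To make the host list format concrete, the snippet below copies the method from the listing above and runs it on a two-node string. The second node name, `i5n15`, is made up for the example.

```ruby
def parse_nodes(node_list)
  node_list.split('+').map do |n|
    name, procs_list = n.split('/')
    # "2*7" means 7 procs; a bare index such as "0" means 1 proc
    procs = (procs_list.split('*')[1] || 1).to_i
    { name: name, procs: procs }
  end
end

nodes = parse_nodes("i5n14/2*7+i5n15/0")
# => [{ name: "i5n14", procs: 7 }, { name: "i5n15", procs: 1 }]
total_procs = nodes.sum { |n| n[:procs] }  # => 8
```

An empty string yields an empty array, which is why `parse_job_info` passes `""` when `exec_host` is missing.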

process_job_array(id, jobs)

Combine the array parent with the states of its children

# File lib/ood_core/job/adapters/pbspro.rb, line 483
def process_job_array(id, jobs)
  parent_job = jobs.select { |j| /\[\]/ =~ j.id }.first
  parent = (parent_job) ? parent_job.to_h : {:id => id, :status => :undetermined}

  # create task hashes from children
  parent[:tasks] = jobs.reject { |j| /\[\]/ =~ j.id }.map do |j|
    {
      :id => j.id,
      :status => j.status.to_sym,
      :wallclock_time => j.wallclock_time
    }
  end

  Info.new(**parent)
end
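
The parent and child split relies on the parent of a job array having empty brackets (`[]`) in its id. The sketch below shows that split on its own, under stated assumptions: `Job` is a minimal stand-in for the library's `Info` object, and the ids are invented.

```ruby
# Minimal stand-in for the library's Info object (assumption for this sketch).
Job = Struct.new(:id, :status, keyword_init: true)

jobs = [
  Job.new(id: "42[].host",  status: :running),
  Job.new(id: "42[1].host", status: :completed),
  Job.new(id: "42[2].host", status: :running)
]

# Ids containing "[]" belong to the array parent; all others are child tasks.
parents, children = jobs.partition { |j| /\[\]/ =~ j.id }
tasks = children.map { |j| { id: j.id, status: j.status } }

parents.first.id  # => "42[].host"
tasks.length      # => 2
```

When no parent record comes back, the adapter falls back to a placeholder parent with status `:undetermined`, as the listing above shows.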

seconds_to_duration(time)

Convert seconds to an HH:MM:SS duration string

# File lib/ood_core/job/adapters/pbspro.rb, line 432
def seconds_to_duration(time)
  "%02d:%02d:%02d" % [time/3600, time/60%60, time%60]
end
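
A short worked example of the formatting, with the method copied from the listing above so the snippet is self-contained. The inputs are illustrative.

```ruby
def seconds_to_duration(time)
  "%02d:%02d:%02d" % [time/3600, time/60%60, time%60]
end

seconds_to_duration(5400)   # => "01:30:00"
seconds_to_duration(59)     # => "00:00:59"
seconds_to_duration(90061)  # => "25:01:01"
```

The hours field is not wrapped into days, so a wall time above 24 hours is still expressed purely in hours.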