class OodCore::Job::Adapters::Torque::Batch

Object used for simplified communication with a batch server

Attributes

bin[R]

The path to the Torque client installation binaries @example For Torque 5.0.0

my_conn.bin.to_s #=> "/usr/local/Torque/5.0.0/bin"

@return [Pathname] path to Torque binaries

bin_overrides[R]

Optional overrides for Torque client executables @example

{'qsub' => '/usr/local/bin/qsub'}

@return Hash<String, String>

host[R]

The host of the Torque batch server @example OSC's Oakley batch server

my_conn.host #=> "oak-batch.osc.edu"

@return [String] the batch server host

lib[R]

The path to the Torque client installation libraries @example For Torque 5.0.0

my_conn.lib.to_s #=> "/usr/local/Torque/5.0.0/lib"

@return [Pathname] path to Torque libraries

strict_host_checking[R]

Determines whether to use strict_host_checking for ssh @example

my_conn.strict_host_checking #=> true

@return [Boolean]

submit_host[R]

The login node where job is submitted via ssh @example OSC's owens login node

my_conn.submit_host #=> "owens.osc.edu"

@return [String] the login node

Public Class Methods

new(host:, submit_host: "", strict_host_checking: true, lib: "", bin: "", bin_overrides: {}, **_) click to toggle source

@param host [#to_s] the batch server host @param submit_host [#to_s] the login node @param strict_host_checking [Boolean] use strict host checking when ssh to submit_host @param lib [#to_s] path to Torque client installation libraries @param bin [#to_s] path to Torque client installation binaries @param bin_overrides [Hash<String, String>] optional overrides for Torque client executables

# File lib/ood_core/job/adapters/torque/batch.rb, line 51
# Stores the connection settings, normalizing each one as it is assigned.
# @param host [#to_s] the batch server host; kept as a String
# @param submit_host [#to_s] the login node; kept as a String
# @param strict_host_checking [Boolean] kept exactly as given
# @param lib [#to_s] library directory; kept as a Pathname
# @param bin [#to_s] binary directory; kept as a Pathname
# @param bin_overrides [Hash] kept exactly as given (not copied)
# Any other keyword arguments are accepted and ignored.
def initialize(host:, submit_host: "", strict_host_checking: true, lib: "", bin: "", bin_overrides: {}, **_)
  @host, @submit_host   = [host, submit_host].map(&:to_s)
  @strict_host_checking = strict_host_checking
  @lib, @bin            = [lib, bin].map { |dir| Pathname.new(dir.to_s) }
  @bin_overrides        = bin_overrides
end

Public Instance Methods

==(other) click to toggle source

The comparison operator @param other [#to_h] batch server to compare against @return [Boolean] how batch servers compare

# File lib/ood_core/job/adapters/torque/batch.rb, line 69
# Two batch servers compare equal when their hash representations match.
# @param other [#to_h] object to compare against
# @return [Boolean] whether both `to_h` results are equal
def ==(other)
  mine   = to_h
  theirs = other.to_h
  mine == theirs
end
connect() { |cid| ... } click to toggle source

Creates a connection to batch server and calls block in context of this connection @yieldparam cid [Fixnum] connection id from established batch server connection @yieldreturn the final value of the block

# File lib/ood_core/job/adapters/torque/batch.rb, line 91
# Opens a connection to the batch server, yields its connection id and returns
# whatever the block returned.
# @yieldparam cid [Integer] connection id returned by FFI.pbs_connect
# @return the value of the block
def connect(&block)
  # Tell the FFI layer which shared library to use before making any call
  FFI.lib = lib.join('libtorque.so')

  conn_id = FFI.pbs_connect(host)
  # A negative id signals failure; its absolute value is passed on as the error code
  FFI.raise_error(conn_id.abs) if conn_id.negative?

  result =
    begin
      yield conn_id
    ensure
      # Runs even when the block raises
      FFI.pbs_disconnect(conn_id)
    end

  # Only reached when the block finished without raising
  FFI.check_for_error
  result
end
delete_job(id) click to toggle source

Delete a specified job from batch server @example Delete job '10219837.oak-batch.osc.edu' from batch

my_conn.delete_job('10219837.oak-batch.osc.edu')

@param id [#to_s] the id of the job @return [void]

# File lib/ood_core/job/adapters/torque/batch.rb, line 320
# Asks the batch server to delete a job.
# @param id [#to_s] the id of the job
# @return the value produced by `connect` for the pbs_deljob call
def delete_job(id)
  connect { |conn_id| FFI.pbs_deljob(conn_id, id.to_s, nil) }
end
eql?(other) click to toggle source

Checks whether two batch server objects are completely identical to each other @param other [Batch] batch server to compare against @return [Boolean] whether same objects

# File lib/ood_core/job/adapters/torque/batch.rb, line 77
# Strict equality: the other object must be of the same class and compare
# equal with `==`.
# @param other [Object] object to compare against
# @return [Boolean] whether the two objects are identical in class and content
def eql?(other)
  return false unless self.class == other.class

  self == other
end
get_job(id, **kwargs) click to toggle source

Get info for given batch server's job @example Status info for OSC Oakley's '10219837.oak-batch.osc.edu' job

my_conn.get_job('10219837.oak-batch.osc.edu')
#=>
#{
#  "10219837.oak-batch.osc.edu" => {
#    :Job_Owner => "bob@oakley02.osc.edu",
#    :Job_Name => "CFD_Solver",
#    ...
#  }
#}

@param (see get_jobs) @return [Hash] hash with details of job

# File lib/ood_core/job/adapters/torque/batch.rb, line 279
# Looks up a single job by delegating to `get_jobs` with the id filled in.
# @param id [#to_s] the id of the job
# @param kwargs remaining keyword arguments, forwarded to `get_jobs`
# @return [Hash] whatever `get_jobs` returns for that id
def get_job(id, **kwargs)
  # kwargs are merged last, matching `get_jobs(id: id, **kwargs)`
  options = { id: id }.merge(kwargs)
  get_jobs(**options)
end
get_jobs(id: '', filters: []) click to toggle source

Get a list of hashes of the jobs on the batch server @example Status info for OSC Oakley jobs

my_conn.get_jobs
#=>
#{
#  "10219837.oak-batch.osc.edu" => {
#    :Job_Owner => "bob@oakley02.osc.edu",
#    :Job_Name => "CFD_Solver",
#    ...
#  },
#  "10219838.oak-batch.osc.edu" => {
#    :Job_Owner => "sally@oakley01.osc.edu",
#    :Job_Name => "FEA_Solver",
#    ...
#  },
#  ...
#}

@param id [#to_s] the id of requested information @param filters [Array<Symbol>] list of attribs to filter on @return [Hash] hash of details for jobs

# File lib/ood_core/job/adapters/torque/batch.rb, line 258
# Queries the batch server for job status and returns it as a hash.
# @param id [#to_s] the id of requested information
# @param filters [Array<Symbol>] list of attribs to filter on
# @return [Hash] the status converted with `to_h`
def get_jobs(id: '', filters: [])
  connect do |conn_id|
    attrl  = FFI::Attrl.from_list(filters)
    status = FFI.pbs_statjob(conn_id, id.to_s, attrl, nil)
    jobs   = status.to_h
    # Release the status only after it has been converted
    FFI.pbs_statfree(status)
    jobs
  end
end
get_node(id, **kwargs) click to toggle source

Get info for given batch server's node @example Status info for OSC Oakley's 'n0001' node

my_conn.get_node('n0001')
#=>
#{
#  "n0001" => {
#    :np => "12",
#    ...
#  }
#}

@param (see get_nodes) @return [Hash] status info for the node

# File lib/ood_core/job/adapters/torque/batch.rb, line 205
# Looks up a single node by delegating to `get_nodes` with the id filled in.
# @param id [#to_s] the id of the node
# @param kwargs remaining keyword arguments, forwarded to `get_nodes`
# @return [Hash] whatever `get_nodes` returns for that id
def get_node(id, **kwargs)
  # kwargs are merged last, matching `get_nodes(id: id, **kwargs)`
  options = { id: id }.merge(kwargs)
  get_nodes(**options)
end
get_nodes(id: '', filters: []) click to toggle source

Get a list of hashes of the nodes on the batch server @example Status info for OSC Oakley nodes

my_conn.get_nodes
#=>
#{
#  "n0001" => {
#    :np => "12",
#    ...
#  },
#  "n0002" => {
#    :np => "12",
#    ...
#  },
#  ...
#}

@param id [#to_s] the id of requested information @param filters [Array<Symbol>] list of attribs to filter on @return [Hash] hash of details for nodes

# File lib/ood_core/job/adapters/torque/batch.rb, line 185
# Queries the batch server for node status and returns it as a hash.
# @param id [#to_s] the id of requested information
# @param filters [Array<Symbol>] list of attribs to filter on
# @return [Hash] the status converted with `to_h`
def get_nodes(id: '', filters: [])
  connect do |conn_id|
    attrl  = FFI::Attrl.from_list(filters)
    status = FFI.pbs_statnode(conn_id, id.to_s, attrl, nil)
    nodes  = status.to_h
    # Release the status only after it has been converted
    FFI.pbs_statfree(status)
    nodes
  end
end
get_queue(id, **kwargs) click to toggle source

Get info for given batch server's queue @example Status info for OSC Oakley's parallel queue

my_conn.get_queue("parallel")
#=>
#{
#  "parallel" => {
#    :queue_type => "Execution",
#    ...
#  }
#}

@param (see get_queues) @return [Hash] status info for the queue

# File lib/ood_core/job/adapters/torque/batch.rb, line 162
# Looks up a single queue by delegating to `get_queues` with the id filled in.
# @param id [#to_s] the id of the queue
# @param kwargs remaining keyword arguments, forwarded to `get_queues`
# @return [Hash] whatever `get_queues` returns for that id
def get_queue(id, **kwargs)
  # kwargs are merged last, matching `get_queues(id: id, **kwargs)`
  options = { id: id }.merge(kwargs)
  get_queues(**options)
end
get_queues(id: '', filters: []) click to toggle source

Get a list of hashes of the queues on the batch server @example Status info for OSC Oakley queues

my_conn.get_queues
#=>
#{
#  "parallel" => {
#    :queue_type => "Execution",
#    ...
#  },
#  "serial" => {
#    :queue_type => "Execution",
#    ...
#  },
#  ...
#}

@param id [#to_s] the id of requested information @param filters [Array<Symbol>] list of attribs to filter on @return [Hash] hash of details for the queues

# File lib/ood_core/job/adapters/torque/batch.rb, line 142
# Queries the batch server for queue status and returns it as a hash.
# @param id [#to_s] the id of requested information
# @param filters [Array<Symbol>] list of attribs to filter on
# @return [Hash] the status converted with `to_h`
def get_queues(id: '', filters: [])
  connect do |conn_id|
    attrl  = FFI::Attrl.from_list(filters)
    status = FFI.pbs_statque(conn_id, id.to_s, attrl, nil)
    queues = status.to_h
    # Release the status only after it has been converted
    FFI.pbs_statfree(status)
    queues
  end
end
get_status(filters: []) click to toggle source

Get a hash with status info for this batch server @example Status info for OSC Oakley batch server

my_conn.get_status
#=>
#{
#  "oak-batch.osc.edu:15001" => {
#    :server_state => "Idle",
#    ...
#  }
#}

@param filters [Array<Symbol>] list of attribs to filter on @return [Hash] status info for batch server

# File lib/ood_core/job/adapters/torque/batch.rb, line 116
# Queries the batch server for its own status and returns it as a hash.
# @param filters [Array<Symbol>] list of attribs to filter on
# @return [Hash] the status converted with `to_h`
def get_status(filters: [])
  connect do |conn_id|
    attrl  = FFI::Attrl.from_list(filters)
    status = FFI.pbs_statserver(conn_id, attrl, nil)
    info   = status.to_h
    # Release the status only after it has been converted
    FFI.pbs_statfree(status)
    info
  end
end
hash() click to toggle source

Generates a hash value for this object @return [Fixnum] hash value of object

# File lib/ood_core/job/adapters/torque/batch.rb, line 83
# Hash value derived from the object's class together with its `to_h` contents.
# @return [Integer] hash value of object
def hash
  identity = [self.class, to_h]
  identity.hash
end
hold_job(id, type: :u) click to toggle source

Put specified job on hold Possible hold types:

:u => Available to the owner of the job, the batch operator and the batch administrator
:o => Available to the batch operator and the batch administrator
:s => Available to the batch administrator

@example Put job '10219837.oak-batch.osc.edu' on hold

my_conn.hold_job('10219837.oak-batch.osc.edu')

@param id [#to_s] the id of the job @param type [#to_s] type of hold to be applied @return [void]

# File lib/ood_core/job/adapters/torque/batch.rb, line 293
# Places a hold of the given type on a job.
# @param id [#to_s] the id of the job
# @param type [#to_s] type of hold to be applied
# @return the value produced by `connect` for the pbs_holdjob call
def hold_job(id, type: :u)
  connect { |conn_id| FFI.pbs_holdjob(conn_id, id.to_s, type.to_s, nil) }
end
release_job(id, type: :u) click to toggle source

Release a specified job that is on hold Possible hold types:

:u => Available to the owner of the job, the batch operator and the batch administrator
:o => Available to the batch operator and the batch administrator
:s => Available to the batch administrator

@example Release job '10219837.oak-batch.osc.edu' from hold

my_conn.release_job('10219837.oak-batch.osc.edu')

@param id [#to_s] the id of the job @param type [#to_s] type of hold to be removed @return [void]

# File lib/ood_core/job/adapters/torque/batch.rb, line 309
# Removes a hold of the given type from a job.
# @param id [#to_s] the id of the job
# @param type [#to_s] type of hold to be removed
# @return the value produced by `connect` for the pbs_rlsjob call
def release_job(id, type: :u)
  connect { |conn_id| FFI.pbs_rlsjob(conn_id, id.to_s, type.to_s, nil) }
end
select_jobs(attribs: []) click to toggle source

Get a list of hashes of the selected jobs on the batch server @example Status info for jobs owned by Bob

my_conn.select_jobs(attribs: [{name: "User_List", value: "bob", op: :eq}])
#=>
#{
#  "10219837.oak-batch.osc.edu" => {
#    :Job_Owner => "bob@oakley02.osc.edu",
#    :Job_Name => "CFD_Solver",
#    ...
#  },
#  "10219839.oak-batch.osc.edu" => {
#    :Job_Owner => "bob@oakley02.osc.edu",
#    :Job_Name => "CFD_Solver2",
#    ...
#  },
#  ...
#}

@param attribs [Array<#to_h>] list of hashes describing attributes to

select on

@return [Hash] hash of details of selected jobs

# File lib/ood_core/job/adapters/torque/batch.rb, line 230
# Queries the batch server for the jobs matching the given attributes.
# @param attribs [Array<#to_h>] list of hashes describing attributes to select on
# @return [Hash] the status converted with `to_h`
def select_jobs(attribs: [])
  connect do |conn_id|
    criteria = attribs.map(&:to_h)
    attropl  = FFI::Attropl.from_list(criteria)
    status   = FFI.pbs_selstat(conn_id, attropl, nil)
    selected = status.to_h
    # Release the status only after it has been converted
    FFI.pbs_statfree(status)
    selected
  end
end
submit(content, args: [], env: {}, chdir: nil) click to toggle source

Submit a script expanded as a string to the batch server @param content [#to_s] script as a string @param args [Array<#to_s>] arguments passed to `qsub` command @param env [Hash{#to_s => #to_s}] environment variables set @param chdir [#to_s, nil] working directory where `qsub` is called from @raise [Error] if `qsub` command exited unsuccessfully @return [String] the id of the job that was created

# File lib/ood_core/job/adapters/torque/batch.rb, line 374
# Pipes the script content to `qsub` through `call` and returns its output
# with surrounding whitespace removed.
# @param content [#to_s] script as a string
# @param args [Array<#to_s>] arguments passed to the `qsub` command
# @param env [Hash] environment variables forwarded to `call`
# @param chdir [#to_s, nil] working directory forwarded to `call`
# @return [String] stripped output of the command
def submit(content, args: [], env: {}, chdir: nil)
  output = call(:qsub, *args, env: env, stdin: content, chdir: chdir)
  output.strip
end
submit_script(script, queue: nil, headers: {}, resources: {}, envvars: {}, qsub: true) click to toggle source

Submit a script to the batch server @example Submit a script with a few PBS directives

my_conn.submit_script("/path/to/script",
  headers: {
    Job_Name: "myjob",
    Join_Path: "oe"
  },
  resources: {
    nodes: "4:ppn=12",
    walltime: "12:00:00"
  },
  envvars: {
    TOKEN: "asd90f9sd8g90hk34"
  }
)
#=> "6621251.oak-batch.osc.edu"

@param script [#to_s] path to the script @param queue [#to_s] queue to submit script to @param headers [Hash] pbs headers @param resources [Hash] pbs resources @param envvars [Hash] pbs environment variables @param qsub [Boolean] whether use library or binary for submission @return [String] the id of the job that was created @deprecated Use {#submit} instead.

# File lib/ood_core/job/adapters/torque/batch.rb, line 350
# Submits a script file, through `qsub_submit` when `qsub` is truthy and
# through `pbs_submit` otherwise.
# @param script [#to_s] path to the script
# @param queue [#to_s] queue to submit script to
# @param headers [Hash] pbs headers
# @param resources [Hash] pbs resources
# @param envvars [Hash] pbs environment variables
# @param qsub [Boolean] selects the submission method
# @return whatever the selected submission method returns
# @deprecated Use {#submit} instead.
def submit_script(script, queue: nil, headers: {}, resources: {}, envvars: {}, qsub: true)
  script_path = script.to_s
  queue_name  = queue.to_s

  if qsub
    qsub_submit(script_path, queue_name, headers, resources, envvars)
  else
    pbs_submit(script_path, queue_name, headers, resources, envvars)
  end
end
submit_string(string, **kwargs) click to toggle source

Submit a script expanded into a string to the batch server @param string [#to_s] script as a string @param (see submit_script) @return [String] the id of the job that was created @deprecated Use {#submit} instead.

# File lib/ood_core/job/adapters/torque/batch.rb, line 359
# Writes the script text to a temporary file and submits that file with
# `submit_script`.
# @param string [#to_s] script as a string
# @param kwargs keyword arguments forwarded to `submit_script`
# @return whatever `submit_script` returns
# @deprecated Use {#submit} instead.
def submit_string(string, **kwargs)
  Tempfile.open('qsub.') do |tmp|
    tmp << string.to_s
    # Close first so the full contents are on disk before the path is handed off
    tmp.close
    submit_script(tmp.path, **kwargs)
  end
end
to_h() click to toggle source

Convert object to hash @return [Hash] the hash describing this object

# File lib/ood_core/job/adapters/torque/batch.rb, line 62
# Convert object to hash.
#
# Includes every setting accepted by the constructor, `bin_overrides` among
# them, so that objects differing only in their executable overrides do not
# have identical hash representations.
# @return [Hash] the hash describing this object
def to_h
  {
    host: host,
    submit_host: submit_host,
    strict_host_checking: strict_host_checking,
    lib: lib,
    bin: bin,
    bin_overrides: bin_overrides
  }
end

Private Instance Methods

call(cmd, *args, env: {}, stdin: "", chdir: nil) click to toggle source

Call a forked PBS command for a given host

# File lib/ood_core/job/adapters/torque/batch.rb, line 474
# Runs a PBS client command for this batch server and returns its stdout.
#
# The executable is resolved with Helper.bin_path, the command is passed
# through Helper.ssh_wrap, and it is then run with Open3.capture3.
# @param cmd [#to_s] name of the command to run
# @param args [Array<#to_s>] command-line arguments
# @param env [#to_h] extra environment variables; keys and values are
#   stringified, and PBS_DEFAULT / LD_LIBRARY_PATH always take precedence
# @param stdin [#to_s] data written to the command's standard input
# @param chdir [#to_s, nil] working directory; "." when nil
# @raise [Error] with the command's stderr if it exits unsuccessfully
# @return [String] the command's standard output
def call(cmd, *args, env: {}, stdin: "", chdir: nil)
  cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)

  # Join only the non-empty parts: an empty LD_LIBRARY_PATH entry (leading or
  # trailing colon) makes the dynamic loader search the current directory.
  library_path = [lib.to_s, ENV["LD_LIBRARY_PATH"].to_s].reject(&:empty?).join(":")

  env = env.to_h.each_with_object({}) { |(k, v), h| h[k.to_s] = v.to_s }.merge(
    "PBS_DEFAULT"     => host,
    "LD_LIBRARY_PATH" => library_path
  )
  cmd, args = OodCore::Job::Adapters::Helper.ssh_wrap(submit_host, cmd, args, strict_host_checking, env)

  o, e, s = Open3.capture3(env, cmd, *args.map(&:to_s), stdin_data: stdin.to_s, chdir: (chdir || ".").to_s)
  s.success? ? o : raise(Error, e)
end
pbs_submit(script, queue, headers, resources, envvars) click to toggle source

Submit a script using FFI library

# File lib/ood_core/job/adapters/torque/batch.rb, line 380
# Submits a script through the FFI library.
#
# Headers, resources and environment variables are flattened into one list of
# attribute hashes, in that order, before being handed to FFI::Attropl.
# @param script path of the script, passed to FFI.pbs_submit unchanged
# @param queue queue name, passed to FFI.pbs_submit unchanged
# @param headers [Hash] each pair becomes a `{name:, value:}` attribute
# @param resources [Hash] each pair becomes a `:Resource_List` attribute
# @param envvars [Hash] joined as "K=V,K=V" into one `:Variable_List` attribute
#   (omitted when empty)
# @return the value produced by `connect` for the pbs_submit call
def pbs_submit(script, queue, headers, resources, envvars)
  attribs = headers.map { |name, value| { name: name, value: value } }
  attribs.concat(
    resources.map { |rsc, value| { name: :Resource_List, resource: rsc, value: value } }
  )

  unless envvars.empty?
    variable_list = envvars.map { |k, v| "#{k}=#{v}" }.join(",")
    attribs << { name: :Variable_List, value: variable_list }
  end

  connect do |conn_id|
    FFI.pbs_submit(conn_id, FFI::Attropl.from_list(attribs), script, queue, nil)
  end
end
qsub_arg(key, value) click to toggle source

Mapping of FFI attribute to `qsub` arguments

# File lib/ood_core/job/adapters/torque/batch.rb, line 402
# Translates one header attribute into the matching `qsub` command-line
# arguments.
# @param key [Symbol] attribute name
# @param value [#to_s] attribute value
# @return [Array<String>] the flag, followed by its value when it takes one
def qsub_arg(key, value)
  # Flags that are emitted on their own; the value is not used
  bare_flags = {
    fault_tolerant: '-f',
    Hold_Types: '-h'
  }

  # Flags that are followed by the stringified value
  valued_flags = {
    # common attributes
    Execution_Time: '-a',
    Checkpoint: '-c',
    Error_Path: '-e',
    Join_Path: '-j',
    Keep_Files: '-k',
    Mail_Points: '-m',
    Output_Path: '-o',
    Priority: '-p',
    Rerunable: '-r',
    job_array_request: '-t',
    User_List: '-u',
    Account_Name: '-A',
    Mail_Users: '-M',
    Job_Name: '-N',
    Shell_Path_List: '-S',
    # uncommon attributes
    job_arguments: '-F',
    init_work_dir: '-d' # sets PBS_O_INITDIR
  }

  if bare_flags.key?(key)
    [bare_flags[key]]
  elsif valued_flags.key?(key)
    [valued_flags[key], value.to_s]
  elsif :reservation_id == key
    ['-W', "x=advres:#{value}"] # use resource manager extensions for Moab
  else
    # everything else is passed through as an additional attribute
    ['-W', "#{key}=#{value}"]
  end
end
qsub_submit(script, queue, headers, resources, envvars) click to toggle source

Submit a script using the Torque `qsub` binary NB: The binary includes many useful filters and is preferred

# File lib/ood_core/job/adapters/torque/batch.rb, line 454
# Submits a script by running the `qsub` executable.
#
# Only the argument list is built here. Running the command (executable
# lookup, PBS_DEFAULT / LD_LIBRARY_PATH environment, ssh wrapping and raising
# Error on failure) is delegated to `call`, so that logic lives in one place.
# @param script [String] path to the script; appended as the last argument
# @param queue [String] queue name; "-q" is omitted when empty
# @param headers [Hash] each pair is translated with `qsub_arg`
# @param resources [Hash] each pair becomes "-l key=value"
# @param envvars [Hash] joined as "K=V,K=V" after a single "-v" (omitted when empty)
# @raise [Error] if the command exits unsuccessfully
# @return [String] the command's output without its trailing newline
def qsub_submit(script, queue, headers, resources, envvars)
  params = []
  params.push("-q", queue.to_s) unless queue.empty?
  params.concat(headers.flat_map { |k, v| qsub_arg(k, v) })
  params.concat(resources.flat_map { |k, v| ["-l", "#{k}=#{v}"] })
  params.push("-v", envvars.map { |k, v| "#{k}=#{v}" }.join(",")) unless envvars.empty?
  params << script

  call(:qsub, *params).chomp
end