class OodCore::Job::Adapters::Slurm::Batch
Object used for simplified communication with a Slurm
batch server @api private
Constants
- RECORD_SEPARATOR
- UNIT_SEPARATOR
Attributes
Optional overrides for Slurm
client executables @example
{'sbatch' => '/usr/local/bin/sbatch'}
@return Hash<String, String>
The cluster of the Slurm
batch server @example CHPC's kingspeak cluster
my_batch.cluster #=> "kingspeak"
@return [String, nil] the cluster name
Whether to use strict host checking when ssh to submit_host
@example false @return [Bool]; true if empty
The login node where the job is submitted via ssh @example owens.osc.edu @return [String] The login node
Public Class Methods
@param cluster [#to_s, nil] the cluster name @param conf [#to_s, nil] path to the slurm conf @param bin [#to_s] path to slurm installation binaries @param bin_overrides
[#to_h] a hash of bin overrides to be used in job @param submit_host
[#to_s] Submits the job on a login node via ssh @param strict_host_checking
[Bool] Whether to use strict host checking when ssh to submit_host
# File lib/ood_core/job/adapters/slurm.rb, line 92
# Store the settings used to talk to a Slurm batch server.
#
# @param cluster [#to_s, nil] the cluster name
# @param bin [#to_s] path to slurm installation binaries
# @param conf [#to_s, nil] path to the slurm conf
# @param bin_overrides [#to_h] a hash of bin overrides to be used in job
# @param submit_host [#to_s] Submits the job on a login node via ssh
# @param strict_host_checking [Bool] Whether to use strict host checking when ssh to submit_host
def initialize(cluster: nil, bin: nil, conf: nil, bin_overrides: {}, submit_host: "", strict_host_checking: true)
  # nil is kept as nil so that "no cluster" / "no conf" stay distinguishable
  @cluster = cluster && cluster.to_s
  @conf = conf && Pathname.new(conf.to_s)
  @bin = Pathname.new(bin.to_s)
  # Coerce per the documented #to_h contract; a Hash is returned unchanged
  # and nil becomes an empty Hash.
  @bin_overrides = bin_overrides.to_h
  @submit_host = submit_host.to_s
  @strict_host_checking = strict_host_checking
end
Public Instance Methods
Fields requested from a formatted `squeue` call Note that the order of these fields is important
# File lib/ood_core/job/adapters/slurm.rb, line 226
# Fields requested from a formatted `squeue` call, keyed by attribute name
# with the `squeue` format specifier as the value.
#
# Note that the order of these fields is important.
#
# @return [Hash<Symbol, String>] attribute name => squeue format specifier
def all_squeue_fields
  {
    account: "%a",
    job_id: "%A",
    exec_host: "%B",
    min_cpus: "%c",
    cpus: "%C",
    min_tmp_disk: "%d",
    nodes: "%D",
    end_time: "%e",
    dependency: "%E",
    features: "%f",
    array_job_id: "%F",
    group_name: "%g",
    group_id: "%G",
    over_subscribe: "%h",
    sockets_per_node: "%H",
    array_job_task_id: "%i",
    cores_per_socket: "%I",
    job_name: "%j",
    threads_per_core: "%J",
    comment: "%k",
    array_task_id: "%K",
    time_limit: "%l",
    time_left: "%L",
    min_memory: "%m",
    time_used: "%M",
    req_node: "%n",
    node_list: "%N",
    command: "%o",
    contiguous: "%O",
    qos: "%q",
    partition: "%P",
    priority: "%Q",
    reason: "%r",
    start_time: "%S",
    state_compact: "%t",
    state: "%T",
    user: "%u",
    user_id: "%U",
    reservation: "%v",
    submit_time: "%V",
    wckey: "%w",
    licenses: "%W",
    excluded_nodes: "%x",
    core_specialization: "%X",
    nice: "%y",
    scheduled_nodes: "%Y",
    sockets_cores_threads: "%z",
    work_dir: "%Z",
    gres: "%b", # must come at the end to fix a bug with Slurm 18
  }
end
Delete a specified job from batch server @example Delete job “1234”
my_batch.delete_job("1234")
@param id [#to_s] the id of the job @raise [Error] if `scancel` command exited unsuccessfully @return [void]
# File lib/ood_core/job/adapters/slurm.rb, line 208
# Cancel a job by passing its id to `scancel`.
#
# @param id [#to_s] the id of the job
# @return [void]
def delete_job(id)
  job_id = id.to_s
  call("scancel", job_id)
end
Get a list of hashes detailing each of the jobs on the batch server @example Status
info for all jobs
my_batch.get_jobs #=> #[ # { # :account => "account", # :job_id => "my_job", # ... # }, # { # :account => "account", # :job_id => "my_other_job", # ... # }, # ... #]
@param id [#to_s] the id of the job @param owner [String] the owner(s) of the job @param attrs [Array<Symbol>, nil] list of attributes requested when calling squeue @raise [Error] if `squeue` command exited unsuccessfully @return [Array<Hash>] list of details for jobs
# File lib/ood_core/job/adapters/slurm.rb, line 123
# Query `squeue` and return one hash per job record.
#
# Each hash pairs the requested field names (in order) with the values of
# one record of the `squeue` output.
#
# @param id [#to_s] the id of the job
# @param owner [String] the owner(s) of the job
# @param attrs [Array<Symbol>, nil] list of attributes requested when calling squeue
# @return [Array<Hash>] list of details for jobs; when a SlurmTimeoutError
#   is raised, a single hash with the given id and state 'undetermined'
def get_jobs(id: "", owner: nil, attrs: nil)
  fields = squeue_fields(attrs)
  # The field names do not change per record, so compute them once.
  keys = fields.keys
  args = squeue_args(id: id, owner: owner, options: fields.values)

  #TODO: switch mock of Open3 to be the squeue mock script
  # then you can use that for performance metrics
  StringIO.open(call("squeue", *args)) do |output|
    advance_past_squeue_header!(output)

    jobs = []
    output.each_line(RECORD_SEPARATOR) do |line|
      # TODO: once you can do performance metrics you can test zip against some other tools
      # or just small optimizations
      # for example, fields is ALREADY A HASH and we are setting the VALUES to
      # "line.strip.split(unit_separator)" array
      #
      # i.e. store keys in an array, do Hash[[keys, values].transpose]
      #
      # or
      #
      # job = {}
      # keys.each_with_index { |key, index| [key] = values[index] }
      # jobs << job
      #
      # assuming keys and values are same length! if not we have an error!
      values = line.chomp(RECORD_SEPARATOR).strip.split(UNIT_SEPARATOR)
      # Records that are empty after stripping are skipped.
      jobs << Hash[keys.zip(values)] unless values.empty?
    end
    jobs
  end
rescue SlurmTimeoutError
  # TODO: could use a log entry here
  return [{ id: id, state: 'undetermined' }]
end
Put a specified job on hold @example Put job “1234” on hold
my_batch.hold_job("1234")
@param id [#to_s] the id of the job @raise [Error] if `scontrol` command exited unsuccessfully @return [void]
# File lib/ood_core/job/adapters/slurm.rb, line 188
# Place a job on hold via `scontrol hold`.
#
# @param id [#to_s] the id of the job
# @return [void]
def hold_job(id)
  job_id = id.to_s
  call("scontrol", "hold", job_id)
end
Release a specified job that is on hold @example Release job “1234” from on hold
my_batch.release_job("1234")
@param id [#to_s] the id of the job @raise [Error] if `scontrol` command exited unsuccessfully @return [void]
# File lib/ood_core/job/adapters/slurm.rb, line 198
# Release a held job via `scontrol release`.
#
# @param id [#to_s] the id of the job
# @return [void]
def release_job(id)
  job_id = id.to_s
  call("scontrol", "release", job_id)
end
TODO: write some barebones test for this? like 2 options and id or no id
# File lib/ood_core/job/adapters/slurm.rb, line 174
# Build the argument list for a `squeue` call.
#
# The output format begins with the record separator, followed by the
# requested format specifiers joined with the unit separator.
#
# @param id [#to_s] job id to filter on; omitted when empty
# @param owner [#to_s, nil] owner to filter on; omitted when empty
# @param options [Array<String>] squeue format specifiers
# @return [Array<String>] arguments for `squeue`
def squeue_args(id: "", owner: nil, options: [])
  output_format = "#{RECORD_SEPARATOR}#{options.join(UNIT_SEPARATOR)}"
  owner_name = owner.to_s
  job_id = id.to_s

  args = ["--all", "--states=all", "--noconvert", "-o", output_format]
  args.push("-u", owner_name) unless owner_name.empty?
  args.push("-j", job_id) unless job_id.empty?
  args
end
# File lib/ood_core/job/adapters/slurm.rb, line 158
# Select the squeue fields to request.
#
# With no attrs, every known field is returned. Otherwise the requested
# attrs plus the required fields are translated to squeue attribute names
# and only those entries are kept.
#
# @param attrs [Array<Symbol>, Symbol, nil] requested attributes
# @return [Hash] attribute name => squeue format specifier
def squeue_fields(attrs)
  return all_squeue_fields if attrs.nil?

  requested = Array.wrap(attrs) + squeue_required_fields
  all_squeue_fields.slice(*squeue_attrs_for_info_attrs(requested))
end
# File lib/ood_core/job/adapters/slurm.rb, line 166
# Fields that are always requested from `squeue`, whatever attrs the
# caller asked for.
#
# @return [Array<Symbol>] the required squeue attribute names
def squeue_required_fields
  #TODO: does this need to include ::array_job_task_id?
  #TODO: does it matter that order of the output can vary depending on the arguments and if "squeue_required_fields" are included?
  # previously the order was "fields.keys"; i don't think it does
  [:job_id, :state_compact]
end
Submit a script expanded as a string to the batch server @param str [#to_s] script as a string @param args [Array<#to_s>] arguments passed to `sbatch` command @param env [Hash{#to_s => #to_s}] environment variables set @raise [Error] if `sbatch` command exited unsuccessfully @return [String] the id of the job that was created
# File lib/ood_core/job/adapters/slurm.rb, line 218
# Submit a script to `sbatch` on stdin and return the job id.
#
# `--parsable` is appended to the arguments; the stripped output is split
# on ";" and the first piece is returned.
#
# @param str [#to_s] script as a string
# @param args [Array<#to_s>] arguments passed to `sbatch` command
# @param env [#to_h] environment variables set; keys and values are stringified
# @return [String] the id of the job that was created
def submit_string(str, args: [], env: {})
  sbatch_args = args.map(&:to_s) << "--parsable"

  string_env = {}
  env.to_h.each { |key, value| string_env[key.to_s] = value.to_s }

  output = call("sbatch", *sbatch_args, env: string_env, stdin: str.to_s)
  output.strip.split(";").first
end
Private Instance Methods
Modify the StringIO instance by advancing past the squeue header
The first two “records” should always be discarded. Consider the following squeue with -M output (invisible characters shown):
CLUSTER: slurm_cluster_name\n \x1EJOBID\x1F\x1FSTATE\n \x1E1\x1F\x1FR\n \x1E2\x1F\x1FPD\n
Splitting on the record separator first gives the Cluster
header, and then the regular header. If -M or --cluster is not specified the effect is the same because the record separator is at the start of the format string, so the first “record” would simply be empty.
# File lib/ood_core/job/adapters/slurm.rb, line 296
# Advance the given IO past the squeue header by reading and discarding
# the first two record-separator-delimited chunks.
#
# @param squeue_output [#gets] the squeue output stream; it is modified in place
def advance_past_squeue_header!(squeue_output)
  2.times do
    squeue_output.gets(RECORD_SEPARATOR)
  end
end
Call a forked Slurm
command for a given cluster
# File lib/ood_core/job/adapters/slurm.rb, line 301
# Run a Slurm command and return its stdout.
#
# The executable is resolved through Helper.bin_path, `-M <cluster>` is
# appended when a cluster is set, SLURM_CONF is added to the environment
# when a conf is set, and the command is passed through Helper.ssh_wrap
# before being executed with Open3.capture3.
#
# @param cmd [#to_s] the Slurm command name
# @param args [Array<#to_s>] arguments for the command
# @param env [#to_h] environment variables for the command; not modified
# @param stdin [#to_s] data written to the command's stdin
# @raise [Error] if the command exited unsuccessfully (message is stderr)
# @return [String] whatever interpret_and_raise returns for stdout/stderr
def call(cmd, *args, env: {}, stdin: "")
  cmd = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
  args = args.map(&:to_s)
  args.concat ["-M", cluster] if cluster
  env = env.to_h
  # Hash#to_h returns the receiver itself, so assigning into it would write
  # to the caller's hash (and fail on a frozen one). Merge into a copy.
  env = env.merge("SLURM_CONF" => conf.to_s) if conf
  cmd, args = OodCore::Job::Adapters::Helper.ssh_wrap(submit_host, cmd, args, strict_host_checking)
  o, e, s = Open3.capture3(env, cmd, *(args.map(&:to_s)), stdin_data: stdin.to_s)
  s.success? ? interpret_and_raise(o, e) : raise(Error, e)
end
Helper
function to raise an error based on the contents of stderr. Slurm
exits 0 even when the command fails, so we need to interpret stderr to see if the command was actually successful.
# File lib/ood_core/job/adapters/slurm.rb, line 318
# Inspect stderr of a command that exited successfully and raise when it
# reports a socket timeout from slurm_load_jobs; otherwise hand back stdout.
#
# @param stdout [String] the command's standard output
# @param stderr [String] the command's standard error
# @raise [SlurmTimeoutError] if a line of stderr starts with the
#   slurm_load_jobs socket timeout message
# @return [String] stdout, unchanged
def interpret_and_raise(stdout, stderr)
  unless stderr.empty?
    timed_out = /^slurm_load_jobs error: Socket timed out/.match(stderr)
    raise SlurmTimeoutError, stderr if timed_out
  end

  stdout
end
# File lib/ood_core/job/adapters/slurm.rb, line 326
# Translate info attribute names into the squeue attribute names needed to
# populate them. Names without a mapping are passed through unchanged, and
# an attribute that maps to several squeue attributes contributes all of
# them to the flattened result.
#
# @param attrs [Array<Symbol>] info and/or squeue attribute names
# @return [Array<Symbol>] squeue attribute names
def squeue_attrs_for_info_attrs(attrs)
  # Built once per call instead of once per element.
  squeue_attr_for = {
    id: :job_id,
    status: :state_compact,
    allocated_nodes: [:node_list, :scheduled_nodes],
    # submit_host: nil,
    job_name: :job_name,
    job_owner: :user,
    accounting_id: :account,
    procs: :cpus,
    queue_name: :partition,
    wallclock_time: :time_used,
    wallclock_limit: :time_limit,
    # cpu_time: nil,
    submission_time: :submit_time,
    dispatch_time: :start_time
  }

  attrs.map { |attr| squeue_attr_for.fetch(attr, attr) }.flatten
end