class OodCore::Job::Adapters::Lsf
Constants
- STATE_MAP
Attributes
batch — the Lsf batch object (@api private)
helper — the Lsf::Helper instance (@api private)
Public Class Methods
@param opts [#to_h] the options defining this adapter
@option opts [Batch] :batch The Lsf batch object
@api private
@see Factory.build_lsf
# File lib/ood_core/job/adapters/lsf.rb, line 54
# Build the LSF adapter around an existing batch object.
#
# @param batch [Batch] the Lsf batch object every scheduler call goes through
# @api private
# @see Factory.build_lsf
def initialize(batch:)
  @batch = batch
  # Parsing helpers (exec hosts, times, submit args) live on a separate object.
  @helper = Lsf::Helper.new
end
Public Instance Methods
Delete the submitted job
@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong deleting a job
@return [void]
@see Adapter#delete
# File lib/ood_core/job/adapters/lsf.rb, line 165
# Delete the submitted job.
#
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] when the batch object raises Batch::Error
# @return [void]
# @see Adapter#delete
def delete(id)
  job_id = id.to_s
  batch.delete_job(job_id)
rescue Batch::Error => e
  # Surface scheduler failures through the adapter's own error type.
  raise JobAdapterError, e.message
end
# File lib/ood_core/job/adapters/lsf.rb, line 171
# Prefix that marks an LSF scheduler directive line inside a job script.
#
# @return [String] the directive prefix
def directive_prefix
  "#BSUB"
end
Put the submitted job on hold
@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong holding a job
@return [void]
@see Adapter#hold
# File lib/ood_core/job/adapters/lsf.rb, line 143
# Put the submitted job on hold.
#
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] when the batch object raises Batch::Error
# @return [void]
# @see Adapter#hold
def hold(id)
  job_id = id.to_s
  batch.hold_job(job_id)
rescue Batch::Error => e
  # Surface scheduler failures through the adapter's own error type.
  raise JobAdapterError, e.message
end
Retrieve job info from the resource manager
@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong getting job info
@return [Info] information describing submitted job
@see Adapter#info
# File lib/ood_core/job/adapters/lsf.rb, line 93
# Retrieve job info from the resource manager.
#
# The batch object may return several records for one id (a job array); they
# are all converted and then folded into one Info by handle_job_array.
#
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] when the batch object raises Batch::Error
# @return [Info] information describing the submitted job
# @see Adapter#info
def info(id)
  job_hashes = batch.get_job(id: id)
  infos = job_hashes.map { |job_hash| info_for_batch_hash(job_hash) }
  handle_job_array(infos, id)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
Retrieve info for all jobs from the resource manager
@raise [JobAdapterError] if something goes wrong getting job info
@return [Array<Info>] information describing submitted jobs
@see Adapter#info_all
# File lib/ood_core/job/adapters/lsf.rb, line 104
# Retrieve info for all jobs from the resource manager.
#
# @param attrs [Array<Symbol>, nil] accepted for interface compatibility; not
#   used by this implementation
# @raise [JobAdapterError] when the batch object raises Batch::Error
# @return [Array<Info>] information describing submitted jobs
# @see Adapter#info_all
def info_all(attrs: nil)
  job_hashes = batch.get_jobs
  job_hashes.map { |job_hash| info_for_batch_hash(job_hash) }
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
Retrieve info for all of the owner's jobs from the resource manager
@raise [JobAdapterError] if something goes wrong getting job info
@return [Array<Info>] information describing submitted jobs
@see Adapter#info_where_owner
Calls superclass method OodCore::Job::Adapter#info_where_owner (when more than one owner is given)
# File lib/ood_core/job/adapters/lsf.rb, line 114
# Retrieve info for all of the owner's jobs from the resource manager.
#
# Exactly one owner is queried directly through the batch object. More than
# one owner falls back to the superclass implementation
# (OodCore::Job::Adapter#info_where_owner), which receives the original
# owner and attrs. No owners yields an empty list.
#
# @param owner [#to_s, Array<#to_s>, nil] the owner(s) of the jobs
# @param attrs [Array<Symbol>, nil] ignored for a single owner; forwarded to
#   the superclass otherwise
# @raise [JobAdapterError] when the batch object raises Batch::Error
# @return [Array<Info>] information describing submitted jobs
# @see Adapter#info_where_owner
def info_where_owner(owner, attrs: nil)
  # Kernel#Array (stdlib) normalizes nil / a single owner / a list the same way
  # submit normalizes its dependency ids, without relying on ActiveSupport's
  # Array.wrap being loaded. For owner strings/symbols the result is identical.
  owners = Array(owner).map(&:to_s)
  case owners.size
  when 0
    []
  when 1
    batch.get_jobs_for_user(owners.first).map { |v| info_for_batch_hash(v) }
  else
    super
  end
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
Release the job that is on hold
@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong releasing a job
@return [void]
@see Adapter#release
# File lib/ood_core/job/adapters/lsf.rb, line 154
# Release the job that is on hold.
#
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] when the batch object raises Batch::Error
# @return [void]
# @see Adapter#release
def release(id)
  job_id = id.to_s
  batch.release_job(job_id)
rescue Batch::Error => e
  # Surface scheduler failures through the adapter's own error type.
  raise JobAdapterError, e.message
end
Retrieve job status from resource manager
@param id [#to_s] the id of the job
@raise [JobAdapterError] if something goes wrong getting job status
@return [Status] status of job
@see Adapter#status
# File lib/ood_core/job/adapters/lsf.rb, line 132
# Retrieve job status from the resource manager.
#
# Delegates to #info and returns only the status of the resulting Info.
#
# @param id [#to_s] the id of the job
# @raise [JobAdapterError] when a Batch::Error escapes while fetching info
# @return [Status] status of the job
# @see Adapter#status
def status(id)
  job_info = info(id)
  job_info.status
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
Submit a job with the attributes defined in the job template instance
@param script [Script] script object that describes the script and attributes for the submitted job
@param after [#to_s, Array<#to_s>] this job may be scheduled for execution at any point after dependent jobs have started execution
@param afterok [#to_s, Array<#to_s>] this job may be scheduled for execution only after dependent jobs have terminated with no errors
@param afternotok [#to_s, Array<#to_s>] this job may be scheduled for execution only after dependent jobs have terminated with errors
@param afterany [#to_s, Array<#to_s>] this job may be scheduled for execution after dependent jobs have terminated
@raise [JobAdapterError] if something goes wrong submitting a job
@return [String] the job id returned after successfully submitting a job
@see Adapter#submit
# File lib/ood_core/job/adapters/lsf.rb, line 74
# Submit a job with the attributes defined in the job template instance.
#
# Each dependency option may be nil, a single id or a list of ids; all are
# normalized to arrays of strings before being handed to the helper, which
# builds the keyword arguments for batch.submit_string.
#
# @param script [Script] script object that describes the script and
#   attributes for the submitted job
# @param after [#to_s, Array<#to_s>] may start any time after these jobs start
# @param afterok [#to_s, Array<#to_s>] may start only after these jobs end without errors
# @param afternotok [#to_s, Array<#to_s>] may start only after these jobs end with errors
# @param afterany [#to_s, Array<#to_s>] may start after these jobs terminate
# @raise [JobAdapterError] when the batch object raises Batch::Error
# @return [String] whatever batch.submit_string returns (documented as the job id)
# @see Adapter#submit
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  dependencies = {
    after: after,
    afterok: afterok,
    afternotok: afternotok,
    afterany: afterany
  }.each_with_object({}) do |(kind, ids), normalized|
    normalized[kind] = Array(ids).map(&:to_s)
  end

  submit_args = helper.batch_submit_args(script, **dependencies)
  batch.submit_string(script.content, **submit_args)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
Private Instance Methods
Build a proxy parent hash from the first element of a job array, deleting attributes that are not shared by the whole array
# File lib/ood_core/job/adapters/lsf.rb, line 228
# Proxy the first element of a job array as the parent hash.
#
# The parent copies the first element's attributes, takes the requested id and
# an empty :tasks list, and drops the per-task attributes that cannot describe
# the array as a whole.
#
# @param info [#to_h] the first element of the job array
# @param id [#to_s] the requested (parent) job id
# @return [Hash] attributes for the parent; handle_job_array passes them to Info.new
def build_proxy_parent(info, id)
  per_task_keys = [:allocated_nodes, :dispatch_time, :cpu_time, :wallclock_time, :status]
  parent = info.to_h.merge(tasks: [], id: id)
  parent.delete_if { |key, _| per_task_keys.include?(key) }

  # Remove the child array index (e.g. "[3]") from :job_name.
  # Note that a true representation of the parent should have the full array
  # spec in the name. Worth attempting to reconstruct?
  # A nil job name is left as-is instead of raising NoMethodError on gsub.
  parent[:job_name] = parent[:job_name].gsub(/\[[^\]]+\]/, '') if parent[:job_name]
  parent
end
Determine state from LSF state code
# File lib/ood_core/job/adapters/lsf.rb, line 177
# Determine the adapter state from an LSF state code.
#
# @param st [String] the LSF state code (presumably as reported by the batch
#   object — TODO confirm format)
# @return [Object] the value STATE_MAP holds for st, or :undetermined when the
#   code is not in the map
def get_state(st)
  STATE_MAP.fetch(st) { :undetermined }
end
# File lib/ood_core/job/adapters/lsf.rb, line 212
# Collapse the Info objects returned for one requested id into a single Info.
#
# - nil or empty: the job is reported as :completed under the requested id
# - one element: that Info is returned unchanged
# - several elements (a job array): a proxy parent is built from the first
#   element; its :tasks list each element's id and status, and its status is
#   the maximum of those statuses (assumes statuses are Comparable — TODO
#   confirm ordering semantics)
#
# @param info_ary [Array<Info>, nil] the Info objects for the requested id
# @param id [#to_s] the requested job id
# @return [Info]
def handle_job_array(info_ary, id)
  return Info.new(id: id, status: :completed) if info_ary.nil? || info_ary.empty?
  return info_ary.first if info_ary.size == 1

  parent_task_hash = build_proxy_parent(info_ary.first, id)
  # Build the task entries with map and append them in one step, rather than
  # pushing from inside a map block whose result was discarded.
  parent_task_hash[:tasks].concat(
    info_ary.map { |task_info| { id: task_info.id, status: task_info.status } }
  )
  parent_task_hash[:status] = parent_task_hash[:tasks].map { |task| task[:status] }.max
  Info.new(**parent_task_hash)
end
# File lib/ood_core/job/adapters/lsf.rb, line 181
# Convert one job record returned by the batch object into an Info object.
#
# @param v [Hash] a single job record; the keys read here are :id, :name,
#   :status, :exec_host, :start_time, :finish_time, :submit_time, :from_host,
#   :user, :project, :queue and :cpu_used. The whole record is kept as :native.
# @return [Info] information describing the job
def info_for_batch_hash(v)
  nodes = helper.parse_exec_host(v[:exec_host]).map do |host|
    NodeInfo.new(name: host[:host], procs: host[:slots])
  end

  # FIXME: wallclock_time is estimated here from the dispatch/finish times;
  # ideally the batch object would supply it directly.
  dispatch_time = helper.parse_past_time(v[:start_time], ignore_errors: true)
  finish_time = helper.parse_past_time(v[:finish_time], ignore_errors: true)

  # Detect a trailing job array index (e.g. "name[3]") in the job name.
  # \z anchors to the very end of the string; $ would also match before an
  # embedded newline and pick up a bogus index.
  array_index = /(\[\d+\])\z/.match(v[:name])

  Info.new(
    id: array_index ? "#{v[:id]}#{array_index[1]}" : v[:id],
    status: get_state(v[:status]),
    allocated_nodes: nodes,
    submit_host: v[:from_host],
    job_name: v[:name],
    job_owner: v[:user],
    accounting_id: v[:project],
    # Total slots across all execution hosts; 0 when no hosts are assigned yet.
    procs: nodes.any? ? nodes.map(&:procs).reduce(&:+) : 0,
    queue_name: v[:queue],
    wallclock_time: helper.estimate_runtime(current_time: Time.now, start_time: dispatch_time, finish_time: finish_time),
    cpu_time: helper.parse_cpu_used(v[:cpu_used]),
    submission_time: helper.parse_past_time(v[:submit_time], ignore_errors: true),
    dispatch_time: dispatch_time,
    native: v
  )
end