class OodCore::Job::Adapters::Lsf

Constants

STATE_MAP

Attributes

batch[R]

@api private

helper[R]

@api private

Public Class Methods

new(batch:) click to toggle source

@param opts [#to_h] the options defining this adapter @option opts [Batch] :batch The Lsf batch object

@api private @see Factory.build_lsf

# File lib/ood_core/job/adapters/lsf.rb, line 54
def initialize(batch:)
  @batch = batch
  @helper = Lsf::Helper.new
end

Public Instance Methods

delete(id) click to toggle source

Delete the submitted job @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong deleting a job @return [void] @see Adapter#delete

# File lib/ood_core/job/adapters/lsf.rb, line 165
def delete(id)
  batch.delete_job(id.to_s)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
directive_prefix() click to toggle source
# File lib/ood_core/job/adapters/lsf.rb, line 171
def directive_prefix
  '#BSUB'
end
hold(id) click to toggle source

Put the submitted job on hold @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong holding a job @return [void] @see Adapter#hold

# File lib/ood_core/job/adapters/lsf.rb, line 143
def hold(id)
  batch.hold_job(id.to_s)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
info(id) click to toggle source

Retrieve job info from the resource manager @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong getting job info @return [Info] information describing submitted job @see Adapter#info

# File lib/ood_core/job/adapters/lsf.rb, line 93
def info(id)
  info_ary = batch.get_job(id: id).map{|v| info_for_batch_hash(v)}
  handle_job_array(info_ary, id)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
info_all(attrs: nil) click to toggle source

Retrieve info for all jobs from the resource manager @raise [JobAdapterError] if something goes wrong getting job info @return [Array<Info>] information describing submitted jobs @see Adapter#info_all

# File lib/ood_core/job/adapters/lsf.rb, line 104
def info_all(attrs: nil)
  batch.get_jobs.map { |v| info_for_batch_hash(v) }
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
info_where_owner(owner, attrs: nil) click to toggle source

Retrieve info for all of the owner's jobs from the resource manager @raise [JobAdapterError] if something goes wrong getting job info @return [Array<Info>] information describing submitted jobs @see Adapter#info_where_owner

Calls superclass method OodCore::Job::Adapter#info_where_owner
# File lib/ood_core/job/adapters/lsf.rb, line 114
def info_where_owner(owner, attrs: nil)
  owners = Array.wrap(owner).map(&:to_s)
  if owners.count > 1
    super
  elsif owners.count == 0
    []
  else
    batch.get_jobs_for_user(owners.first).map { |v| info_for_batch_hash(v) }
  end
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
release(id) click to toggle source

Release the job that is on hold @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong releasing a job @return [void] @see Adapter#release

# File lib/ood_core/job/adapters/lsf.rb, line 154
def release(id)
  batch.release_job(id.to_s)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
status(id) click to toggle source

Retrieve job status from resource manager @param id [#to_s] the id of the job @raise [JobAdapterError] if something goes wrong getting job status @return [Status] status of job @see Adapter#status

# File lib/ood_core/job/adapters/lsf.rb, line 132
def status(id)
  info(id).status
rescue Batch::Error => e
  raise JobAdapterError, e.message
end
submit(script, after: [], afterok: [], afternotok: [], afterany: []) click to toggle source

Submit a job with the attributes defined in the job template instance @param script [Script] script object that describes the script and

attributes for the submitted job

@param after [#to_s, Array<#to_s>] this job may be scheduled for

execution at any point after dependent jobs have started execution

@param afterok [#to_s, Array<#to_s>] this job may be scheduled for

execution only after dependent jobs have terminated with no errors

@param afternotok [#to_s, Array<#to_s>] this job may be scheduled for

execution only after dependent jobs have terminated with errors

@param afterany [#to_s, Array<#to_s>] this job may be scheduled for

execution after dependent jobs have terminated

@raise [JobAdapterError] if something goes wrong submitting a job @return [String] the job id returned after successfully submitting a

job

@see Adapter#submit

# File lib/ood_core/job/adapters/lsf.rb, line 74
def submit(script, after: [], afterok: [], afternotok: [], afterany: [])
  # ensure dependencies are array of ids
  after      = Array(after).map(&:to_s)
  afterok    = Array(afterok).map(&:to_s)
  afternotok = Array(afternotok).map(&:to_s)
  afterany   = Array(afterany).map(&:to_s)

  kwargs = helper.batch_submit_args(script, after: after, afterok: afterok, afternotok: afternotok, afterany: afterany)

  batch.submit_string(script.content, **kwargs)
rescue Batch::Error => e
  raise JobAdapterError, e.message
end

Private Instance Methods

build_proxy_parent(info, id) click to toggle source

Proxy the first element as the parent hash delete non-shared attributes

# File lib/ood_core/job/adapters/lsf.rb, line 228
def build_proxy_parent(info, id)
  info.to_h.merge({
    :tasks => [],
    :id => id
  }).delete_if{
    |key, _| [
      :allocated_nodes, :dispatch_time,
      :cpu_time, :wallclock_time, :status
    ].include?(key)
  }.tap{
    # Remove the child array index from the :job_name

    # Note that a true representation of the parent should have the
    # full array spec in the name. Worth attempting to reconstruct?
    |h| h[:job_name] = h[:job_name].gsub(/\[[^\]]+\]/, '')
  }
end
get_state(st) click to toggle source

Determine state from LSF state code

# File lib/ood_core/job/adapters/lsf.rb, line 177
def get_state(st)
  STATE_MAP.fetch(st, :undetermined)
end
handle_job_array(info_ary, id) click to toggle source
# File lib/ood_core/job/adapters/lsf.rb, line 212
def handle_job_array(info_ary, id)
  return Info.new(id: id, status: :completed) if info_ary.nil? || info_ary.empty?
  return info_ary.first if info_ary.size == 1

  parent_task_hash = build_proxy_parent(info_ary.first, id)

  info_ary.map do |task_info|
    parent_task_hash[:tasks] << {:id => task_info.id, :status => task_info.status}
  end

  parent_task_hash[:status] = parent_task_hash[:tasks].map{|task| task[:status]}.max

  Info.new(**parent_task_hash)
end
info_for_batch_hash(v) click to toggle source
# File lib/ood_core/job/adapters/lsf.rb, line 181
def info_for_batch_hash(v)
  nodes = helper.parse_exec_host(v[:exec_host]).map do |host|
    NodeInfo.new(name: host[:host], procs: host[:slots])
  end

  # FIXME: estimated_runtime should be set by batch object instead of
  dispatch_time = helper.parse_past_time(v[:start_time], ignore_errors: true)
  finish_time = helper.parse_past_time(v[:finish_time], ignore_errors: true)

  # Detect job array index from name
  array_index = /(\[\d+\])$/.match(v[:name])

  Info.new(
    id: (array_index) ? "#{v[:id]}#{array_index[1]}" : v[:id],
    status: get_state(v[:status]),
    allocated_nodes: nodes,
    submit_host: v[:from_host],
    job_name: v[:name],
    job_owner: v[:user],
    accounting_id: v[:project],
    procs: nodes.any? ? nodes.map(&:procs).reduce(&:+) : 0,
    queue_name: v[:queue],
    wallclock_time: helper.estimate_runtime(current_time: Time.now, start_time: dispatch_time, finish_time: finish_time),
    cpu_time: helper.parse_cpu_used(v[:cpu_used]),
    # cpu_time: nil,
    submission_time: helper.parse_past_time(v[:submit_time], ignore_errors: true),
    dispatch_time: dispatch_time,
    native: v
  )
end