class OodCore::Job::Adapters::Sge::Batch

Object used for simplified communication with an SGE batch server

@api private

Constants

STATE_MAP

Adapted from www.softpanorama.org/HPC/Grid_engine/Queues/queue_states.shtml

Attributes

bin[R]
bin_overrides[R]
cluster[R]
conf[R]
helper[R]
strict_host_checking[R]
submit_host[R]

Public Class Methods

new(config) click to toggle source

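@param config [#fetch, #[]] the options defining this batch object. Recognized keys: :cluster, :bin, :sge_root (falls back to ENV['SGE_ROOT'], then "/var/lib/gridengine"), :bin_overrides, :submit_host, :strict_host_checking, and :libdrmaa_path (when set, the DRMAA bindings are loaded)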

@api private @see Factory.build_sge

# File lib/ood_core/job/adapters/sge/batch.rb, line 34
# Build a batch object from a configuration hash.
#
# Reads the :cluster, :bin, :sge_root, :bin_overrides, :submit_host,
# :strict_host_checking and :libdrmaa_path keys of +config+. The SGE root
# falls back to ENV['SGE_ROOT'] and then to "/var/lib/gridengine".
#
# Side effect: ENV['SGE_ROOT'] of the running process is overwritten.
#
# @param config [#fetch, #[]] configuration keyed by symbol
def initialize(config)
  @cluster = config.fetch(:cluster, nil)
  @bin     = Pathname.new(config.fetch(:bin, nil).to_s)

  sge_root_dir = config[:sge_root] || ENV['SGE_ROOT'] || "/var/lib/gridengine"
  @sge_root    = Pathname.new(sge_root_dir)

  @bin_overrides        = config.fetch(:bin_overrides, {})
  @submit_host          = config.fetch(:submit_host, "")
  @strict_host_checking = config.fetch(:strict_host_checking, true)

  # FIXME: hack as this affects env of the process!
  ENV['SGE_ROOT'] = @sge_root.to_s

  # DRMAA is only loaded (and only flagged as usable) when a library path is configured
  libdrmaa_path = config[:libdrmaa_path]
  load_drmaa(libdrmaa_path) if libdrmaa_path
  @can_use_drmaa = libdrmaa_path ? true : false

  @helper = OodCore::Job::Adapters::Sge::Helper.new
end

Public Instance Methods

call(cmd, *args, env: {}, stdin: "", chdir: nil) click to toggle source

Call a forked SGE command for a given batch server

# File lib/ood_core/job/adapters/sge/batch.rb, line 169
# Run an SGE command line tool and return what it wrote to stdout.
#
# The executable is resolved with Helper.bin_path (using bin and
# bin_overrides), then the command is passed through Helper.ssh_wrap together
# with submit_host and strict_host_checking before being executed.
#
# @param cmd name of the command to run
# @param args [Array<#to_s>] arguments for the command
# @param env [#to_h] environment variables; keys and values are stringified
# @param stdin [#to_s] data written to the command's standard input
# @param chdir [#to_s, nil] working directory for the command ("." when nil)
# @return [String] standard output of the command
# @raise [Error] carrying the command's stderr when the exit status is not successful
def call(cmd, *args, env: {}, stdin: "", chdir: nil)
  executable = OodCore::Job::Adapters::Helper.bin_path(cmd, bin, bin_overrides)
  string_env = env.to_h.map { |key, value| [key.to_s, value.to_s] }.to_h
  executable, wrapped_args = OodCore::Job::Adapters::Helper.ssh_wrap(
    submit_host, executable, args, strict_host_checking, string_env
  )
  workdir = (chdir || ".").to_s

  stdout, stderr, status = Open3.capture3(
    string_env, executable, *wrapped_args.map(&:to_s),
    stdin_data: stdin.to_s, chdir: workdir
  )
  raise Error, stderr unless status.success?

  stdout
end
can_use_drmaa?() click to toggle source
# File lib/ood_core/job/adapters/sge/batch.rb, line 136
# Whether DRMAA may be used by this batch object.
#
# @return [Boolean] the value of @can_use_drmaa, which the constructor sets to
#   true only when a :libdrmaa_path was configured
def can_use_drmaa?
  @can_use_drmaa
end
delete(job_id) click to toggle source

Call qdel @param job_id [#to_s] @return [void]

# File lib/ood_core/job/adapters/sge/batch.rb, line 157
# Delete a job by running qdel through #call.
#
# @param job_id [#to_s] id of the job to delete
# @return [void] (the value of #call is passed through as-is)
# @raise [Error] if #call reports that qdel failed
def delete(job_id)
  call('qdel', job_id)
end
get_all(owner: nil) click to toggle source

Get OodCore::Job::Info for every enqueued job, optionally filtering on owner @param owner [#to_s] the owner or owner list @return [Array<OodCore::Job::Info>]

# File lib/ood_core/job/adapters/sge/batch.rb, line 64
# List jobs known to qstat as OodCore::Job::Info objects.
#
# Runs `qstat -r -xml` through #call (adding `-u <owner>` when owner is not
# nil), stream-parses the XML with a QstatXmlRListener and post-processes
# every parsed job hash before wrapping it in an Info.
#
# @param owner [#to_s, nil] the owner or owner list to filter on; nil for no -u filter
# @return [Array<OodCore::Job::Info>]
def get_all(owner: nil)
  xml_listener = QstatXmlRListener.new

  qstat_command = %w[qstat -r -xml]
  qstat_command += ['-u', owner] unless owner.nil?

  qstat_xml = call(*qstat_command)
  REXML::Parsers::StreamParser.new(qstat_xml, xml_listener).parse

  xml_listener.parsed_jobs.map do |job_hash|
    OodCore::Job::Info.new(**post_process_qstat_job_hash(job_hash))
  end
end
get_info_enqueued_job(job_id) click to toggle source

Get OodCore::Job::Info for a job_id that may still be in the queue

If libdrmaa is not loaded then we cannot use DRMAA. Using DRMAA provides better job status and should always be chosen if it is possible.

When qstat is called in XML mode for a job id that is not in the queue, invalid XML is returned. The second line of the invalid XML contains the string '<unknown_jobs', which is used to recognize this case.

@param job_id [#to_s] @return [OodCore::Job::Info]

# File lib/ood_core/job/adapters/sge/batch.rb, line 88
# Look up a single job with `qstat -r -xml -j <job_id>` and wrap the result in
# an OodCore::Job::Info.
#
# A job that qstat does not know about is reported with status :completed
# instead of raising.
#
# @param job_id [#to_s] id of the job to look up
# @return [OodCore::Job::Info]
# @raise [REXML::ParseException] if the qstat output cannot be parsed and does
#   not contain the string 'unknown_jobs'
# @raise [StandardError] any other error, unless DRMAA is usable and the error
#   is a DRMAA::DRMAAInvalidJobError
def get_info_enqueued_job(job_id)
  # Default answer; it is what gets returned when a rescue clause below
  # decides the error only means "job not found".
  job_info = OodCore::Job::Info.new(id: job_id.to_s, status: :completed)
  argv = ['qstat', '-r', '-xml', '-j', job_id.to_s]

  begin
    results = call(*argv)
    listener = QstatXmlJRListener.new
    REXML::Parsers::StreamParser.new(results, listener).parse

    job_hash = listener.parsed_job

    if job_hash[:id]
      # The listener found a job: optionally refine its status (via DRMAA)
      update_job_hash_status!(job_hash)
    else
      # Nothing was parsed for this id: report the job as completed
      job_hash[:id] = job_id
      job_hash[:status] = :completed
    end

    job_info = OodCore::Job::Info.new(**job_hash)
  rescue REXML::ParseException => e
    # If the error is something other than a job not being found by qstat re-raise the error
    unless results =~ /unknown_jobs/
      raise e, "REXML::ParseException error and command '#{argv.join(' ')}' produced results that didn't contain string 'unknown_jobs'. ParseException: #{e.message}"
    end
  rescue StandardError => e
    # Note that DRMAA is not guaranteed to be defined, hence the tests
    # (can_use_drmaa? is checked first so the DRMAA constant is only touched when it was loaded)
    raise e unless ( can_use_drmaa? && e.is_a?(DRMAA::DRMAAInvalidJobError))  # raised when job is not found
  end

  job_info
end
get_status_from_drmaa?(job_hash) click to toggle source
# File lib/ood_core/job/adapters/sge/batch.rb, line 130
# Decide whether the status of the given job should be looked up through DRMAA.
#
# @param job_hash [Hash] parsed job attributes; job_hash[:tasks] must respond
#   to #empty? whenever DRMAA is usable
# @return [Boolean] true only when DRMAA can be used and the job has no tasks
def get_status_from_drmaa?(job_hash)
  # DRMAA does not recognize the parent task in job arrays
  # e.g. 123 is invalid if it is an array job, while 123.4 is valid
  can_use_drmaa? && job_hash[:tasks].empty?
end
get_status_from_drmma(job_id) click to toggle source

Get the job status using DRMAA

# File lib/ood_core/job/adapters/sge/batch.rb, line 235
# Ask the DRMAA session for the state of a job and translate it with
# #translate_drmaa_state.
#
# @param job_id [#to_s] id of the job to query
# @return the translated state
def get_status_from_drmma(job_id)
  drmaa_session = DRMAA::SessionSingleton.instance
  drmaa_state   = drmaa_session.job_ps(job_id.to_s)

  translate_drmaa_state(drmaa_state)
end
hold(job_id) click to toggle source

Call qhold @param job_id [#to_s] @return [void]

# File lib/ood_core/job/adapters/sge/batch.rb, line 143
# Put a job on hold by running qhold through #call.
#
# @param job_id [#to_s] id of the job to hold
# @return [void] (the value of #call is passed through as-is)
# @raise [Error] if #call reports that qhold failed
def hold(job_id)
  call('qhold', job_id)
end
load_drmaa(libdrmaa_path) click to toggle source
# File lib/ood_core/job/adapters/sge/batch.rb, line 55
# Load the DRMAA bindings and the DRMAA refinements used by this adapter.
#
# @param libdrmaa_path [Object, nil] value assigned to FFI_DRMAA.libdrmaa_path
#   when truthy (presumably the filesystem path of libdrmaa -- verify against caller)
def load_drmaa(libdrmaa_path)
  # NOTE(review): the path is assigned before the requires below, presumably so
  # the DRMAA bindings see it while they are being loaded -- confirm before reordering.
  FFI_DRMAA.libdrmaa_path = libdrmaa_path if libdrmaa_path
  require "ood_core/job/adapters/drmaa"
  require "ood_core/refinements/drmaa_extensions"
end
post_process_qstat_job_hash(job_hash) click to toggle source
# File lib/ood_core/job/adapters/sge/batch.rb, line 222
# Finish a job hash produced from the qstat listing. The hash is modified in
# place and returned.
#
# * fills in :wallclock_time when it is missing (seconds since :dispatch_time,
#   or 0 when there is no :dispatch_time)
# * translates the raw SGE state code in :status with #translate_sge_state
# * lets #update_job_hash_status! refine the status
#
# @param job_hash [Hash] parsed job attributes
# @return [Hash] the same hash object that was passed in
def post_process_qstat_job_hash(job_hash)
  unless job_hash.key?(:wallclock_time)
    job_hash[:wallclock_time] =
      if job_hash.key?(:dispatch_time)
        Time.now.to_i - job_hash[:dispatch_time]
      else
        # dispatch is not set if the job is not running
        0
      end
  end

  raw_state = job_hash[:status]
  job_hash[:status] = translate_sge_state(raw_state)
  update_job_hash_status!(job_hash)

  job_hash
end
release(job_id) click to toggle source

Call qrls @param job_id [#to_s] @return [void]

# File lib/ood_core/job/adapters/sge/batch.rb, line 150
# Release a held job by running qrls through #call.
#
# @param job_id [#to_s] id of the job to release
# @return [void] (the value of #call is passed through as-is)
# @raise [Error] if #call reports that qrls failed
def release(job_id)
  call('qrls', job_id)
end
submit(content, args) click to toggle source

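Call qsub with arguments and the script's content @param content [#to_s] the script content, passed to qsub on stdin @param args [Array<#to_s>] command line arguments for qsub @return [String] the job id parsed from the qsub output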

# File lib/ood_core/job/adapters/sge/batch.rb, line 164
# Submit a job script with qsub and return the id of the new job.
#
# @param content [#to_s] script content, sent to qsub on stdin
# @param args [Array<#to_s>] command line arguments for qsub
# @return the job id the helper extracts from the qsub output
def submit(content, args)
  qsub_output = call('qsub', *args, stdin: content)
  @helper.parse_job_id_from_qsub(qsub_output)
end
translate_drmaa_state(drmaa_state_code) click to toggle source
# File lib/ood_core/job/adapters/sge/batch.rb, line 218
# Map a DRMAA state code to the corresponding entry of
# DRMAA::DRMMA_TO_OOD_STATE_MAP.
#
# @param drmaa_state_code key to look up in the map
# @return the mapped state, or :undetermined when the code is not in the map
def translate_drmaa_state(drmaa_state_code)
  DRMAA::DRMMA_TO_OOD_STATE_MAP.fetch(drmaa_state_code) { :undetermined }
end
translate_sge_state(sge_state_code) click to toggle source
# File lib/ood_core/job/adapters/sge/batch.rb, line 214
# Map an SGE state code to the corresponding entry of STATE_MAP.
#
# @param sge_state_code key to look up in STATE_MAP
# @return the mapped state, or :undetermined when the code is not in the map
def translate_sge_state(sge_state_code)
  STATE_MAP.fetch(sge_state_code) { :undetermined }
end
update_job_hash_status!(job_hash) click to toggle source
# File lib/ood_core/job/adapters/sge/batch.rb, line 120
# Overwrite job_hash[:status] with the status reported by DRMAA, when
# #get_status_from_drmaa? says DRMAA should be consulted for this job.
#
# A DRMAA::DRMAAException raised by the lookup is swallowed, leaving the
# existing status untouched.
#
# @param job_hash [Hash] parsed job attributes; modified in place
# @return the new status, or nil when nothing was updated
def update_job_hash_status!(job_hash)
  return unless get_status_from_drmaa?(job_hash)

  begin
    job_hash[:status] = get_status_from_drmma(job_hash[:id])
  rescue DRMAA::DRMAAException
    # log DRMAA error?
  end
end