class Dag::Job

Attributes

access_key_id[R]
cluster[R]
cluster_rebooted[R]
dsl[R]
id[R]
input_format[R]
input_object_keys[R]
job_id[R]
label[R]
output_database[R]
output_format[R]
output_resource_path[R]
output_table[R]
process_engine[R]
progress[R]
query[R]
schema[R]
stage[R]
start_at[R]
status[R]
type[R]

Public Class Methods

new(api, job_info) click to toggle source
Calls superclass method Dag::Model::new
# File lib/dag/client/model/job.rb, line 3
# Builds a Job model from an API client and the raw job-info hash
# returned by the query API.
#
# api      - API client used for all subsequent calls (kill, reload, log, ...).
# job_info - Hash of job attributes; see #update_parameters for the keys read.
def initialize(api, job_info)
  super(api)                    # Dag::Model#initialize — presumably stores @api; defined outside this file
  update_parameters(job_info)   # populate every job attribute ivar from the hash
end

Public Instance Methods

cancel()
Alias for: kill
cluster_rebooted?() click to toggle source
# File lib/dag/client/model/job.rb, line 28
# True when the API reported the cluster as rebooted ('clusterRebooted').
# Always returns a strict boolean, never the raw attribute value.
def cluster_rebooted?
  @cluster_rebooted ? true : false
end
download_urls(time_limit = 30) click to toggle source
# File lib/dag/client/model/job.rb, line 44
# Returns presigned download URLs (one per object) for everything stored
# under the job's output resource path.
#
# time_limit - link validity window in minutes (default 30).
#
# Raises Dag::Client::StatusInvalid unless the job is finished.
#
# NOTE(review): `minutes.since` and `to_param` are ActiveSupport
# extensions — assumed loaded by the client; confirm.
def download_urls(time_limit = 30)
  raise Dag::Client::StatusInvalid.new("job status is not finished") unless finished?
  expire_at = time_limit.minutes.since.to_i         # epoch seconds at which the links expire
  object_uri = URI.parse(@output_resource_path)     # host part is the bucket, path is the key prefix
  bucket = object_uri.host
  object_path = object_uri.path[1..-1]              # drop the leading '/'
  object_path += '/' unless object_path.end_with? '/'
  bucket_objects = @api.objects(bucket, prefix: object_path).objects
  bucket_objects.map do |object|
    # Path-style addressing puts the bucket in the path; virtual-hosted
    # style puts it in the hostname (see url construction below).
    path = if @api.force_path_style?
             "/#{bucket}/#{object}"
           else
             "/#{object}"
           end

    parameters = {
      "Expires" => expire_at,
      "IIJGIOAccessKeyId" => @api.apikey,
      "Signature" => @api.download_signature(expire_at, bucket, path)
    }

    uri = URI.parse(@api.storage_api)
    url = if @api.force_path_style?
            "http://#{uri.host}"
          else
            "http://#{bucket}.#{uri.host}"
          end
    # Only append an explicit port when it is non-default.
    url += ":#{uri.port}" unless uri.port == 80 || uri.port == 443

    File.join(url, "#{path}?#{parameters.to_param}")
  end
end
finished?() click to toggle source
# File lib/dag/client/model/job.rb, line 12
# True when the job has reached the terminal 'finished' status.
def finished?
  'finished' == @status
end
hive?() click to toggle source
# File lib/dag/client/model/job.rb, line 24
# True when this is a 'select' job.
# NOTE(review): the name suggests select jobs run on Hive — inferred
# from the method name only; confirm against the API docs.
def hive?
  @type.eql?('select')
end
kill() click to toggle source
# File lib/dag/client/model/job.rb, line 37
# Cancels this job via the API. Only valid while the job is running;
# raises Dag::Client::StatusInvalid otherwise (see #validate_cancel_condition).
# Aliased as #cancel.
def kill
  validate_cancel_condition
  @api.query_cancel(@id)
end
Also aliased as: cancel
log() click to toggle source
# File lib/dag/client/model/job.rb, line 77
# Fetches this job's log text from the API.
# Returns '' when the API yields no log info; raises via
# #validate_log_condition for split jobs or rebooted clusters.
def log
  validate_log_condition
  info = @api.query_log(@id)
  return '' unless info
  info['log']
end
reload() click to toggle source
# File lib/dag/client/model/job.rb, line 32
# Re-fetches this job's info from the API and refreshes every attribute
# in place (see #update_parameters).
def reload
  job_info = @api.query_info(@id)
  update_parameters(job_info)
end
running?() click to toggle source
# File lib/dag/client/model/job.rb, line 16
# True while the job is still in the 'running' status.
def running?
  'running' == @status
end
split?() click to toggle source
# File lib/dag/client/model/job.rb, line 20
# True when this is a 'split' job (such jobs have no query log).
def split?
  @type.eql?('split')
end
validate_cancel_condition() click to toggle source
# File lib/dag/client/model/job.rb, line 93
# Guard for #kill: a job can only be cancelled while running.
# Raises Dag::Client::StatusInvalid for any other status.
def validate_cancel_condition
  return if running?
  raise Dag::Client::StatusInvalid.new("job status is not running")
end
validate_log_condition() click to toggle source
# File lib/dag/client/model/job.rb, line 83
# Guard for #log: split jobs carry no query log, and a rebooted
# cluster invalidates log retrieval. Raises the matching client error
# in either case; returns nil otherwise.
def validate_log_condition
  raise Dag::Client::JobTypeInvalid.new("job type is not select") if split?
  raise Dag::Client::ClusterRebooted.new("cluster is rebooted") if cluster_rebooted?
end

Private Instance Methods

update_parameters(job) click to toggle source
# File lib/dag/client/model/job.rb, line 101
# Copies the job-info hash into this model's instance variables.
# All fields are plain copy-throughs keyed by the API's camelCase names,
# except startTime, which is parsed into a Time and — when absent —
# left untouched so a reload keeps the previously known value.
def update_parameters(job)
  {
    id: 'id',
    status: 'status',
    process_engine: 'processEngine',
    dsl: 'dsl',
    cluster: 'clusterName',
    cluster_rebooted: 'clusterRebooted',
    access_key_id: 'accessKeyId',
    query: 'query',
    output_format: 'outputFormat',
    output_resource_path: 'outputResourcePath',
    type: 'type',
    label: 'label',
    stage: 'stage',
    progress: 'progress',
    job_id: 'jobId',
    schema: 'schema',
    input_object_keys: 'inputObjectKeys',
    input_format: 'inputFormat',
    output_database: 'outputDatabase',
    output_table: 'outputTable'
  }.each do |ivar, key|
    instance_variable_set(:"@#{ivar}", job[key])
  end
  @start_at = Time.parse(job['startTime']) if job['startTime']
end