module Dag::Client::API::Job

Public Instance Methods

query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '') click to toggle source
# File lib/dag/client/api/job.rb, line 81
def query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '')
  raise Dag::Client::ParameterInvalid.new('query is blank') if query.blank?

  raise Dag::Client::ParameterInvalid.new('query should start with SELECT') if query !~ /\ASELECT/i
  raise Dag::Client::ParameterInvalid.new('query should not include OVERWRITE') if query =~ /OVERWRITE/i

  if output_format && !['csv', 'tsv'].include?(output_format)
    raise Dag::Client::ParameterInvalid.new('ouput_format should be csv or tsv')
  end

  raise Dag::Client::ParameterInvalid.new('output_resource_path is blank') if output_resource_path.blank?

  unless output_resource_path.start_with?('dag://')
    raise Dag::Client::ParameterInvalid.new("output_resource_path should start with 'dag://'")
  end
  unless output_resource_path.end_with?('/')
    raise Dag::Client::ParameterInvalid.new("output_resource_path should end with '/'")
  end

  raise Dag::Client::ParameterInvalid.new('cluster_name is blank') if cluster_name.blank?


  parameters = {
    'outputFormat' => output_format || 'csv',
    'outputResourcePath' => output_resource_path,
    'query' => query,
    'clusterName' => cluster_name,
    'label' => label,
  }

  resource = "/v1/"
  execute(RestParameter.new(:post, resource, cano_resource: 'select', content_type: 'application/json', parameters: parameters))
end
query_cancel(job_id) click to toggle source
# File lib/dag/client/api/job.rb, line 76
def query_cancel(job_id)
  resource = "/v1/#{job_id}"
  execute(RestParameter.new(:delete, resource, cano_resource: 'query', content_type: 'application/json'))
end
query_info(job_id) click to toggle source
# File lib/dag/client/api/job.rb, line 56
def query_info(job_id)
  resource = "/v1/#{job_id}"
  execute(RestParameter.new(:get, resource, cano_resource: 'query'))
end
query_info_list(options = {}) click to toggle source
# File lib/dag/client/api/job.rb, line 6
def query_info_list(options = {})
  resource = "/v1/"
  query_params = list_params(options)

  status = options[:status]
  if status
    statuses = status.split(',')
    if statuses.any? { |s| !['running', 'finished', 'canceled', 'error'].include?(s) }
      raise Dag::Client::ParameterInvalid.new("status is invalid: #{status}")
    end
    query_params = query_params.merge('status' => status)
  end

  type = options[:type]
  if type
    unless ['select', 'split'].include?(type.to_s)
      raise Dag::Client::ParameterInvalid.new("type is invalid: #{type}")
    end
    query_params = query_params.merge('type' => type)
  end

  cluster_name = options[:cluster_name]
  if cluster_name
    query_params = query_params.merge('clusterName' => cluster_name)
  end

  label_prefix = options[:label_prefix]
  if label_prefix
    query_params = query_params.merge('labelPrefix' => label_prefix)
  end

  cluster_rebooted = options[:cluster_rebooted]
  unless cluster_rebooted.nil?
    unless [TrueClass, FalseClass].any? { |c| cluster_rebooted.kind_of?(c) }
      raise Dag::Client::ParameterInvalid.new("cluster_rebooted is invalid: #{cluster_rebooted}")
    end
    query_params = query_params.merge('clusterRebooted' => cluster_rebooted)
  end

  order = options[:order]
  if order
    unless ['asc', 'desc'].include?(order)
      raise Dag::Client::ParameterInvalid.new("order is invalid: #{order}")
    end
    query_params = query_params.merge('order' => order)
  end

  execute(RestParameter.new(:get, resource, cano_resource: 'query', query_params: query_params))
end
query_log(job_id) click to toggle source
# File lib/dag/client/api/job.rb, line 61
def query_log(job_id)
  resource = "/v1/#{job_id}/log"
  log_info = execute(RestParameter.new(:get, resource, cano_resource: 'query'))

  if log_info.present?
    io = StringIO.new('', 'r+')
    log_info['log'].each_line do |line|
      io.puts line unless line.include?('CLIService')
    end
    return { "log" => io.string }
  end

  log_info
end