module Dag::Client::API::Job
Public Instance Methods
query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '')
click to toggle source
# File lib/dag/client/api/job.rb, line 81 def query(query: '', output_format: 'csv', output_resource_path: '', cluster_name: '', label: '') raise Dag::Client::ParameterInvalid.new('query is blank') if query.blank? raise Dag::Client::ParameterInvalid.new('query should start with SELECT') if query !~ /\ASELECT/i raise Dag::Client::ParameterInvalid.new('query should not include OVERWRITE') if query =~ /OVERWRITE/i if output_format && !['csv', 'tsv'].include?(output_format) raise Dag::Client::ParameterInvalid.new('ouput_format should be csv or tsv') end raise Dag::Client::ParameterInvalid.new('output_resource_path is blank') if output_resource_path.blank? unless output_resource_path.start_with?('dag://') raise Dag::Client::ParameterInvalid.new("output_resource_path should start with 'dag://'") end unless output_resource_path.end_with?('/') raise Dag::Client::ParameterInvalid.new("output_resource_path should end with '/'") end raise Dag::Client::ParameterInvalid.new('cluster_name is blank') if cluster_name.blank? parameters = { 'outputFormat' => output_format || 'csv', 'outputResourcePath' => output_resource_path, 'query' => query, 'clusterName' => cluster_name, 'label' => label, } resource = "/v1/" execute(RestParameter.new(:post, resource, cano_resource: 'select', content_type: 'application/json', parameters: parameters)) end
query_cancel(job_id)
click to toggle source
# File lib/dag/client/api/job.rb, line 76 def query_cancel(job_id) resource = "/v1/#{job_id}" execute(RestParameter.new(:delete, resource, cano_resource: 'query', content_type: 'application/json')) end
query_info(job_id)
click to toggle source
# File lib/dag/client/api/job.rb, line 56 def query_info(job_id) resource = "/v1/#{job_id}" execute(RestParameter.new(:get, resource, cano_resource: 'query')) end
query_info_list(options = {})
click to toggle source
# File lib/dag/client/api/job.rb, line 6 def query_info_list(options = {}) resource = "/v1/" query_params = list_params(options) status = options[:status] if status statuses = status.split(',') if statuses.any? { |s| !['running', 'finished', 'canceled', 'error'].include?(s) } raise Dag::Client::ParameterInvalid.new("status is invalid: #{status}") end query_params = query_params.merge('status' => status) end type = options[:type] if type unless ['select', 'split'].include?(type.to_s) raise Dag::Client::ParameterInvalid.new("type is invalid: #{type}") end query_params = query_params.merge('type' => type) end cluster_name = options[:cluster_name] if cluster_name query_params = query_params.merge('clusterName' => cluster_name) end label_prefix = options[:label_prefix] if label_prefix query_params = query_params.merge('labelPrefix' => label_prefix) end cluster_rebooted = options[:cluster_rebooted] unless cluster_rebooted.nil? unless [TrueClass, FalseClass].any? { |c| cluster_rebooted.kind_of?(c) } raise Dag::Client::ParameterInvalid.new("cluster_rebooted is invalid: #{cluster_rebooted}") end query_params = query_params.merge('clusterRebooted' => cluster_rebooted) end order = options[:order] if order unless ['asc', 'desc'].include?(order) raise Dag::Client::ParameterInvalid.new("order is invalid: #{order}") end query_params = query_params.merge('order' => order) end execute(RestParameter.new(:get, resource, cano_resource: 'query', query_params: query_params)) end
query_log(job_id)
click to toggle source
# File lib/dag/client/api/job.rb, line 61 def query_log(job_id) resource = "/v1/#{job_id}/log" log_info = execute(RestParameter.new(:get, resource, cano_resource: 'query')) if log_info.present? io = StringIO.new('', 'r+') log_info['log'].each_line do |line| io.puts line unless line.include?('CLIService') end return { "log" => io.string } end log_info end