class Aq::Query
Public Class Methods
new(bucket, object_prefix)
click to toggle source
Calls superclass method
Aq::Base::new
# File lib/aq/query.rb, line 11 def initialize(bucket, object_prefix) super() raise InvalidParameterError.new '`bucket` must be specified.' if bucket.nil? || bucket.empty? @bucket = bucket @object_prefix = object_prefix end
Public Instance Methods
run(query, timeout=nil)
click to toggle source
# File lib/aq/query.rb, line 18 def run(query, timeout=nil) log.info "Run Query: #{query}" exec_id = start_query query log.info "QueryExecutionID: #{exec_id}" timeout_time = Time.now + timeout unless timeout.nil? while timeout.nil? || timeout_time > Time.now log.debug 'Waiting query finished' if query_finished? exec_id print_result_and_exit exec_id end end log.error "Query:#{exec_id} is timeout. Stop query execution." stop_query exec_id exit 1 end
Private Instance Methods
print_result_and_exit(exec_id)
click to toggle source
# File lib/aq/query.rb, line 54 def print_result_and_exit(exec_id) res = @client.get_query_execution({query_execution_id: exec_id}) case res.query_execution.status.state when 'SUCCEEDED' location = res.query_execution.result_configuration.output_location log.info "Query succeeded. Result: #{location}" print_result_file location exit 0 when 'FAILED' log.error "Query failed. Reason: #{res.query_execution.status.state_change_reason}" exit 1 when 'CANCELLED' log.error "Query canceled" exit 1 else log.warn "Unknown state" end end
print_result_file(location)
click to toggle source
# File lib/aq/query.rb, line 73 def print_result_file(location) body = Aws::S3::Client.new.get_object( {bucket: @bucket, key: location.sub(/^s3:\/\/#{@bucket}\//, '')} ).body if location.end_with? '.csv' csv = CSV.new(body, headers: true) result = csv.readlines.map(&:fields) kosi = Kosi::Table.new({header: csv.headers}) print kosi.render(result) else print body.read + "\n" end end
query_finished?(exec_id)
click to toggle source
# File lib/aq/query.rb, line 49 def query_finished?(exec_id) res = @client.get_query_execution({query_execution_id: exec_id}) %w(SUCCEEDED FAILED CANCELLED).include? res.query_execution.status.state end
start_query(query)
click to toggle source
# File lib/aq/query.rb, line 38 def start_query(query) params = { query_string: query, result_configuration: { output_location: "s3://#{@bucket}/#{@object_prefix}" } } res = @client.start_query_execution(params) res.query_execution_id end
stop_query(exec_id)
click to toggle source
# File lib/aq/query.rb, line 87 def stop_query(exec_id) @client.stop_query_execution({query_execution_id: exec_id}) end