class Athens::Query
Attributes
query_execution_id[R]
Public Class Methods
new(connection, query_execution_id)
click to toggle source
# File lib/athens/query.rb, line 5 def initialize(connection, query_execution_id) @connection = connection @query_execution_id = query_execution_id @state = nil @state_reason = nil @cancelled = false @results = nil @hash_results = nil version = RUBY_VERSION.split('.').map {|v| v.to_i} @decimal_without_new = (version[0] >= 2 && version[1] >= 5) @decimal_without_new = (version[0] == 2 && version[1] >= 5) || (version[0] >= 3) end
Public Instance Methods
cancel()
click to toggle source
# File lib/athens/query.rb, line 57 def cancel unless @cancelled resp = @connection.client.stop_query_execution({ query_execution_id: @query_execution_id }) @cancelled = true refresh_state end if @state == 'CANCELLED' return true else return false end end
records()
click to toggle source
# File lib/athens/query.rb, line 103 def records Enumerator.new do |y| headers = nil rows.each_with_index do |row| if headers.nil? headers = row next end y << Hash[headers.zip(row)] end end end
rows()
click to toggle source
# File lib/athens/query.rb, line 73 def rows raise InvalidRequestError.new("Query must be in SUCCEEDED state to return results") unless @state == 'SUCCEEDED' Enumerator.new do |y| result = @connection.client.get_query_results({query_execution_id: @query_execution_id}) metadata = result.result_set.result_set_metadata first = true while true rows = result.result_set.rows break if rows.empty? if first y << rows.shift.data.map {|col| col.var_char_value} first = false end rows.each {|row| y << map_types(metadata, row)} break unless result.next_token result = @connection.client.get_query_results({ query_execution_id: @query_execution_id, next_token: result.next_token }) end end end
state()
click to toggle source
# File lib/athens/query.rb, line 20 def state refresh_state if state_needs_refresh? @state end
state_reason()
click to toggle source
# File lib/athens/query.rb, line 25 def state_reason refresh_state if state_needs_refresh? @state_reason end
to_a(header_row: true)
click to toggle source
# File lib/athens/query.rb, line 118 def to_a(header_row: true) (@results ||= rows.to_a).drop(header_row ? 0 : 1) end
to_h()
click to toggle source
# File lib/athens/query.rb, line 122 def to_h @hash_results ||= records.to_a end
wait(max_seconds = nil)
click to toggle source
# File lib/athens/query.rb, line 30 def wait(max_seconds = nil) if max_seconds.nil? stop_at = nil else stop_at = Time.now + max_seconds end while true if stop_at != nil && Time.now > stop_at return false end refresh_state if @state == 'SUCCEEDED' return true elsif @state == 'FAILED' raise QueryFailedError.new(@state_reason) elsif state == 'CANCELLED' raise QueryCancelledError.new(@state_reason) end # Wait a bit and check again sleep(Athens.configuration.wait_polling_period.to_f) end end
Private Instance Methods
map_types(metadata, row)
click to toggle source
# File lib/athens/query.rb, line 138 def map_types(metadata, row) mapped = [] metadata.column_info.each_with_index do |col, index| data = row.data[index].var_char_value nullable = ["UNKNOWN", "NULLABLE"].include?(col.nullable) if nullable && data.nil? mapped << data elsif !nullable && data.nil? raise InvalidNullError.new("Got null data from a non-null field (#{col.name})") else case col.type when 'tinyint', 'smallint', 'int', 'integer', 'bigint' mapped << data.to_i when 'timestamp' mapped << Time.parse(data) when 'varchar', 'string' mapped << data when 'float', 'double' mapped << data.to_f when 'decimal' if @decimal_without_new mapped << BigDecimal(data) else mapped << BigDecimal.new(data) end when 'date' mapped << Date.parse(data) when 'boolean' mapped << (data == "true") else puts "WARNING: Unsupported type: #{col.type}, defaulting to string" mapped << data end end end return mapped end
refresh_state()
click to toggle source
# File lib/athens/query.rb, line 131 def refresh_state resp = @connection.client.get_query_execution({query_execution_id: @query_execution_id}) @state = resp.query_execution.status.state @state_reason = resp.query_execution.status.state_change_reason end
state_needs_refresh?()
click to toggle source
# File lib/athens/query.rb, line 127 def state_needs_refresh? @state.nil? || (['QUEUED', 'RUNNING'].include?(@state)) end