class PrestoLegacy::Client::StatementClient
Constants
- HEADERS
- HTTP11_CTL_CHARSET
- HTTP11_CTL_CHARSET_REGEXP
- HTTP11_SEPARATOR
- HTTP11_TOKEN_CHARSET
- HTTP11_TOKEN_REGEXP
Attributes
exception[R]
query[R]
Public Class Methods
new(faraday, query, options, next_uri=nil)
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 42
#
# Sets up the client state and obtains the first page of results.
#
# faraday  - HTTP connection object. Its headers are modified in place:
#            HEADERS is merged first, then the headers built by
#            optional_headers from +options+.
# query    - statement text; used as the POST body when +next_uri+ is nil.
# options  - Hash of client options.
# next_uri - optional URI of an already-running statement. When given, the
#            first result page is fetched from it with GET instead of
#            posting the query.
def initialize(faraday, query, options, next_uri=nil)
  @faraday = faraday
  @faraday.headers.merge!(HEADERS)

  @query = query
  @options = options
  @closed = false
  @exception = nil

  # optional_headers reads @options, so it has to run after the assignment above.
  @faraday.headers.merge!(optional_headers)

  return post_query_request! unless next_uri

  first_page = faraday_get_with_retry(next_uri)
  @results = Models::QueryResults.decode(MultiJson.load(first_page))
end
Public Instance Methods
advance()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 148
#
# Fetches the next page of results and stores it as the current results.
#
# Returns false without doing any request when the client is closed or the
# current results carry no next_uri; returns true after the new page has
# been loaded. Errors raised by faraday_get_with_retry or load_json propagate.
def advance
  return false if closed?
  return false unless has_next?

  next_page_uri = @results.next_uri
  payload = faraday_get_with_retry(next_page_uri)
  @results = load_json(next_page_uri, payload, Models::QueryResults)
  true
end
cancel_leaf_stage()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 216
#
# Sends an HTTP DELETE to the current results' next_uri.
#
# Returns false without sending anything when there is no next_uri.
# Otherwise returns true when the response status is in the 2xx range
# (status / 100 == 2) and false for any other status.
def cancel_leaf_stage
  target = @results.next_uri
  return false unless target

  response = @faraday.delete { |req| req.url target }
  response.status / 100 == 2
end
close()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 247
#
# Cancels the running statement (via cancel_leaf_stage) and marks the client
# as closed. Calling it again on an already-closed client does nothing.
#
# The closed flag is set even when the cancel request raises, so that code
# checking @closed (for example the retry loop in faraday_get_with_retry)
# stops treating the client as live. The error from the cancel request still
# propagates to the caller.
#
# Returns nil.
def close
  return if @closed

  begin
    # cancel running statement
    # TODO: make async request and ignore response?
    cancel_leaf_stage
  ensure
    @closed = true
  end

  nil
end
closed?()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 122
#
# Returns the value of the closed flag: false after construction, true once
# close has run.
def closed?
  @closed
end
current_results()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 140
#
# Returns the most recently loaded results object (the page stored by the
# constructor or by the latest successful advance).
def current_results
  @results
end
debug?()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 118
#
# Returns true when the :debug option is set to a truthy value, false
# otherwise (including when the key is absent).
def debug?
  @options[:debug] ? true : false
end
encode_properties(properties)
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 232
#
# Encodes session properties as "key=value" pairs for the session header.
#
# properties - enumerable of key/value pairs; keys and values are converted
#              with to_s before validation and encoding.
#
# Returns a String. Pairs are joined with "\r\n<PRESTO_SESSION header name>: ",
# which is a hack to set the same header multiple times through one value.
# An empty +properties+ yields "".
#
# Raises Faraday::ClientError when a key does not match HTTP11_TOKEN_REGEXP
# or a value matches HTTP11_CTL_CHARSET_REGEXP.
def encode_properties(properties)
  pairs = properties.map do |k, v|
    token = k.to_s
    field_value = v.to_s

    # TODO: LWS encoding is not implemented
    # Validate the stringified key: matching the raw key object breaks for
    # keys that are neither String nor Symbol (e.g. Integer).
    unless token =~ HTTP11_TOKEN_REGEXP
      raise Faraday::ClientError, "Key of properties can't include HTTP/1.1 control characters or separators (#{HTTP11_SEPARATOR.map {|c| c =~ /\s/ ? c.dump : c }.join(' ')})"
    end

    if field_value =~ HTTP11_CTL_CHARSET_REGEXP
      raise Faraday::ClientError, "Value of properties can't include HTTP/1.1 control characters"
    end

    "#{token}=#{field_value}"
  end

  pairs.join("\r\n#{PrestoHeaders::PRESTO_SESSION}: ")
end
exception?()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 128
#
# Returns the stored exception object itself (not a strict boolean), or nil
# when no error has been recorded. The value is truthy exactly when an
# exception was stored.
def exception?
  @exception
end
faraday_get_with_retry(uri, &block)
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 181
#
# Performs a GET on +uri+, retrying transient failures.
#
# uri   - request target passed to @faraday.get.
# block - accepted but not used by this method.
#
# Returns the response body when the status is 200 and the body is non-empty.
#
# Retried cases: Faraday timeout / connection-failed errors and HTTP 503.
# After every failed attempt the method sleeps attempts * 0.1 seconds, and it
# keeps trying while less than two hours have elapsed and the client is not
# closed.
#
# Raises (and stores in @exception):
# - any other error raised by the GET, unchanged;
# - PrestoHttpError with the response status for any non-503 response that
#   was not returned above (this includes a 200 with an empty body);
# - PrestoHttpError 408 once the retry loop ends without a usable response.
def faraday_get_with_retry(uri, &block)
  started_at = Time.now
  attempts = 0

  loop do
    response =
      begin
        @faraday.get(uri)
      rescue Faraday::Error::TimeoutError, Faraday::Error::ConnectionFailed
        # temporary error: fall through to the retry logic
        nil
      rescue => e
        @exception = e
        raise @exception
      end

    if response
      return response.body if response.status == 200 && !response.body.to_s.empty?

      # retry only if 503 Service Unavailable; anything else is deterministic
      unless response.status == 503
        @exception = PrestoHttpError.new(response.status, "Presto API error at #{uri} returned #{response.status}: #{response.body}")
        raise @exception
      end
    end

    attempts += 1
    sleep attempts * 0.1

    break unless (Time.now - started_at) < 2*60*60 && !@closed
  end

  @exception = PrestoHttpError.new(408, "Presto API error due to timeout")
  raise @exception
end
has_next?()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 144
#
# Returns true when the current results carry a (truthy) next_uri, i.e.
# there is another page to fetch; false otherwise.
def has_next?
  @results.next_uri ? true : false
end
query_failed?()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 132
#
# Returns true when the current results contain an error entry, false when
# the error field is nil.
def query_failed?
  !@results.error.nil?
end
query_info()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 160
#
# Fetches "/v1/query/<id of the current results>" and decodes the response
# as Models::QueryInfo via load_json.
#
# Returns the decoded object. Errors from faraday_get_with_retry and
# load_json propagate.
def query_info
  info_uri = "/v1/query/#{@results.id}"
  load_json(info_uri, faraday_get_with_retry(info_uri), Models::QueryInfo)
end
query_succeeded?()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 136
#
# Returns true only when all of the following hold: the current results have
# no error, no exception has been stored on the client, and the client has
# not been closed. Returns false otherwise.
def query_succeeded?
  return false unless @results.error.nil?
  return false if @exception

  !@closed
end
Private Instance Methods
init_request(req)
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 89
#
# Applies the client's timeout settings to an outgoing request.
#
# req - request object exposing +options+ with +timeout=+ and +open_timeout=+.
#
# The values come from the :http_timeout and :http_open_timeout options and
# fall back to 300 and 60 respectively when the option is nil or absent.
def init_request(req)
  read_timeout = @options[:http_timeout] || 300
  connect_timeout = @options[:http_open_timeout] || 60

  req.options.timeout = read_timeout
  req.options.open_timeout = connect_timeout
end
load_json(uri, body, body_class)
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 166
#
# Parses a JSON response body and decodes it into a model object.
#
# uri        - where the body came from; only used in the error message.
# body       - JSON text.
# body_class - object responding to +decode+ that builds the model from the
#              parsed JSON.
#
# Returns whatever body_class.decode returns.
#
# Errors from MultiJson.load itself are not rescued here and propagate
# unchanged. When decode raises, a PrestoHttpError with status 500 is stored
# in @exception and raised; its message includes the body, shortened to the
# first 1024 characters plus "..." when the body is longer than 1027
# characters.
def load_json(uri, body, body_class)
  parsed = MultiJson.load(body)

  begin
    body_class.decode(parsed)
  rescue => e
    shown = body.size > 1024 + 3 ? "#{body[0, 1024]}..." : body
    @exception = PrestoHttpError.new(500, "Presto API returned unexpected structure at #{uri}. Expected #{body_class} but got #{shown}: #{e}")
    raise @exception
  end
end
optional_headers()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 61
#
# Builds the request headers that depend on the client options.
#
# Each of the options :user, :source, :catalog, :schema, :time_zone and
# :language is copied to its PrestoHeaders header when the option value is
# truthy. A truthy :properties option is run through encode_properties and
# stored under the session header.
#
# Returns a new Hash of header name => value (empty when no option is set).
def optional_headers
  direct_options = [
    [:user,      PrestoHeaders::PRESTO_USER],
    [:source,    PrestoHeaders::PRESTO_SOURCE],
    [:catalog,   PrestoHeaders::PRESTO_CATALOG],
    [:schema,    PrestoHeaders::PRESTO_SCHEMA],
    [:time_zone, PrestoHeaders::PRESTO_TIME_ZONE],
    [:language,  PrestoHeaders::PRESTO_LANGUAGE],
  ]

  headers = {}
  direct_options.each do |option, header_name|
    value = @options[option]
    headers[header_name] = value if value
  end

  properties = @options[:properties]
  headers[PrestoHeaders::PRESTO_SESSION] = encode_properties(properties) if properties

  headers
end
post_query_request!()
click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 96
#
# Starts the query by POSTing the statement text to "/v1/statement" and
# stores the decoded first page of results in @results.
#
# The request gets its timeouts from init_request. Any response status other
# than 200 raises PrestoHttpError carrying that status and the response body.
# Errors from load_json propagate.
def post_query_request!
  statement_uri = "/v1/statement"

  response = @faraday.post do |req|
    req.url statement_uri
    req.body = @query
    init_request(req)
  end

  # TODO error handling
  unless response.status == 200
    raise PrestoHttpError.new(response.status, "Failed to start query: #{response.body}")
  end

  @results = load_json(statement_uri, response.body, Models::QueryResults)
end