class PrestoLegacy::Client::StatementClient

Constants

HEADERS
HTTP11_CTL_CHARSET
HTTP11_CTL_CHARSET_REGEXP
HTTP11_SEPARATOR
HTTP11_TOKEN_CHARSET
HTTP11_TOKEN_REGEXP

Attributes

exception[R]
query[R]

Public Class Methods

new(faraday, query, options, next_uri=nil) click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 42
def initialize(faraday, query, options, next_uri=nil)
  @faraday = faraday
  @faraday.headers.merge!(HEADERS)

  @options = options
  @query = query
  @closed = false
  @exception = nil

  @faraday.headers.merge!(optional_headers)

  if next_uri
    body = faraday_get_with_retry(next_uri)
    @results = Models::QueryResults.decode(MultiJson.load(body))
  else
    post_query_request!
  end
end

Public Instance Methods

advance() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 148
def advance
  if closed? || !has_next?
    return false
  end
  uri = @results.next_uri

  body = faraday_get_with_retry(uri)
  @results = load_json(uri, body, Models::QueryResults)

  return true
end
cancel_leaf_stage() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 216
def cancel_leaf_stage
  if uri = @results.next_uri
    response = @faraday.delete do |req|
      req.url uri
    end
    return response.status / 100 == 2
  end
  return false
end
close() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 247
def close
  return if @closed

  # cancel running statement
  # TODO make async reqeust and ignore response?
  cancel_leaf_stage

  @closed = true
  nil
end
closed?() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 122
def closed?
  @closed
end
current_results() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 140
def current_results
  @results
end
debug?() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 118
def debug?
  !!@options[:debug]
end
encode_properties(properties) click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 232
def encode_properties(properties)
  # this is a hack to set same header multiple times.
  properties.map do |k, v|
    token = k.to_s
    field_value = v.to_s  # TODO LWS encoding is not implemented
    unless k =~ HTTP11_TOKEN_REGEXP
      raise Faraday::ClientError, "Key of properties can't include HTTP/1.1 control characters or separators (#{HTTP11_SEPARATOR.map {|c| c =~ /\s/ ? c.dump : c }.join(' ')})"
    end
    if field_value =~ HTTP11_CTL_CHARSET_REGEXP
      raise Faraday::ClientError, "Value of properties can't include HTTP/1.1 control characters"
    end
    "#{token}=#{field_value}"
  end.join("\r\n#{PrestoHeaders::PRESTO_SESSION}: ")
end
exception?() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 128
def exception?
  @exception
end
faraday_get_with_retry(uri, &block) click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 181
def faraday_get_with_retry(uri, &block)
  start = Time.now
  attempts = 0

  begin
    begin
      response = @faraday.get(uri)
    rescue Faraday::Error::TimeoutError, Faraday::Error::ConnectionFailed
      # temporally error to retry
      response = nil
    rescue => e
      @exception = e
      raise @exception
    end

    if response
      if response.status == 200 && !response.body.to_s.empty?
        return response.body
      end

      if response.status != 503  # retry only if 503 Service Unavailable
        # deterministic error
        @exception = PrestoHttpError.new(response.status, "Presto API error at #{uri} returned #{response.status}: #{response.body}")
        raise @exception
      end
    end

    attempts += 1
    sleep attempts * 0.1
  end while (Time.now - start) < 2*60*60 && !@closed

  @exception = PrestoHttpError.new(408, "Presto API error due to timeout")
  raise @exception
end
has_next?() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 144
def has_next?
  !!@results.next_uri
end
query_failed?() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 132
def query_failed?
  @results.error != nil
end
query_info() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 160
def query_info
  uri = "/v1/query/#{@results.id}"
  body = faraday_get_with_retry(uri)
  load_json(uri, body, Models::QueryInfo)
end
query_succeeded?() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 136
def query_succeeded?
  @results.error == nil && !@exception && !@closed
end

Private Instance Methods

init_request(req) click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 89
def init_request(req)
  req.options.timeout = @options[:http_timeout] || 300
  req.options.open_timeout = @options[:http_open_timeout] || 60
end
load_json(uri, body, body_class) click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 166
def load_json(uri, body, body_class)
  hash = MultiJson.load(body)
  begin
    body_class.decode(hash)
  rescue => e
    if body.size > 1024 + 3
      body = "#{body[0, 1024]}..."
    end
    @exception = PrestoHttpError.new(500, "Presto API returned unexpected structure at #{uri}. Expected #{body_class} but got #{body}: #{e}")
    raise @exception
  end
end
optional_headers() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 61
def optional_headers
  headers = {}
  if v = @options[:user]
    headers[PrestoHeaders::PRESTO_USER] = v
  end
  if v = @options[:source]
    headers[PrestoHeaders::PRESTO_SOURCE] = v
  end
  if v = @options[:catalog]
    headers[PrestoHeaders::PRESTO_CATALOG] = v
  end
  if v = @options[:schema]
    headers[PrestoHeaders::PRESTO_SCHEMA] = v
  end
  if v = @options[:time_zone]
    headers[PrestoHeaders::PRESTO_TIME_ZONE] = v
  end
  if v = @options[:language]
    headers[PrestoHeaders::PRESTO_LANGUAGE] = v
  end
  if v = @options[:properties]
    headers[PrestoHeaders::PRESTO_SESSION] = encode_properties(v)
  end
  headers
end
post_query_request!() click to toggle source
# File lib/presto_legacy/client/statement_client.rb, line 96
def post_query_request!
  uri = "/v1/statement"
  response = @faraday.post do |req|
    req.url uri

    req.body = @query
    init_request(req)
  end

  # TODO error handling
  if response.status != 200
    raise PrestoHttpError.new(response.status, "Failed to start query: #{response.body}")
  end

  body = response.body
  @results = load_json(uri, body, Models::QueryResults)
end