class Hexspace::Client
Attributes
client[R]
session[R]
transport[R]
Public Class Methods
finalize(transport, client, session)
click to toggle source
private
# File lib/hexspace/client.rb, line 60 def self.finalize(transport, client, session) proc do req = TCloseSessionReq.new req.sessionHandle = session.sessionHandle client.CloseSession(req) transport.close end end
new(host: "localhost", port: nil, username: nil, password: nil, database: nil, mode: :sasl, timeout: nil)
click to toggle source
# File lib/hexspace/client.rb, line 5 def initialize(host: "localhost", port: nil, username: nil, password: nil, database: nil, mode: :sasl, timeout: nil) # could use current user in the future (like beeline) username ||= "anonymous" password ||= "anonymous" # TODO support kerberos # TODO support ssl for sockets @transport = case mode when :sasl socket = Thrift::Socket.new(host, port || 10000, timeout) # TODO support authzid SaslTransport.new(socket, username: username, password: password) when :nosasl socket = Thrift::Socket.new(host, port || 10000, timeout) Thrift::BufferedTransport.new(socket) when :http, :https raise ArgumentError, "timeout not supported with #{mode}" if timeout uri_class = mode == :http ? URI::HTTP : URI::HTTPS uri = uri_class.build(host: host, port: port || 10001, path: "/cliservice") t = Thrift::HTTPClientTransport.new(uri) t.add_headers({"Authorization" => "Basic #{Base64.strict_encode64("#{username}:#{password}")}"}) t else raise ArgumentError, "Invalid mode: #{mode}" end @transport.open protocol = Thrift::BinaryProtocol.new(@transport) @client = TCLIService::Client.new(protocol) req = TOpenSessionReq.new configuration = { # remove table prefix with Hive "set:hiveconf:hive.resultset.use.unique.column.names" => "false" } configuration["use:database"] = database if database req.configuration = configuration @session = @client.OpenSession(req) check_status @session ObjectSpace.define_finalizer(self, self.class.finalize(@transport, @client, @session)) end
Public Instance Methods
execute(statement, timeout: nil)
click to toggle source
TODO add new method that returns Result object so its possible to get duplicate columns as well as columns when there are no rows
# File lib/hexspace/client.rb, line 54 def execute(statement, timeout: nil) result = execute_statement(statement, timeout: timeout) process_result(result) if result.operationHandle.hasResultSet end
Private Instance Methods
check_status(obj)
click to toggle source
# File lib/hexspace/client.rb, line 71 def check_status(obj) if obj.status.statusCode != TStatusCode::SUCCESS_STATUS raise Error, obj.status.errorMessage end end
execute_statement(statement, async: false, timeout: nil)
click to toggle source
# File lib/hexspace/client.rb, line 77 def execute_statement(statement, async: false, timeout: nil) req = TExecuteStatementReq.new req.sessionHandle = session.sessionHandle req.statement = statement req.runAsync = async req.queryTimeout = timeout if timeout result = client.ExecuteStatement(req) check_status result result end
process_result(stmt)
click to toggle source
# File lib/hexspace/client.rb, line 88 def process_result(stmt) req = TGetResultSetMetadataReq.new req.operationHandle = stmt.operationHandle metadata = client.GetResultSetMetadata(req) check_status metadata rows = [] columns = metadata.schema.columns.map(&:columnName) types = metadata.schema.columns.map { |c| TYPE_NAMES[c.typeDesc.types.first.primitiveEntry.type].downcase } loop do req = TFetchResultsReq.new req.operationHandle = stmt.operationHandle req.maxRows = 10_000 result = client.FetchResults(req) check_status result new_rows = 0 start_offset = result.results.startRowOffset result.results.columns.each_with_index do |col, j| name = columns[j] value = col.get_value if j == 0 new_rows = value.values.size new_rows.times do rows << {} end end offset = start_offset nulls = value.nulls.unpack1("b*") values = value.values case types[j] # TODO decide how to handle time zones # when "timestamp" # values.each do |v| # rows[offset][name] = nulls[offset] == "1" ? nil : Time.parse(v) # offset += 1 # end when "date" values.each do |v| rows[offset][name] = nulls[offset] == "1" ? nil : Date.parse(v) offset += 1 end when "decimal" values.each do |v| rows[offset][name] = nulls[offset] == "1" ? nil : BigDecimal(v) offset += 1 end else values.each do |v| rows[offset][name] = nulls[offset] == "1" ? nil : v offset += 1 end end end break if new_rows < req.maxRows && !result.hasMoreRows end req = TCloseOperationReq.new req.operationHandle = stmt.operationHandle check_status client.CloseOperation(req) rows end