class RBHive::TCLIConnection

Attributes

client[R]

Public Class Methods

new(server, port = 10_000, options = {}, logger = StdOutLogger.new) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 84
# Builds the Thrift transport, protocol and client for a HiveServer2 endpoint.
# The transport is created here but #open is not called on it.
#
# server  - host of the HiveServer2 instance
# port    - port to connect to (default 10_000)
# options - Hash of settings; nil is accepted for backwards compatibility.
#           Keys read here: :transport, :hive_version, :timeout, :sasl_params
# logger  - object responding to #info
#
# Raises RuntimeError if options is not a Hash, or if :transport is :sasl
# and no :sasl_params were supplied.
def initialize(server, port = 10_000, options = {}, logger = StdOutLogger.new)
  options ||= {} # backwards compatibility
  raise "'options' parameter must be a hash" unless options.is_a?(Hash)

  if options[:transport] == :sasl && options[:sasl_params].nil?
    raise ":transport is set to :sasl, but no :sasl_params option was supplied"
  end

  # Work on a copy: the defaults below must not be written into the caller's
  # hash, and writing into a frozen hash would raise.
  options = options.dup

  # Defaults to buffered transport, Hive 0.10, 1800 second timeout
  options[:transport]     ||= :buffered
  options[:hive_version]  ||= 10
  options[:timeout]       ||= 1800
  @options = options

  # Look up the appropriate Thrift protocol version for the supplied Hive version
  @thrift_protocol_version = thrift_hive_protocol(options[:hive_version])

  @logger = logger
  @transport = thrift_transport(server, port)
  @protocol = Thrift::BinaryProtocol.new(@transport)
  @client = Hive2::Thrift::TCLIService::Client.new(@protocol)
  @session = nil
  @logger.info("Connecting to HiveServer2 #{server} on port #{port}")
end

Public Instance Methods

add_columns(schema) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 367
# Executes the statement produced by schema.add_columns_statement and
# returns the result of #execute.
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
async_cancel(handles) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 232
# Sends a CancelOperation call for the operation described by handles and
# returns the client's response.
def async_cancel(handles)
  cancel_request = prepare_cancel_request(handles)
  @client.CancelOperation(cancel_request)
end
async_close_session(handles) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 294
# Closes the session referenced by handles[:session].
# The handles hash is validated first; the client's CloseSession response
# is returned.
def async_close_session(handles)
  validate_handles!(handles)
  close_request = Hive2::Thrift::TCloseSessionReq.new(sessionHandle: handles[:session])
  @client.CloseSession(close_request)
end
async_execute(query) click to toggle source

Async execute

# File lib/rbhive/t_c_l_i_connection.rb, line 193
# Submits query with runAsync set and returns immediately with the handles
# needed to refer to this query / session again.
#
# Returns a Hash with :session, :guid and :secret.
# Raises via raise_error_if_failed! when the server reports a failure.
def async_execute(query)
  @logger.info("Executing query asynchronously: #{query}")

  request = Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: @session.sessionHandle,
    statement: query,
    runAsync: true
  )
  response = @client.ExecuteStatement(request)
  raise_error_if_failed!(response)

  operation_id = response.operationHandle.operationId
  {
    session: @session.sessionHandle,
    guid: operation_id.guid,
    secret: operation_id.secret
  }
end
async_fetch(handles, max_rows = 100) click to toggle source

Async fetch results from an async execute

# File lib/rbhive/t_c_l_i_connection.rb, line 267
# Fetches up to max_rows rows (orientation :first) from a finished
# asynchronous query.
#
# Raises RuntimeError when the query is not in the finished state.
def async_fetch(handles, max_rows = 100)
  # Can't get data from an unfinished query
  finished = async_is_complete?(handles)
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless finished

  # Rebuild the operation handle and pull the rows
  operation_handle = prepare_operation_handle(handles)
  fetch_rows(operation_handle, :first, max_rows)
end
async_fetch_in_batch(handles, batch_size = 1000) { |rows| ... } click to toggle source

Fetches the results of a finished asynchronous query in batches of batch_size rows and yields each batch to the given block as an array of rows. Raises if no block is given or if the query has not finished.

# File lib/rbhive/t_c_l_i_connection.rb, line 279
# Yields the rows of a finished asynchronous query to the given block in
# batches of batch_size, stopping at the first empty batch.
#
# Raises RuntimeError when no block is given or when the query is not in
# the finished state.
def async_fetch_in_batch(handles, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # Can't get data from an unfinished query
  complete = async_is_complete?(handles)
  raise "Can't perform fetch on a query in state: #{async_state(handles)}" unless complete

  # Keep asking for the next batch until the server has nothing left
  loop do
    batch = fetch_rows(prepare_operation_handle(handles), :next, batch_size)
    return if batch.empty?

    yield batch
  end
end
async_is_cancelled?(handles) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 228
# True when async_state reports :cancelled for the given handles.
def async_is_cancelled?(handles)
  current_state = async_state(handles)
  current_state == :cancelled
end
async_is_complete?(handles) click to toggle source

Is the query complete?

# File lib/rbhive/t_c_l_i_connection.rb, line 214
# True when async_state reports :finished for the given handles.
def async_is_complete?(handles)
  current_state = async_state(handles)
  current_state == :finished
end
async_is_failed?(handles) click to toggle source

Has the query failed?

# File lib/rbhive/t_c_l_i_connection.rb, line 224
# True when async_state reports :error for the given handles.
def async_is_failed?(handles)
  current_state = async_state(handles)
  current_state == :error
end
async_is_running?(handles) click to toggle source

Is the query actually running?

# File lib/rbhive/t_c_l_i_connection.rb, line 219
# True when async_state reports :running for the given handles.
def async_is_running?(handles)
  current_state = async_state(handles)
  current_state == :running
end
async_state(handles) click to toggle source

Map states to symbols

# File lib/rbhive/t_c_l_i_connection.rb, line 237
# Asks the server for the status of the operation described by handles and
# translates the Thrift operation state into a symbol.
#
# Returns one of :finished, :initialized, :running, :cancelled, :closed,
# :error, :unknown, :pending, or :state_not_in_protocol for any other value.
# Raises RuntimeError when the response carries no operation state.
def async_state(handles)
  status_request = Hive2::Thrift::TGetOperationStatusReq.new(
    operationHandle: prepare_operation_handle(handles)
  )
  state = @client.GetOperationStatus(status_request).operationState
  states = Hive2::Thrift::TOperationState

  case state
  when states::FINISHED_STATE    then :finished
  when states::INITIALIZED_STATE then :initialized
  when states::RUNNING_STATE     then :running
  when states::CANCELED_STATE    then :cancelled
  when states::CLOSED_STATE      then :closed
  when states::ERROR_STATE       then :error
  when states::UKNOWN_STATE      then :unknown
  when states::PENDING_STATE     then :pending
  when nil
    raise "No operation state found for handles - has the session been closed?"
  else
    :state_not_in_protocol
  end
end
close() click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 150
# Closes the underlying transport and returns whatever its #close returns.
def close
  transport = @transport
  transport.close
end
close_session() click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 158
# Sends a CloseSession request for the current session, then forgets it.
# Returns nil.
def close_session
  close_request = prepare_close_session
  @client.CloseSession(close_request)
  @session = nil
end
create_table(schema) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 354
# Executes the statement produced by schema.create_table_statement and
# returns the result of #execute.
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
drop_table(name) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 358
# Drops a table. Accepts either a table name or a TableSchema, in which
# case the schema's #name is used.
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
execute(query) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 171
# Runs query on the server and returns the ExecuteStatement response.
# The query is logged first; raise_error_if_failed! is applied to the
# response before it is returned.
def execute(query)
  @logger.info("Executing Hive Query: #{query}")
  statement_request = prepare_execute_statement(query)
  client.ExecuteStatement(statement_request).tap do |response|
    raise_error_if_failed!(response)
  end
end
explain(query) click to toggle source

Performs an EXPLAIN of the supplied query on the server and returns it as an ExplainResult. (Only works on 0.12 if you have this patch - issues.apache.org/jira/browse/HIVE-5492)

# File lib/rbhive/t_c_l_i_connection.rb, line 310
# Runs "EXPLAIN <query>" through fetch_in_batch, collects the :Explain
# value of every returned row and wraps them in an ExplainResult.
def explain(query)
  plan_chunks = []
  fetch_in_batch("EXPLAIN " + query) do |batch|
    plan_chunks.push(batch.map { |row| row[:Explain] })
  end
  ExplainResult.new(plan_chunks.flatten)
end
fetch(query, max_rows = 100) click to toggle source

Performs a query on the server, fetches up to max_rows rows and returns them as an array.

# File lib/rbhive/t_c_l_i_connection.rb, line 319
# Executes query and returns up to max_rows rows, fetched with
# orientation :first.
def fetch(query, max_rows = 100)
  # Run the statement and make sure the server accepted it
  response = execute(query)
  raise_error_if_failed!(response)

  # The operation handle identifies the result set to read from
  fetch_rows(response.operationHandle, :first, max_rows)
end
fetch_in_batch(query, batch_size = 1000) { |rows| ... } click to toggle source

Performs a query on the server, fetches the results in batches of batch_size rows and yields the result batches to a given block as arrays of rows.

# File lib/rbhive/t_c_l_i_connection.rb, line 333
# Executes query and yields its rows to the given block in batches of
# batch_size, stopping at the first empty batch.
#
# Raises RuntimeError when no block is given.
def fetch_in_batch(query, batch_size = 1000, &block)
  raise "No block given for the batch fetch request!" unless block_given?

  # Execute the query and check the result
  exec_result = execute(query)
  raise_error_if_failed!(exec_result)

  # Get search operation handle to fetch the results
  op_handle = exec_result.operationHandle

  # fetch_rows builds its own fetch request on every call, so no request
  # needs to be prepared up front here.
  loop do
    rows = fetch_rows(op_handle, :next, batch_size)
    break if rows.empty?
    yield rows
  end
end
fetch_rows(op_handle, orientation = :first, max_rows = 1000) click to toggle source

Pull rows from the query result

# File lib/rbhive/t_c_l_i_connection.rb, line 300
# Issues a FetchResults call for op_handle and wraps the returned rows in
# a TCLIResultSet. The schema definition is built from the operation's
# result-set schema and the first returned row.
def fetch_rows(op_handle, orientation = :first, max_rows = 1000)
  request = prepare_fetch_results(op_handle, orientation, max_rows)
  response = @client.FetchResults(request)
  raise_error_if_failed!(response)

  raw_rows = response.results.rows
  schema_definition = TCLISchemaDefinition.new(get_schema_for(op_handle), raw_rows.first)
  TCLIResultSet.new(raw_rows, schema_definition)
end
method_missing(meth, *args) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 371
# Forwards any method this object does not define to the underlying Thrift
# client, including a block when one is given.
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? consistent with the delegation done in method_missing.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
open() click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 146
# Opens the underlying transport and returns whatever its #open returns.
def open
  transport = @transport
  transport.open
end
open_session() click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 154
# Opens a session using the Thrift protocol version chosen at construction
# time and remembers the OpenSession response in @session.
def open_session
  session_request = prepare_open_session(@thrift_protocol_version)
  @session = @client.OpenSession(session_request)
end
parse_sasl_params(sasl_params) click to toggle source

Processes SASL connection params and returns a hash with symbol keys, or nil if the params are not a hash

# File lib/rbhive/t_c_l_i_connection.rb, line 135
# Returns a copy of sasl_params with every key converted to a Symbol.
# Anything that is not a Hash yields nil.
def parse_sasl_params(sasl_params)
  return nil unless sasl_params.kind_of?(Hash)

  # Symbolize keys in the hash
  sasl_params.each_with_object({}) do |(key, value), symbolized|
    symbolized[key.to_sym] = value
  end
end
priority=(priority) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 179
# Sets the "mapred.job.priority" property to priority via #set.
def priority=(priority)
  property = "mapred.job.priority"
  set(property, priority)
end
queue=(queue) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 183
# Sets the "mapred.job.queue.name" property to queue via #set.
def queue=(queue)
  property = "mapred.job.queue.name"
  set(property, queue)
end
replace_columns(schema) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 363
# Executes the statement produced by schema.replace_columns_statement and
# returns the result of #execute.
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
session() click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 163
# Returns the session handle of the stored session, or the falsy @session
# value itself when no session is stored.
def session
  return @session unless @session

  @session.sessionHandle
end
set(name,value) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 187
# Logs and executes a "SET name=value" statement; returns the result of
# #execute.
def set(name,value)
  assignment = "#{name}=#{value}"
  @logger.info("Setting #{assignment}")
  execute("SET #{assignment}")
end
thrift_hive_protocol(version) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 109
# Looks up version in HIVE_THRIFT_MAPPING and returns the mapped value.
#
# Raises RuntimeError ("Invalid Hive version") when the lookup is nil or false.
def thrift_hive_protocol(version)
  protocol = HIVE_THRIFT_MAPPING[version]
  raise("Invalid Hive version") unless protocol

  protocol
end
thrift_socket(server, port, timeout) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 128
# Creates a Thrift::Socket for server:port, assigns timeout to it and
# returns the socket.
def thrift_socket(server, port, timeout)
  Thrift::Socket.new(server, port).tap { |sock| sock.timeout = timeout }
end
thrift_transport(server, port) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 113
# Builds the Thrift transport selected by @options[:transport].
#
# :buffered - BufferedTransport over a socket with the configured timeout
# :sasl     - SaslClientTransport over such a socket, with the parsed
#             :sasl_params
# :http     - HTTPClientTransport pointed at http://server:port/cliservice
#
# Raises RuntimeError for any other transport type.
def thrift_transport(server, port)
  transport_type = @options[:transport]
  @logger.info("Initializing transport #{transport_type}")

  case transport_type
  when :buffered
    Thrift::BufferedTransport.new(thrift_socket(server, port, @options[:timeout]))
  when :sasl
    Thrift::SaslClientTransport.new(thrift_socket(server, port, @options[:timeout]),
                                    parse_sasl_params(@options[:sasl_params]))
  when :http
    Thrift::HTTPClientTransport.new("http://#{server}:#{port}/cliservice")
  else
    # Report the configured value; there is no `transport` local or method.
    raise "Unrecognised transport type '#{transport_type}'"
  end
end

Private Instance Methods

get_schema_for(handle) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 426
# Requests the result-set metadata for handle and returns its schema.
def get_schema_for(handle)
  metadata_request = ::Hive2::Thrift::TGetResultSetMetadataReq.new(operationHandle: handle)
  client.GetResultSetMetadata(metadata_request).schema
end
prepare_cancel_request(handles) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 414
# Builds a TCancelOperationReq for the operation described by handles.
def prepare_cancel_request(handles)
  operation_handle = prepare_operation_handle(handles)
  Hive2::Thrift::TCancelOperationReq.new(operationHandle: operation_handle)
end
prepare_close_session() click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 383
# Builds a TCloseSessionReq for the current session handle.
def prepare_close_session
  session_handle = session
  ::Hive2::Thrift::TCloseSessionReq.new(sessionHandle: session_handle)
end
prepare_execute_statement(query) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 387
# Builds a TExecuteStatementReq for the current session. The query is
# converted with #to_s and an empty confOverlay is sent.
def prepare_execute_statement(query)
  ::Hive2::Thrift::TExecuteStatementReq.new(
    sessionHandle: session,
    statement: query.to_s,
    confOverlay: {}
  )
end
prepare_fetch_results(handle, orientation=:first, rows=100) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 391
# Builds a TFetchResultsReq for handle.
#
# handle      - operation handle to fetch from
# orientation - name of a fetch orientation without the FETCH_ prefix
#               (e.g. :first, :next); matched case-insensitively
# rows        - value sent as maxRows
#
# Raises ArgumentError when the orientation is not listed in
# TFetchOrientation::VALUE_MAP.
def prepare_fetch_results(handle, orientation=:first, rows=100)
  orientations = ::Hive2::Thrift::TFetchOrientation
  orientation_name = "FETCH_#{orientation.to_s.upcase}"
  unless orientations::VALUE_MAP.values.include?(orientation_name)
    raise ArgumentError, "Invalid orientation: #{orientation.inspect}"
  end

  # Resolve the constant by name rather than eval'ing a built string.
  orientation_const = orientations.const_get(orientation_name, false)
  ::Hive2::Thrift::TFetchResultsReq.new(
    operationHandle: handle,
    orientation: orientation_const,
    maxRows: rows
  )
end
prepare_open_session(client_protocol) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 377
# Builds a TOpenSessionReq initialised from @options[:sasl_params] (or an
# empty array when that option is nil) and sets its client_protocol.
def prepare_open_session(client_protocol)
  sasl_params = @options[:sasl_params]
  initial_fields = sasl_params.nil? ? [] : sasl_params

  ::Hive2::Thrift::TOpenSessionReq.new(initial_fields).tap do |request|
    request.client_protocol = client_protocol
  end
end
prepare_operation_handle(handles) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 405
# Rebuilds a TOperationHandle from the :guid and :secret stored in handles.
# The handle is always typed EXECUTE_STATEMENT with hasResultSet false.
# handles is validated first.
def prepare_operation_handle(handles)
  validate_handles!(handles)

  identifier = Hive2::Thrift::THandleIdentifier.new(
    guid: handles[:guid],
    secret: handles[:secret]
  )
  Hive2::Thrift::TOperationHandle.new(
    operationId: identifier,
    operationType: Hive2::Thrift::TOperationType::EXECUTE_STATEMENT,
    hasResultSet: false
  )
end
raise_error_if_failed!(result) click to toggle source

Raises an exception if given operation result is a failure

# File lib/rbhive/t_c_l_i_connection.rb, line 433
# Raises RBHive::TCLIConnectionError unless result.status.statusCode is 0.
# The error carries the status' errorMessage, or 'Execution failed!' when
# none is present. Returns nil on success.
#
# NOTE(review): every non-zero status code is treated as a failure here;
# confirm that is intended for all codes the protocol defines.
def raise_error_if_failed!(result)
  status = result.status
  unless status.statusCode == 0
    raise RBHive::TCLIConnectionError.new(status.errorMessage || 'Execution failed!')
  end
end
validate_handles!(handles) click to toggle source
# File lib/rbhive/t_c_l_i_connection.rb, line 420
# Ensures handles contains the :guid, :secret and :session keys.
#
# Raises RuntimeError naming the offending hash when any key is missing.
def validate_handles!(handles)
  required_keys = [:guid, :secret, :session]
  return if required_keys.all? { |key| handles.has_key?(key) }

  raise "Invalid handles hash: #{handles.inspect}"
end