class RBHive::Connection

Attributes

client[R]

Public Class Methods

new(server, port=10_000, logger=StdOutLogger.new) click to toggle source
# File lib/rbhive/connection.rb, line 34
# Assembles the Thrift stack for a Hive server: a socket for +server+:+port+,
# a buffered transport over that socket, a binary protocol over the transport,
# and a ThriftHive client over the protocol. This method does not call #open
# on the transport.
#
# server - host handed to Thrift::Socket.new
# port   - port handed to Thrift::Socket.new (default 10_000)
# logger - object that receives #info messages (default StdOutLogger.new)
def initialize(server, port=10_000, logger=StdOutLogger.new)
  # Layered bottom-up; each inner assignment runs before the outer constructor.
  @transport = Thrift::BufferedTransport.new(@socket = Thrift::Socket.new(server, port))
  @client = Hive::Thrift::ThriftHive::Client.new(@protocol = Thrift::BinaryProtocol.new(@transport))

  (@logger = logger).info("Connecting to #{server} on port #{port}")
  # Lock used by #safe to serialize calls made through the client.
  @mutex = Mutex.new
end

Public Instance Methods

add_columns(schema) click to toggle source
# File lib/rbhive/connection.rb, line 125
# Asks +schema+ for its add-columns statement and runs it through #execute.
# Returns whatever #execute returns.
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
close() click to toggle source
# File lib/rbhive/connection.rb, line 48
# Closes the transport held in @transport and returns the result of that call.
def close
  @transport.close
end
create_table(schema) click to toggle source
# File lib/rbhive/connection.rb, line 112
# Asks +schema+ for its create-table statement and runs it through #execute.
# Returns whatever #execute returns.
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
drop_table(name) click to toggle source
# File lib/rbhive/connection.rb, line 116
# Drops a table via #execute.
#
# name - either the table name itself, or a TableSchema whose #name is used.
#        The resolved name is wrapped in backticks in the statement.
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
execute(query) click to toggle source
# File lib/rbhive/connection.rb, line 56
# Runs +query+ against the client while holding the connection lock (see
# #safe) and returns the result of #execute_unsafe.
def execute(query)
  safe { execute_unsafe(query) }
end
explain(query) click to toggle source
# File lib/rbhive/connection.rb, line 60
# Runs "EXPLAIN <query>" under the connection lock and wraps everything the
# client returns from fetchAll in an ExplainResult.
def explain(query)
  safe do
    statement = "EXPLAIN " + query
    execute_unsafe(statement)
    plan_rows = client.fetchAll
    ExplainResult.new(plan_rows)
  end
end
fetch(query) click to toggle source
# File lib/rbhive/connection.rb, line 80
# Runs +query+ under the connection lock, pulls every row with fetchAll, and
# returns a ResultSet built from those rows plus a SchemaDefinition made from
# the client's schema and the first row (nil when there are no rows).
def fetch(query)
  safe do
    execute_unsafe(query)
    all_rows = client.fetchAll
    ResultSet.new(all_rows, SchemaDefinition.new(client.getSchema, all_rows.first))
  end
end
fetch_in_batch(query, batch_size=1_000) { |result_set| ... } click to toggle source
# File lib/rbhive/connection.rb, line 89
# Runs +query+ and yields its rows in chunks, holding the connection lock for
# the whole iteration (including while the caller's block runs).
#
# query      - statement passed to #execute_unsafe
# batch_size - row count requested from the client per fetchN call (default 1_000)
#
# Yields a ResultSet per non-empty batch. The SchemaDefinition is built once,
# from the first batch's first row, and reused for later batches. Iteration
# stops at the first empty batch. Returns nil.
def fetch_in_batch(query, batch_size=1_000)
  safe do
    execute_unsafe(query)
    schema_definition = nil
    batch = client.fetchN(batch_size)
    until batch.empty?
      schema_definition ||= SchemaDefinition.new(client.getSchema, batch.first)
      yield ResultSet.new(batch, schema_definition)
      batch = client.fetchN(batch_size)
    end
  end
end
first(query) click to toggle source
# File lib/rbhive/connection.rb, line 99
# Runs +query+ under the connection lock, reads a single row with fetchOne,
# wraps it in a one-row ResultSet (with a SchemaDefinition built from the
# client's schema and that row) and returns the ResultSet's #first.
def first(query)
  safe do
    execute_unsafe(query)
    row = client.fetchOne
    ResultSet.new([row], SchemaDefinition.new(client.getSchema, row)).first
  end
end
method_missing(meth, *args) click to toggle source
# File lib/rbhive/connection.rb, line 129
# Delegates any method this object does not define to the underlying client,
# passing along positional arguments and the caller's block (the block used to
# be dropped). Uses #send, so the call is not routed through #safe and is
# not serialized by the connection lock.
#
# meth  - name of the missing method
# args  - positional arguments to forward
# block - optional block to forward
#
# Returns whatever the client method returns; any error raised by the client
# (including NoMethodError for an unknown name) propagates unchanged.
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps #respond_to? truthful about the delegation done in #method_missing:
# reports true for anything the client responds to, otherwise defers to super.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
open() click to toggle source
# File lib/rbhive/connection.rb, line 44
# Opens the transport held in @transport and returns the result of that call.
def open
  @transport.open
end
priority=(priority) click to toggle source
# File lib/rbhive/connection.rb, line 67
# Sets the "mapred.job.priority" property on the session to +priority+ via #set.
def priority=(priority)
  hive_property = "mapred.job.priority"
  set(hive_property, priority)
end
queue=(queue) click to toggle source
# File lib/rbhive/connection.rb, line 71
# Sets the "mapred.job.queue.name" property on the session to +queue+ via #set.
def queue=(queue)
  hive_property = "mapred.job.queue.name"
  set(hive_property, queue)
end
replace_columns(schema) click to toggle source
# File lib/rbhive/connection.rb, line 121
# Asks +schema+ for its replace-columns statement and runs it through #execute.
# Returns whatever #execute returns.
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
schema(example_row=[]) click to toggle source
# File lib/rbhive/connection.rb, line 108
# Reads the client's current schema under the connection lock and wraps it,
# together with +example_row+ (default: empty array), in a SchemaDefinition.
def schema(example_row=[])
  safe do
    raw_schema = client.getSchema
    SchemaDefinition.new(raw_schema, example_row)
  end
end
set(name,value) click to toggle source
# File lib/rbhive/connection.rb, line 75
# Issues "SET name=value" on the client and logs the assignment first.
#
# The client call goes through #safe, like every other statement this
# connection sends, so a SET from one thread cannot interleave with another
# thread's execute/fetch sequence on the same client.
#
# NOTE(review): #safe is built on a Mutex, which cannot be re-locked by the
# thread that already holds it; calling #set from inside a block that already
# runs under #safe (e.g. the block given to #fetch_in_batch) would raise
# ThreadError. Confirm no caller does that.
#
# name  - property name, interpolated into the statement as-is
# value - property value, interpolated into the statement as-is
#
# Returns the result of client.execute.
def set(name,value)
  @logger.info("Setting #{name}=#{value}")
  safe { client.execute("SET #{name}=#{value}") }
end

Private Instance Methods

execute_safe(query) click to toggle source
# File lib/rbhive/connection.rb, line 135
# Runs #execute_unsafe for +query+ inside #safe, i.e. while holding the
# connection lock, and returns its result.
def execute_safe(query)
  safe do
    execute_unsafe(query)
  end
end
execute_unsafe(query) click to toggle source
# File lib/rbhive/connection.rb, line 139
# Logs +query+ and sends it to the client without taking the connection lock
# itself; callers are expected to wrap it in #safe. Returns the result of
# client.execute.
def execute_unsafe(query)
  message = "Executing Hive Query: #{query}"
  @logger.info(message)
  client.execute(query)
end
safe() { || ... } click to toggle source
# File lib/rbhive/connection.rb, line 144
# Runs the given block while holding @mutex and returns the block's value.
# Mutex#synchronize releases the lock when the block finishes, including when
# it raises.
def safe
  @mutex.synchronize { yield }
end