class RBHive::Connection
Attributes
client[R]
Public Class Methods
new(server, port=10_000, logger=StdOutLogger.new)
click to toggle source
# File lib/rbhive/connection.rb, line 34
# Builds the Thrift stack for a Hive connection: a socket wrapped in a
# buffered transport, wrapped in a binary protocol, handed to the
# ThriftHive client. The transport is not opened here; see #open.
#
# server - host passed to Thrift::Socket
# port   - port passed to Thrift::Socket (default 10_000)
# logger - object receiving #info messages (default StdOutLogger.new)
def initialize(server, port = 10_000, logger = StdOutLogger.new)
  @socket    = Thrift::Socket.new(server, port)
  @transport = Thrift::BufferedTransport.new(@socket)
  @protocol  = Thrift::BinaryProtocol.new(@transport)
  @client    = Hive::Thrift::ThriftHive::Client.new(@protocol)

  @logger = logger
  @logger.info("Connecting to #{server} on port #{port}")

  # Guards client round-trips; used by #safe.
  @mutex = Mutex.new
end
Public Instance Methods
add_columns(schema)
click to toggle source
# File lib/rbhive/connection.rb, line 125
# Executes the statement produced by schema.add_columns_statement.
def add_columns(schema)
  statement = schema.add_columns_statement
  execute(statement)
end
close()
click to toggle source
# File lib/rbhive/connection.rb, line 48
# Closes the transport created in the constructor.
def close
  @transport.close
end
create_table(schema)
click to toggle source
# File lib/rbhive/connection.rb, line 112
# Executes the statement produced by schema.create_table_statement.
def create_table(schema)
  statement = schema.create_table_statement
  execute(statement)
end
drop_table(name)
click to toggle source
# File lib/rbhive/connection.rb, line 116
# Drops a table. Accepts either a TableSchema (its #name is used) or any
# other value, which is interpolated into the statement as the table name.
def drop_table(name)
  table_name = name.is_a?(TableSchema) ? name.name : name
  execute("DROP TABLE `#{table_name}`")
end
execute(query)
click to toggle source
# File lib/rbhive/connection.rb, line 56
# Public entry point for running a statement; delegates to the
# mutex-guarded execute_safe.
def execute(query)
  execute_safe(query)
end
explain(query)
click to toggle source
# File lib/rbhive/connection.rb, line 60
# Runs "EXPLAIN <query>" and wraps everything the client returns from
# fetchAll in an ExplainResult. The whole round-trip runs under #safe.
def explain(query)
  safe do
    explain_statement = "EXPLAIN " + query
    execute_unsafe(explain_statement)
    plan_rows = client.fetchAll
    ExplainResult.new(plan_rows)
  end
end
fetch(query)
click to toggle source
# File lib/rbhive/connection.rb, line 80
# Executes the query, fetches all rows at once, and returns them as a
# ResultSet. The schema is built from client.getSchema, with the first
# row passed along as an example row. Runs under #safe.
def fetch(query)
  safe do
    execute_unsafe(query)
    all_rows = client.fetchAll
    schema_definition = SchemaDefinition.new(client.getSchema, all_rows.first)
    ResultSet.new(all_rows, schema_definition)
  end
end
fetch_in_batch(query, batch_size=1_000) { |result_set| ... }
click to toggle source
# File lib/rbhive/connection.rb, line 89
# Executes the query and yields its rows to the caller's block in
# ResultSet chunks of up to batch_size rows, stopping at the first empty
# batch. The schema is built once, from the first non-empty batch, and
# reused for every later chunk.
#
# The mutex from #safe is held for the whole iteration, including while
# the caller's block runs.
def fetch_in_batch(query, batch_size = 1_000)
  safe do
    execute_unsafe(query)

    schema_definition = nil
    batch = client.fetchN(batch_size)
    until batch.empty?
      schema_definition ||= SchemaDefinition.new(client.getSchema, batch.first)
      yield ResultSet.new(batch, schema_definition)
      batch = client.fetchN(batch_size)
    end
  end
end
first(query)
click to toggle source
# File lib/rbhive/connection.rb, line 99
# Executes the query and returns only its first row: the row from
# client.fetchOne is wrapped in a single-element ResultSet and that set's
# #first is returned. Runs under #safe.
def first(query)
  safe do
    execute_unsafe(query)
    single_row = client.fetchOne
    schema_definition = SchemaDefinition.new(client.getSchema, single_row)
    result_set = ResultSet.new([single_row], schema_definition)
    result_set.first
  end
end
method_missing(meth, *args)
click to toggle source
# File lib/rbhive/connection.rb, line 129
# Forwards any message this class does not define to the Thrift client,
# passing along positional arguments and the caller's block (if any).
# Uses #send, so the call is not restricted to the client's public methods.
# Forwarded calls are NOT wrapped in the connection mutex.
def method_missing(meth, *args, &block)
  client.send(meth, *args, &block)
end

# Keeps respond_to? / method(...) consistent with the delegation done in
# method_missing: the connection reports that it responds to whatever the
# client responds to.
def respond_to_missing?(meth, include_private = false)
  client.respond_to?(meth, include_private) || super
end
open()
click to toggle source
# File lib/rbhive/connection.rb, line 44
# Opens the transport created in the constructor.
def open
  @transport.open
end
priority=(priority)
click to toggle source
# File lib/rbhive/connection.rb, line 67
# Sets the "mapred.job.priority" setting to the given value via #set.
def priority=(priority)
  setting = "mapred.job.priority"
  set(setting, priority)
end
queue=(queue)
click to toggle source
# File lib/rbhive/connection.rb, line 71
# Sets the "mapred.job.queue.name" setting to the given value via #set.
def queue=(queue)
  setting = "mapred.job.queue.name"
  set(setting, queue)
end
replace_columns(schema)
click to toggle source
# File lib/rbhive/connection.rb, line 121
# Executes the statement produced by schema.replace_columns_statement.
def replace_columns(schema)
  statement = schema.replace_columns_statement
  execute(statement)
end
schema(example_row=[])
click to toggle source
# File lib/rbhive/connection.rb, line 108
# Returns a SchemaDefinition built from the client's current getSchema
# result and the optional example row (default: empty array).
# Runs under #safe.
def schema(example_row = [])
  safe do
    SchemaDefinition.new(client.getSchema, example_row)
  end
end
set(name,value)
click to toggle source
# File lib/rbhive/connection.rb, line 75
# Issues "SET name=value" on the connection and logs the setting first.
#
# The client call runs under #safe so it is serialized with the other
# statement-executing methods, which all go through the same mutex;
# previously this was the one client.execute call that bypassed it.
# Ruby's Mutex is not reentrant, so do not call this from inside a
# fetch_in_batch block (the mutex is held while that block runs).
def set(name, value)
  @logger.info("Setting #{name}=#{value}")
  safe { client.execute("SET #{name}=#{value}") }
end
Private Instance Methods
execute_safe(query)
click to toggle source
# File lib/rbhive/connection.rb, line 135
# Mutex-guarded wrapper around execute_unsafe.
def execute_safe(query)
  safe do
    execute_unsafe(query)
  end
end
execute_unsafe(query)
click to toggle source
# File lib/rbhive/connection.rb, line 139
# Logs the query and sends it to the client without taking the mutex.
# Callers are expected to already be inside a #safe block.
def execute_unsafe(query)
  log_line = "Executing Hive Query: #{query}"
  @logger.info(log_line)
  client.execute(query)
end
safe() { || ... }
click to toggle source
# File lib/rbhive/connection.rb, line 144
# Runs the given block while holding @mutex and returns the block's value.
# Mutex#synchronize already returns the block's result and releases the
# lock if the block raises, so no temporary is needed.
def safe
  @mutex.synchronize { yield }
end