module Quandl::Cassandra::Batch::Execute::ClassMethods
Public Instance Methods
batch_size()
click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 16
#
# Reads the batch size from the global Quandl::Cassandra configuration.
#
# @return [Object] the value of `Quandl::Cassandra.configuration.batch_size`
def batch_size
  Quandl::Cassandra.configuration.batch_size
end
execute(rows, &block)
click to toggle source
# Example usage: the block is given one row's fields (id, type, time, value)
# and returns the CQL statement string for that row.
# NOTE(review): the values are interpolated straight into the statement with
# no escaping or binding — confirm rows never carry untrusted input.
# NOTE(review): presumably Batch.insert forwards rows and block to `execute`;
# that is not visible here — verify.
Quandl::Cassandra::Batch.insert(rows) do |id, type, time, value|
"INSERT INTO columns (id, type, time, value) VALUES (#{id}, '#{type}', #{time}, #{value})"
end
# File lib/quandl/cassandra/batch/execute.rb, line 10
#
# Executes +rows+ in batches, turning each row into a statement via +block+.
# If anything fails, the error is re-raised with the statement generated for
# the first row prepended to its message, to give some context about what was
# being executed.
#
# @param rows [Enumerable] rows; each row is splatted into +block+
# @yield [*row] called with the fields of a row, returns a statement string
# @return whatever execute_in_batches returns
# @raise [StandardError] an exception of the same class as the original
#   failure, with the original backtrace and an annotated message
def execute(rows, &block)
  execute_in_batches(rows, &block)
rescue => e
  first_row = rows.first
  # Only build the sample statement when there is a row and a block to build
  # it from; otherwise the context would be a statement made of nils (or a
  # NoMethodError on a nil block) that hides the real failure.
  context = block && first_row ? "#{block.call(*first_row)} " : ''
  # raise(exception, message, backtrace) re-raises a copy of +e+ (same class)
  # carrying the annotated message and the original backtrace.
  raise e, "#{context}#{e}", e.backtrace
end
Protected Instance Methods
execute_in_batches(rows, &block)
click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 31
#
# Splits +rows+ into slices of +batch_size+, converts every row of a slice
# into a statement via +block+, and submits each slice with
# execute_async_batch.
#
# @param rows [Enumerable] rows; each row is splatted into +block+
# @yield [*row] called with the fields of a row, returns a statement string
# @return [Array] the result of calling +value+ on each object returned by
#   execute_async_batch, in slice order
def execute_in_batches(rows, &block)
  # Submit every slice first ...
  futures = rows.each_slice(batch_size).map do |rows_slice|
    statements = rows_slice.map { |row| block.call(*row) }
    execute_async_batch(statements)
  end
  # ... and only then call +value+ on each, so no slice is held back until an
  # earlier one has been resolved.
  futures.map(&:value)
end
execute_in_batches_with_threads(rows, &block)
click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 23
#
# Splits +rows+ into groups of rows_per_thread(rows) rows and runs
# execute_in_batches for each group in its own thread, then waits for all of
# them.
#
# @param rows [Enumerable] rows; each row is splatted into +block+
# @yield [*row] called with the fields of a row, returns a statement string
# @return [Array<Thread>] the joined threads (per-thread results are not
#   collected here)
def execute_in_batches_with_threads(rows, &block)
  # split rows into groups by rows_per_thread
  threads = rows.each_slice(rows_per_thread(rows)).map do |rows_slice|
    Thread.start { execute_in_batches(rows_slice, &block) }
  end
  # Thread#join re-raises an exception that terminated the thread.
  threads.each(&:join)
end
rows_per_thread(rows)
click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 40
#
# Number of rows to hand to each thread: the row count divided (integer
# division) by the processor count reported by Facter, never less than 1.
#
# @param rows [#count] the rows about to be processed
# @return [Integer] rows per thread, at least 1
def rows_per_thread(rows)
  # A missing or unparseable processor count becomes 0 after to_i; treat it
  # as a single processor instead of dividing by zero.
  processors = [Facter.processorcount.to_i, 1].max
  [rows.count / processors, 1].max
end
Private Instance Methods
execute_async_batch(statements)
click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 49
#
# Wraps +statements+ in a single `BEGIN UNLOGGED BATCH ... APPLY BATCH;`
# statement (one statement per line) and hands it to
# Quandl::Cassandra::Base.execute_async.
#
# @param statements [Array<String>] statements to run in one batch
# @return whatever Quandl::Cassandra::Base.execute_async returns
def execute_async_batch(statements)
  batch = %Q{BEGIN UNLOGGED BATCH\n#{statements.join("\n")}\nAPPLY BATCH;}
  Quandl::Cassandra::Base.execute_async(batch)
end