module Quandl::Cassandra::Batch::Execute::ClassMethods

Public Instance Methods

batch_size() click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 16
def batch_size
  Quandl::Cassandra.configuration.batch_size
end
execute(rows, &block) click to toggle source

Quandl::Cassandra::Batch.insert(rows) do |id, type, time, value|

"INSERT INTO columns (id, type, time, value) VALUES (#{id}, '#{type}', #{time}, #{value})"

end

# File lib/quandl/cassandra/batch/execute.rb, line 10
def execute(rows, &block)
  execute_in_batches(rows, &block)
rescue => e
  raise $!, "#{block.call(*rows.first)} #{$!}", $!.backtrace
end

Protected Instance Methods

execute_in_batches(rows, &block) click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 31
def execute_in_batches(rows, &block)
  futures = []
  rows.each_slice( batch_size ).each do |rows_slice|
    statements = rows_slice.collect{|row| block.call( *row ) }
    futures << execute_async_batch(statements)
  end
  futures.collect(&:value)
end
execute_in_batches_with_threads(rows, &block) click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 23
def execute_in_batches_with_threads(rows, &block)
  # split rows into groups by rows_per_thread
  threads = rows.each_slice( rows_per_thread(rows) ).map do |rows_slice|
    Thread.start{ execute_in_batches(rows_slice, &block) }
  end
  threads.each(&:join)
end
rows_per_thread(rows) click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 40
def rows_per_thread(rows)
  r = rows.count / Facter.processorcount.to_i
  r = 1 if r <= 0
  r
end

Private Instance Methods

execute_async_batch(statements) click to toggle source
# File lib/quandl/cassandra/batch/execute.rb, line 49
def execute_async_batch(statements)
  batch = %Q{BEGIN UNLOGGED BATCH\n#{statements.join("\n")}\nAPPLY BATCH;}
  future = Quandl::Cassandra::Base.execute_async( batch )
end