class Pyper::Pipes::Cassandra::ModKeyReader
This pipe is for reading data from sharded rows in Cassandra
. The table must have rows sharded by the 'mod_key' field. For a fixed number of such shards, this pipe reads all data from all of those shards, returning a lazy enumerator over all of those rows. For example, if mod_size
is 100, it will read the 100 rows with mod_key between 0 and 99.
Attributes
client[R]
mod_size[R]
page_size[R]
Public Class Methods
new(table, client, mod_size = 100, page_size = 1000)
click to toggle source
# File lib/pyper/pipes/cassandra/mod_key_reader.rb, line 14 def initialize(table, client, mod_size = 100, page_size = 1000) @table = table @client = client @mod_size = mod_size @page_size = page_size end
Public Instance Methods
pipe(arguments, status = {})
click to toggle source
@param arguments [Hash] Arguments passed to the cassandra client where statement @param status [Hash] The mutable status field @return [Enumerator::Lazy<Hash>] enumerator of items from all rows
# File lib/pyper/pipes/cassandra/mod_key_reader.rb, line 24 def pipe(arguments, status = {}) (Enumerator.new do |yielder| (0...mod_size).each do |mod_id| options = { :page_size => page_size } paging_state = nil loop do options[:paging_state] = paging_state if paging_state.present? result = client.select(table).where(arguments.merge(:mod_key => mod_id)).execute(options) result.each { |item| yielder << item } break if result.last_page? paging_state = result.paging_state end end end).lazy end