class Pyper::Pipes::Cassandra::ModKeyReader

This pipe reads data from sharded rows in Cassandra. The table must be sharded by the 'mod_key' field. Given a fixed number of shards (mod_size), the pipe reads all data from every shard in turn and returns a lazy enumerator over all of those rows. For example, if mod_size is 100, it reads the 100 shards with mod_key 0 through 99.
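The shard-by-shard, page-by-page read described above can be sketched without a Cassandra cluster. This is a minimal illustration, not the library's implementation: FakePage, fetch_page and read_all_shards are hypothetical names, and the in-memory hash stands in for the sharded table, with an integer offset playing the role of the paging state.

```ruby
# Hypothetical stand-in for one page of a query result.
class FakePage
  include Enumerable
  attr_reader :paging_state

  def initialize(rows, paging_state)
    @rows = rows
    @paging_state = paging_state
  end

  def each(&block)
    @rows.each(&block)
  end

  # Assumption: a nil paging state marks the final page.
  def last_page?
    @paging_state.nil?
  end
end

# Hypothetical fetcher: returns one page of rows for a single shard.
# `shards` maps mod_key => Array of row hashes.
def fetch_page(shards, mod_key, page_size, paging_state)
  offset = paging_state || 0
  rows = shards.fetch(mod_key, [])
  slice = rows[offset, page_size] || []
  next_offset = offset + page_size
  FakePage.new(slice, next_offset < rows.size ? next_offset : nil)
end

# Walk every shard in order, paging through each one, and expose
# the combined rows as a lazy enumerator.
def read_all_shards(shards, mod_size, page_size)
  Enumerator.new do |yielder|
    (0...mod_size).each do |mod_key|
      paging_state = nil
      loop do
        page = fetch_page(shards, mod_key, page_size, paging_state)
        page.each { |row| yielder << row }
        break if page.last_page?
        paging_state = page.paging_state
      end
    end
  end.lazy
end

shards = {
  0 => [{ id: 'a' }, { id: 'b' }, { id: 'c' }],
  2 => [{ id: 'd' }]
}
rows = read_all_shards(shards, 3, 2)
first_two = rows.first(2)               # only the first page of shard 0 is fetched
all_ids = rows.map { |r| r[:id] }.to_a  # walks every shard and page
```

Because the enumerator is lazy, taking only the first few rows stops before later pages or shards are fetched, which is the main reason to return a lazy enumerator rather than an array.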

Attributes

client[R]

@param table [Symbol] the name of the Cassandra table to fetch data from
@param client [Cassava::Client]
@param mod_size [Integer] the mod size
@param page_size [Integer] the page size

mod_size[R]


page_size[R]


table[R]


Public Class Methods

new(table, client, mod_size = 100, page_size = 1000)
# File lib/pyper/pipes/cassandra/mod_key_reader.rb, line 14
def initialize(table, client, mod_size = 100, page_size = 1000)
  @table = table
  @client = client
  @mod_size = mod_size
  @page_size = page_size
end

Public Instance Methods

pipe(arguments, status = {})

@param arguments [Hash] Arguments passed to the Cassandra client where statement
@param status [Hash] The mutable status field
@return [Enumerator::Lazy<Hash>] enumerator of items from all rows

# File lib/pyper/pipes/cassandra/mod_key_reader.rb, line 24
def pipe(arguments, status = {})
  (Enumerator.new do |yielder|
     (0...mod_size).each do |mod_id|
       options = { :page_size => page_size }
       paging_state = nil
       loop do
         options[:paging_state] = paging_state if paging_state.present?
         result = client.select(table).where(arguments.merge(:mod_key => mod_id)).execute(options)
         result.each { |item| yielder << item }

         break if result.last_page?
         paging_state = result.paging_state
       end
     end
   end).lazy
end