class Pyper::Pipes::Cassandra::AllItemsReader

A pipe for reading all items from a single row in cassandra @param [Symbol] table name @param [Cassava::Client] client to query cassandra with

Attributes

page_size[R]

Public Class Methods

new(table, client, page_size = 1000) click to toggle source

@param table [Symbol] the name of the cassandra table to fetch data from @param client [Cassava::Client] @param mod_size [Integer] the mod size @param page_size [Integer] the page size

# File lib/pyper/pipes/cassandra/all_items_reader.rb, line 12
def initialize(table, client, page_size = 1000)
  @table = table
  @client = client
  @page_size = page_size
end

Public Instance Methods

pipe(arguments, status = {}) click to toggle source

@param arguments [Hash] Arguments passed to the cassandra client where statement @option arguments [Array] :order A pair [clustering_column, :desc|:asc] determining how to order the results. @option arguments [Integer] :page_size @param status [Hash] The mutable status field @return [Enumerator::Lazy<Hash>] enumerator of items

# File lib/pyper/pipes/cassandra/all_items_reader.rb, line 23
def pipe(arguments, status = {})
  columns = arguments.delete(:columns)
  enum = Enumerator.new do |yielder|
    options = { :page_size => page_size }
    paging_state = nil
    loop do
      options[:paging_state] = paging_state if paging_state.present?
      result = @client.select(@table, columns).where(arguments).execute(options)
      result.each { |item| yielder << item }

      break if result.last_page?
      paging_state = result.paging_state
    end
  end
  enum.lazy
end