module Cassie::Statements::Execution::BatchedFetching

Public Instance Methods

fetch_each(opts={}) { |record| ... }

Uses fetch_in_batches with a batch size of 1000 (or as specified by the :batch_size option) to enumerate all records, fetching in batches to limit resource consumption.

If you do not provide a block to fetch_each, it will return an Enumerator for chaining with other methods.

UsersByPositionQuery.fetch_each.with_index do |user, index|
  user.position = index
  UserMapper.update_position(user)
end

Options

  • :batch_size - Specifies the size of the batch. Defaults to 1000.

NOTE: Any limit specified on the query will affect the batched set. Cassandra internal paging is used for batching.
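The block-or-Enumerator behavior described above can be sketched in plain Ruby. TinyQuery and its array-backed data source below are illustrative stand-ins, not Cassie APIs:

```ruby
# Minimal sketch of the block-or-Enumerator pattern used by fetch_each.
# A plain array stands in for query results here.
class TinyQuery
  def initialize(records)
    @records = records
  end

  # Yields each record if a block is given; otherwise returns an
  # Enumerator so callers can chain methods like with_index.
  def fetch_each(opts = {})
    return to_enum(:fetch_each, opts) unless block_given?

    @records.each_slice(opts.fetch(:batch_size, 1000)) do |batch|
      batch.each { |record| yield record }
    end
  end
end

query = TinyQuery.new(%w[a b c])
pairs = query.fetch_each.with_index.map { |rec, i| [rec, i] }
# pairs == [["a", 0], ["b", 1], ["c", 2]]
```

Calling fetch_each without a block returns the Enumerator, which is what makes chains like UsersByPositionQuery.fetch_each.with_index work.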

# File lib/cassie/statements/execution/batched_fetching.rb, line 27
def fetch_each(opts={})
  return to_enum(:fetch_each, opts) unless block_given?

  fetch_in_batches(opts) do |records|
    records.each { |record| yield record }
  end
end
fetch_in_batches(opts={}) { |batch| ... }

Yields each batch of records found by the query as an array.

If you do not provide a block to fetch_in_batches, it will return an Enumerator for chaining with other methods.

query.fetch_in_batches do |records|
  puts "max score in group: #{records.map(&:score).max}"
end

"max score in group: 26"

Options

  • :batch_size - Specifies the size of the batch. Defaults to 1000.

NOTE: Any limit specified on the query will affect the batched set. Cassandra internal paging is used for batching.
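The paging loop in fetch_in_batches (see the source below) can be sketched with a stubbed in-memory pager. FakePagedQuery and FakeResult are illustrative stand-ins for Cassie's query and result objects, and the integer paging state stands in for Cassandra's opaque paging token:

```ruby
# Illustrative result object exposing the two pieces of paging
# metadata the loop depends on: the next paging state and whether
# this was the last page.
class FakeResult
  attr_reader :paging_state

  def initialize(paging_state, last_page)
    @paging_state = paging_state
    @last_page = last_page
  end

  def last_page?
    @last_page
  end
end

# Illustrative query whose rows arrive one page at a time.
class FakePagedQuery
  attr_accessor :paging_state
  attr_reader :result

  def initialize(rows, page_size)
    @rows = rows
    @page_size = page_size
  end

  # Returns the page of rows starting at the current paging state
  # (an array offset here; Cassandra uses an opaque token).
  def fetch
    offset = paging_state || 0
    page = @rows[offset, @page_size]
    @result = FakeResult.new(offset + page.size, offset + page.size >= @rows.size)
    page
  end
end

batches = []
query = FakePagedQuery.new((1..5).to_a, 2)
loop do
  # done if the previous result was the last page
  break if query.result && query.result.last_page?

  batch = query.fetch
  query.paging_state = query.result.paging_state
  batches << batch
end
# batches == [[1, 2], [3, 4], [5]]
```

Note how the final short page is still yielded before the loop observes last_page? and stops, mirroring the structure of the real implementation.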

# File lib/cassie/statements/execution/batched_fetching.rb, line 50
def fetch_in_batches(opts={})
  opts[:batch_size] ||= 1000

  # spawn the new query as soon as the enumerable is created
  # rather than waiting until the first iteration is executed.
  # The client could mutate the object between these moments,
  # however we don't want to spawn twice if a block isn't passed.
  paged_query = opts.delete(:_paged_query) || self.clone

  return to_enum(:fetch_in_batches, opts.merge(_paged_query: paged_query)) unless block_given?

  # use Cassandra internal paging
  # but clone the query to isolate it
  # and allow all paging queries
  # to execute within a Cassie::Query
  # for use of other features, like logging
  #
  # note: stateless page size is independent from limit
  paged_query.stateless_page_size = opts[:batch_size]
  paged_query.paging_state = nil

  loop do
    # done if the previous result was the last page
    break if paged_query.result && paged_query.result.last_page?
    raise page_size_changed_error(opts[:batch_size]) if opts[:batch_size] != paged_query.stateless_page_size

    batch = paged_query.fetch
    paged_query.paging_state = paged_query.result.paging_state

    yield batch
  end
end

Private Instance Methods

page_size_changed_error(original_size)
# File lib/cassie/statements/execution/batched_fetching.rb, line 85
def page_size_changed_error(original_size)
  Cassie::Statements::Statement::Invalid.new("Page size is no longer valid. It was #{original_size} when the batch was started, and is now #{self.page_size}. Continuing would cause unexpected results.")
end