class Spark::ExternalSorter

Constants

EVAL_N_VALUES

How many values will be taken from each enumerator.

KEY_FUNCTION

Default key function.

MAX_SLICE_SIZE

Maximum slice size. Memory control can be defeated by too large a value.

MEMORY_FREE_CHUNK

How big the chunk added to the memory limit will be, because the GC does not immediately clean up unreferenced variables.

MEMORY_RESERVE

Objects awaiting GC cannot be destroyed immediately, so make_parts needs some memory reserve.

START_SLICE_SIZE

How many items will be evaluated from the iterator at the start.

Attributes

memory_chunk[R]
memory_limit[R]
serializer[R]
total_memory[R]

Public Class Methods

new(total_memory, serializer)
# File lib/spark/sort.rb, line 61
def initialize(total_memory, serializer)
  @total_memory = total_memory
  @memory_limit = total_memory * (100-MEMORY_RESERVE)    / 100
  @memory_chunk = total_memory * (100-MEMORY_FREE_CHUNK) / 100
  @serializer   = serializer
end
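
For illustration, here is how the two derived limits work out for a hypothetical total_memory and stand-in percentages. The numbers below are assumptions for the sketch, not the gem's actual MEMORY_RESERVE and MEMORY_FREE_CHUNK values.

# Illustration only: hypothetical numbers, not the gem's constants
total_memory      = 1000  # same unit that memory_usage reports in
memory_reserve    = 50    # stand-in for MEMORY_RESERVE (percent)
memory_free_chunk = 10    # stand-in for MEMORY_FREE_CHUNK (percent)

memory_limit = total_memory * (100 - memory_reserve)    / 100  # => 500
memory_chunk = total_memory * (100 - memory_free_chunk) / 100  # => 900

# Each later call to add_memory! raises the limit by memory_chunk:
memory_limit += memory_chunk                                   # => 1400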

Public Instance Methods

add_memory!()
# File lib/spark/sort.rb, line 68
def add_memory!
  @memory_limit += memory_chunk
end
sort_by(iterator, ascending=true, key_function=KEY_FUNCTION) { |item| ... }
# File lib/spark/sort.rb, line 72
def sort_by(iterator, ascending=true, key_function=KEY_FUNCTION)
  return to_enum(__callee__, iterator, ascending, key_function) unless block_given?

  create_temp_folder
  internal_sorter = Spark::InternalSorter.get(ascending, key_function)

  # Make N sorted enumerators
  parts = make_parts(iterator, internal_sorter)

  return [] if parts.empty?

  # A new key function is needed because items now have a new structure:
  # from [1, 2, 3] to [[1, Enumerator], [2, Enumerator], [3, Enumerator]]
  key_function_with_enum = lambda{|(key, _)| key_function[key]}
  internal_sorter = Spark::InternalSorter.get(ascending, key_function_with_enum)

  heap  = []
  enums = []

  # Load first items to heap
  parts.each do |part|
    EVAL_N_VALUES.times {
      begin
        heap << [part.next, part]
      rescue StopIteration
        break
      end
    }
  end

  # Parts can already be empty while the heap still holds items
  while parts.any? || heap.any?
    internal_sorter.sort(heap)

    # Since the parts are sorted and the heap holds EVAL_N_VALUES items
    # from each part, EVAL_N_VALUES items can be added to the result
    EVAL_N_VALUES.times {
      break if heap.empty?

      item, enum = heap.shift
      enums << enum

      yield item
    }

    # Refill the heap from the parts whose items were just yielded
    while (enum = enums.shift)
      begin
        heap << [enum.next, enum]
      rescue StopIteration
        parts.delete(enum)
        enums.delete(enum)
      end
    end
  end

ensure
  destroy_temp_folder
end
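
A minimal usage sketch, assuming the gem is loaded and using a Marshal-based serializer written only for this example. The MarshalSerializer class below is not part of the gem; the source above only implies that a serializer must respond to dump(data, io) and to load(io), where load returns something that responds to #next.

require 'tempfile'

# Illustrative serializer, not the gem's; Marshal stands in for the real thing.
class MarshalSerializer
  def dump(data, io)
    Marshal.dump(data, io)
    io.flush
  end

  def load(io)
    # Return an Enumerator so the merge in #sort_by can pull items with #next
    Marshal.load(io).each
  end
end

# total_memory is assumed to be in the same unit that memory_usage reports
sorter = Spark::ExternalSorter.new(1024, MarshalSerializer.new)

result = []
sorter.sort_by((1..100).to_a.shuffle.each) { |item| result << item }
result.first(5)  # => [1, 2, 3, 4, 5]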

Private Instance Methods

create_temp_folder()
# File lib/spark/sort.rb, line 134
def create_temp_folder
  @dir = Dir.mktmpdir
end
destroy_temp_folder()
# File lib/spark/sort.rb, line 138
def destroy_temp_folder
  FileUtils.remove_entry_secure(@dir) if @dir
end
make_parts(iterator, internal_sorter)

A new part is created when the current part exceeds the memory limit (which is variable). Every new part gets more memory because of Ruby's GC behavior.

# File lib/spark/sort.rb, line 144
def make_parts(iterator, internal_sorter)
  slice = START_SLICE_SIZE

  parts = []
  part  = []

  loop do
    begin
      # Enumerator does not have a slice method
      slice.times { part << iterator.next }
    rescue StopIteration
      break
    end

    # Careful: memory_limit is variable
    if memory_usage > memory_limit
      # Sort the current part with the original key_function
      internal_sorter.sort(part)
      # Tempfile for the current part;
      # it will be destroyed in #destroy_temp_folder
      file = Tempfile.new("part", @dir)
      serializer.dump(part, file)
      # The write left the file position at the end, so rewind before reading
      file.seek(0)
      parts << serializer.load(file)

      # Some memory will be released, but not immediately,
      # so raise the limit to leave headroom for the next part
      part.clear
      add_memory!
    else
      slice = [slice*2, MAX_SLICE_SIZE].min
    end
  end

  # The last part, which was not written to a file
  if part.any?
    internal_sorter.sort(part)
    parts << part.each
  end

  parts
end
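
The design point worth noting: whether a part was spilled to a Tempfile or kept in memory, it ends up in parts as something that responds to #next, so #sort_by can merge all parts lazily. A rough sketch of the disk round trip, with Marshal standing in for the serializer (an assumption, not the gem's own serializer):

require 'tempfile'

file = Tempfile.new('part')
Marshal.dump([1, 3, 5], file)   # stands in for serializer.dump(part, file)
file.seek(0)                    # writing left the position at the end of the file
part = Marshal.load(file).each  # stands in for serializer.load(file)

part.next  # => 1
part.next  # => 3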