class Spark::ExternalSorter
Constants
- EVAL_N_VALUES
How many values are taken from each enumerator per merge round.
- KEY_FUNCTION
Default key function.
- MAX_SLICE_SIZE
Maximum slice size. A very large value effectively bypasses memory control.
- MEMORY_FREE_CHUNK
How big the chunk for adding new memory is, because the GC does not immediately clean up unreferenced variables.
- MEMORY_RESERVE
Items awaiting GC cannot be destroyed right away, so make_parts needs some reserve.
- START_SLICE_SIZE
How many items are evaluated from the iterator at the start.
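A sketch of how these constants could be defined; the values below are illustrative assumptions, not the gem's actual numbers (see lib/spark/sort.rb for the real definitions):

  # Illustrative values only -- the real ones are in lib/spark/sort.rb
  EVAL_N_VALUES     = 10                   # items pulled from each enumerator per merge round
  KEY_FUNCTION      = lambda{|item| item}  # default: sort by the item itself
  MAX_SLICE_SIZE    = 10_000               # cap on slice growth in make_parts
  MEMORY_FREE_CHUNK = 10                   # percentage used to size memory_chunk
  MEMORY_RESERVE    = 50                   # percentage of memory kept as GC reserve
  START_SLICE_SIZE  = 10                   # initial slice size in make_parts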
Attributes
memory_chunk[R]
memory_limit[R]
serializer[R]
total_memory[R]
Public Class Methods
new(total_memory, serializer)
# File lib/spark/sort.rb, line 61
def initialize(total_memory, serializer)
  @total_memory = total_memory
  @memory_limit = total_memory * (100-MEMORY_RESERVE) / 100
  @memory_chunk = total_memory * (100-MEMORY_FREE_CHUNK) / 100
  @serializer   = serializer
end
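A minimal construction sketch. The serializer below is a hypothetical stand-in, not one of the gem's serializers; ExternalSorter only needs something that responds to dump(data, io) and whose load(io) returns an enumerator:

  # Hypothetical Marshal-based serializer, for illustration only;
  # the gem ships its own Spark::Serializer classes.
  class SimpleSerializer
    def dump(data, io)
      Marshal.dump(data, io)
      io.flush
    end

    def load(io)
      Marshal.load(io).each   # return an Enumerator so #next works in sort_by
    end
  end

  # 4096 is illustrative; the unit depends on how the gem measures memory_usage
  sorter = Spark::ExternalSorter.new(4096, SimpleSerializer.new)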
Public Instance Methods
add_memory!()
# File lib/spark/sort.rb, line 68
def add_memory!
  @memory_limit += memory_chunk
end
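A worked example of how the limit moves, assuming (purely for illustration) MEMORY_RESERVE = 50 and MEMORY_FREE_CHUNK = 10:

  # total_memory = 1000
  # memory_limit = 1000 * (100 - 50) / 100   # => 500
  # memory_chunk = 1000 * (100 - 10) / 100   # => 900
  #
  # add_memory!  # memory_limit: 500  -> 1400
  # add_memory!  # memory_limit: 1400 -> 2300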
sort_by(iterator, ascending=true, key_function=KEY_FUNCTION) { |item| ... }
# File lib/spark/sort.rb, line 72
def sort_by(iterator, ascending=true, key_function=KEY_FUNCTION)
  return to_enum(__callee__, iterator, key_function) unless block_given?

  create_temp_folder

  internal_sorter = Spark::InternalSorter.get(ascending, key_function)

  # Make N sorted enumerators
  parts = make_parts(iterator, internal_sorter)

  return [] if parts.empty?

  # A new key function is needed because items now have a new structure.
  # From: [1,2,3] to [[1, Enumerator], [2, Enumerator], [3, Enumerator]]
  key_function_with_enum = lambda{|(key, _)| key_function[key]}
  internal_sorter = Spark::InternalSorter.get(ascending, key_function_with_enum)

  heap  = []
  enums = []

  # Load the first items into the heap
  parts.each do |part|
    EVAL_N_VALUES.times {
      begin
        heap << [part.next, part]
      rescue StopIteration
        break
      end
    }
  end

  # Parts can be empty while the heap still holds items
  while parts.any? || heap.any?
    internal_sorter.sort(heap)

    # Since the parts are sorted and the heap holds EVAL_N_VALUES items
    # from each part, EVAL_N_VALUES items can be added to the result
    EVAL_N_VALUES.times {
      break if heap.empty?

      item, enum = heap.shift
      enums << enum

      yield item
    }

    # Refill the heap from the parts whose items were just yielded
    while (enum = enums.shift)
      begin
        heap << [enum.next, enum]
      rescue StopIteration
        parts.delete(enum)
        enums.delete(enum)
      end
    end
  end

ensure
  destroy_temp_folder
end
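A usage sketch, reusing the hypothetical SimpleSerializer from above; with a block, items are yielded in sorted order, spilling to temp files whenever the memory limit is exceeded:

  numbers = (1..100_000).to_a.shuffle.each   # any Enumerator will do

  sorter = Spark::ExternalSorter.new(4096, SimpleSerializer.new)
  sorter.sort_by(numbers, true, lambda{|x| x}) do |item|
    # items arrive here in ascending order
  end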
Private Instance Methods
create_temp_folder()
# File lib/spark/sort.rb, line 134
def create_temp_folder
  @dir = Dir.mktmpdir
end
destroy_temp_folder()
# File lib/spark/sort.rb, line 138
def destroy_temp_folder
  FileUtils.remove_entry_secure(@dir) if @dir
end
make_parts(iterator, internal_sorter)
A new part is created when the current part exceeds the memory limit (which is itself variable). Every new part gets a larger limit because the Ruby GC reclaims memory with a delay.
# File lib/spark/sort.rb, line 144
def make_parts(iterator, internal_sorter)
  slice = START_SLICE_SIZE

  parts = []
  part  = []

  loop do
    begin
      # Enumerator does not have a slice method
      slice.times { part << iterator.next }
    rescue StopIteration
      break
    end

    # Careful: memory_limit is variable
    if memory_usage > memory_limit
      # Sort the current part with the original key_function
      internal_sorter.sort(part)

      # Tempfile for the current part;
      # it will be destroyed by #destroy_temp_folder
      file = Tempfile.new("part", @dir)
      serializer.dump(part, file)

      # The file pointer is at the end of the file
      file.seek(0)
      parts << serializer.load(file)

      # Some memory will be released, but not immediately;
      # some new headroom is needed to start the next part
      part.clear
      add_memory!
    else
      slice = [slice*2, MAX_SLICE_SIZE].min
    end
  end

  # Last part, which is not in a file
  if part.any?
    internal_sorter.sort(part)
    parts << part.each
  end

  parts
end
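This is essentially the run-generation phase of an external merge sort: each element of the returned array is a sorted run that responds to #next, either loaded lazily from a temp file or kept in memory. A rough sketch of the resulting shape, with hypothetical data:

  # parts = [
  #   serializer.load(tempfile_1),   # sorted run spilled to disk
  #   serializer.load(tempfile_2),   # another spilled run
  #   [3, 7, 42].each                # last run, small enough to stay in memory
  # ]
  # sort_by then k-way merges these enumerators via the heap.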