module MapReduce::Mergeable

The MapReduce::Mergeable mixin provides the k-way-merge operation used by mappers as well as reducers.

Private Instance Methods

k_way_merge(files) { |key, value| ... }

Performs the k-way-merge of the passed files using a priority queue backed by a binomial heap. The content of the passed files needs to be sorted. It starts by reading one item from each file and adding it to the priority queue. Afterwards, it continuously pops the smallest item from the queue, yields it, and reads the next item from the file the popped item came from, adding that item to the queue. This continues until all items from the files have been read. Since every file is sorted and the queue always pops the smallest pending item, the yielded key-value pairs come out sorted; since the queue holds at most one item per file, not all items have to be in memory at once. When called without a block, the method returns an Enumerator instead.
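The loop described above can be sketched in plain Ruby as follows. This is an illustration, not the library's code: `MinHeap`, `KWayMergeSketch.merge_sorted` and `jsonl` are hypothetical names, and a binary heap is swapped in for the binomial heap to keep the example short and dependency-free. Like the method below, it assumes every line is a JSON-encoded `[key, value]` pair and that each IO is sorted by `JSON.generate(key)`.

```ruby
require "json"
require "stringio"

# Hypothetical binary min-heap of [priority, object] pairs (a stand-in for
# the binomial heap mentioned above).
class MinHeap
  def initialize
    @items = []
  end

  def empty?
    @items.empty?
  end

  def push(object, priority)
    @items << [priority, object]
    i = @items.size - 1
    # Sift up while the new entry is smaller than its parent.
    while i > 0 && @items[(i - 1) / 2][0] > @items[i][0]
      parent = (i - 1) / 2
      @items[parent], @items[i] = @items[i], @items[parent]
      i = parent
    end
  end

  # Removes and returns the object with the smallest priority, or nil.
  def pop
    return nil if @items.empty?

    top = @items.first
    last = @items.pop
    unless @items.empty?
      @items[0] = last
      sift_down(0)
    end
    top[1]
  end

  private

  def sift_down(i)
    loop do
      left = 2 * i + 1
      right = left + 1
      smallest = i
      smallest = left if left < @items.size && @items[left][0] < @items[smallest][0]
      smallest = right if right < @items.size && @items[right][0] < @items[smallest][0]
      break if smallest == i

      @items[smallest], @items[i] = @items[i], @items[smallest]
      i = smallest
    end
  end
end

module KWayMergeSketch
  # Yields [key, value] pairs from all IOs in order, holding at most one
  # pending pair per IO in the heap.
  def self.merge_sorted(ios)
    return enum_for(:merge_sorted, ios) unless block_given?

    heap = MinHeap.new
    read_next = lambda do |index|
      io = ios[index]
      unless io.eof?
        key, value = JSON.parse(io.readline)
        heap.push([key, value, index], JSON.generate(key))
      end
    end

    ios.each_index { |index| read_next.call(index) } # one pair per IO
    until heap.empty?
      key, value, index = heap.pop
      yield [key, value]
      read_next.call(index) # refill from the IO the pair came from
    end
  end
end

# Builds an in-memory IO of JSON lines, for the usage example.
def jsonl(pairs)
  StringIO.new(pairs.map { |pair| JSON.generate(pair) + "\n" }.join)
end

a = jsonl([["a", 1], ["c", 3]])
b = jsonl([["b", 2], ["d", 4]])
KWayMergeSketch.merge_sorted([a, b]).to_a # => [["a", 1], ["b", 2], ["c", 3], ["d", 4]]
```

The heap is only ever as large as the number of inputs, which is where the memory bound comes from; each pop and push costs O(log k) for k inputs.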

@param files [IO, Tempfile] The files to run the k-way-merge for. The content of the files must be sorted.
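Because the method uses `JSON.generate(key)` as the queue priority, it seems reasonable to assume each file has to be sorted by that same string for the merged output to come out in order. The sketch below shows one way to produce such a file; `write_sorted_chunk` is a hypothetical helper, and it assumes the queue compares priorities with Ruby's `String` ordering.

```ruby
require "json"
require "tempfile"

# Hypothetical helper: writes [key, value] pairs as JSON lines, sorted by the
# JSON-encoded key (the string the merge uses as its queue priority).
def write_sorted_chunk(pairs)
  file = Tempfile.new("chunk")
  pairs.sort_by { |key, _value| JSON.generate(key) }.each do |pair|
    file.puts(JSON.generate(pair))
  end
  file.rewind # the merge reads from the current position
  file
end

chunk = write_sorted_chunk([[10, "x"], [9, "y"], [2, "z"]])
chunk.each_line.map { |line| JSON.parse(line).first } # => [10, 2, 9]
chunk.close!
```

Note the consequence for numeric keys: `10` sorts before `9` because `"10" < "9"` as strings, so the order is lexicographic on the encoding rather than numeric.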
# File lib/map_reduce/mergeable.rb, line 20
def k_way_merge(files)
  # Without a block, return an Enumerator over the merged pairs.
  return enum_for(:k_way_merge, files) unless block_given?

  queue = PriorityQueue.new

  # Seed the queue with the first pair of every non-empty file.
  files.each_with_index do |file, index|
    line = file.eof? ? nil : file.readline

    next unless line

    key, value = JSON.parse(line)

    queue.push([key, value, index], JSON.generate(key))
  end

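  loop do
    key, value, index = queue.pop

    # An empty queue pops nil, so index is nil once every file is exhausted.
    # Use break rather than return so that the files are still rewound below.
    break unless index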

    yield([key, value])

    # Refill the queue from the file the popped pair came from.
    line = files[index].yield_self { |file| file.eof? ? nil : file.readline }

    next unless line

    key, value = JSON.parse(line)

    queue.push([key, value, index], JSON.generate(key))
  end

  files.each(&:rewind)

  nil
end
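The method calls only two things on `PriorityQueue`: `push(object, priority)` and `pop`, and it relies on `pop` returning `nil` once the queue is empty (which leaves `index` nil). To exercise `k_way_merge` in isolation, for example in a unit test, a stand-in along these lines could be used. It is hypothetical, not the `PriorityQueue` the library ships; it keeps its entries sorted, so `push` is O(n) rather than the logarithmic cost a heap gives.

```ruby
# Hypothetical stand-in implementing only the PriorityQueue calls that
# k_way_merge makes.
class PriorityQueue
  def initialize
    @entries = [] # [priority, object] pairs, ascending by priority
  end

  # Inserts after any entries with an equal priority.
  def push(object, priority)
    index = @entries.bsearch_index { |entry| entry[0] > priority } || @entries.size
    @entries.insert(index, [priority, object])
    self
  end

  # Returns the object with the smallest priority, or nil when empty.
  def pop
    entry = @entries.shift
    entry && entry[1]
  end
end
```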