class MapReduce::Reducer

The MapReduce::Reducer class runs the reducer part of your map-reduce job.

Public Class Methods

new(implementation)

Initializes a new reducer.

@param implementation Your map-reduce implementation, i.e. an object which responds to #map and #reduce.

@example

MapReduce::Reducer.new(MyImplementation.new)
Calls superclass method
# File lib/map_reduce/reducer.rb, line 19
def initialize(implementation)
  super()

  @implementation = implementation

  @temp_paths ||= []
end
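
For illustration, an implementation object might look like the following sketch. The WordCount class and its methods are hypothetical and not part of the gem; the requirement stated above is only that the object responds to #map and #reduce.

# Hypothetical example implementation, not part of the gem.
class WordCount
  # Mapper part: yields key-value pairs for a given input.
  def map(text)
    text.split.each { |word| yield(word, 1) }
  end

  # Reducer part: combines two values that share the same key.
  def reduce(word, count1, count2)
    count1 + count2
  end
end

reducer = MapReduce::Reducer.new(WordCount.new)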

Public Instance Methods

add_chunk()

Adds a chunk from the mapper phase to the reducer by registering a tempfile and returning the path to that tempfile, so that you can download a chunk, e.g. from S3, and write its content to the tempfile.

@return [String] The path to a tempfile.

@example

chunk_path = reducer.add_chunk
File.write(chunk_path, "downloaded blob")
# File lib/map_reduce/reducer.rb, line 37
def add_chunk
  temp_path = TempPath.new

  synchronize do
    @temp_paths.push(temp_path)
  end

  temp_path.path
end
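
For example, the download step could be sketched as follows, assuming the aws-sdk-s3 gem and hypothetical bucket and key names:

require "aws-sdk-s3"

s3 = Aws::S3::Client.new

# Register a tempfile and stream the S3 object directly into it.
chunk_path = reducer.add_chunk
s3.get_object(bucket: "my-bucket", key: "chunks/chunk-0.json", response_target: chunk_path)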
reduce(chunk_limit:, &block)

Performs a k-way merge of the added chunks and yields the reduced key-value pairs. When more than `chunk_limit` chunks exist, it performs multiple runs: each run takes up to `chunk_limit` chunks, reduces them, and pushes the result back as a new chunk. At the end, it removes all tempfiles, even when errors occur.

@param chunk_limit [Integer] The maximum number of files to process during a single run. Most useful when you run on a system where the number of open file descriptors is limited. If your number of file descriptors is unlimited, you may want to set it to a higher number to avoid the overhead of multiple runs.

@example

reducer = MapReduce::Reducer.new(MyImplementation.new)

chunk1_path = reducer.add_chunk
# write data to the file

chunk2_path = reducer.add_chunk
# write data to the file

reducer.reduce(chunk_limit: 32) do |key, value|
  # ...
end
# File lib/map_reduce/reducer.rb, line 72
def reduce(chunk_limit:, &block)
  return enum_for(:reduce, chunk_limit: chunk_limit) unless block_given?

  raise(InvalidChunkLimit, "Chunk limit must be >= 2") unless chunk_limit >= 2

  begin
    loop do
      slice = @temp_paths.shift(chunk_limit)
      files = slice.select { |temp_path| File.exist?(temp_path.path) }
                   .map { |temp_path| File.open(temp_path.path, "r") }

      begin
        if @temp_paths.empty?
          reduce_chunk(k_way_merge(files), @implementation).each do |pair|
            block.call(pair)
          end

          return
        end

        File.open(add_chunk, "w") do |file|
          reduce_chunk(k_way_merge(files), @implementation).each do |pair|
            file.puts JSON.generate(pair)
          end
        end
      ensure
        files.each(&:close)
        slice.each(&:delete)
      end
    end
  ensure
    @temp_paths.each(&:delete)
  end
end
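
Putting the pieces together, a full reducer run might look like the sketch below. It assumes that each chunk file contains newline-delimited JSON key-value pairs sorted by key (the format produced by the mapper phase), and it uses the hypothetical WordCount implementation from above plus a hypothetical download helper:

reducer = MapReduce::Reducer.new(WordCount.new)

# Register one tempfile per chunk produced by the mappers and fill it,
# e.g. by downloading the chunk from object storage.
%w[chunk-0.json chunk-1.json].each do |name|
  File.write(reducer.add_chunk, download(name)) # download is a hypothetical helper
end

results = {}

# Merge and reduce all chunks; at most 32 chunk files are open per run.
reducer.reduce(chunk_limit: 32) do |key, value|
  results[key] = value
end

Note that reduce returns an Enumerator when called without a block (see the enum_for call in the source above).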