class ChefFS::Parallelizer::ParallelizedResults

Public Class Methods

new(enumerator, options, &block)
# File lib/chef_fs/parallelizer.rb, line 38
# Captures the work to be done and sets up empty result bookkeeping.
#
# @param enumerator [#to_a] source of inputs; materialized once, up front
# @param options [Hash] stored as-is; #each reads options[:flatten]
# @param block [Proc] called with one input at a time by #process_input
def initialize(enumerator, options, &block)
  # Caller-supplied pieces.
  @inputs  = enumerator.to_a
  @options = options
  @block   = block

  # Guards the hand-out of the next unprocessed index in #process_input.
  @mutex = Mutex.new
  # Parallel arrays indexed like @inputs: the result (or exception) of each
  # input and its state (:started, :finished or :exception).
  @outputs, @status = [], []
end

Public Instance Methods

each() { |entry| ... }
# File lib/chef_fs/parallelizer.rb, line 48
# Yields every result in input order, waiting until each one is available.
#
# Results already marked :finished or :exception are reported first. When
# none are ready, the calling thread runs one pending input itself through
# process_input, so progress is made even if nobody else is doing the work.
#
# When @options[:flatten] is truthy, each result must respond to #each and
# its elements are yielded one at a time; otherwise the result itself is
# yielded.
#
# @yield [entry] a result (or one element of a result when flattening)
# @return [nil] after every input has been reported, when a block is given
# @return [Enumerator] when called without a block
# @raise [Exception] the exception stored for the first input whose status
#   is :exception; later results are not reported
def each
  # Without a block, `yield` would only fail with LocalJumpError after an
  # input had already been processed. Return an Enumerator up front instead.
  return enum_for(:each) unless block_given?

  next_index = 0
  # `while true` rather than `loop`: Kernel#loop rescues StopIteration and
  # would silently swallow one raised by the caller's block or re-raised below.
  while true
    # Report any results that already exist, strictly in input order.
    while next_index < @status.length
      state = @status[next_index]
      break unless state == :finished || state == :exception

      # For :exception the stored output is the exception object itself.
      raise @outputs[next_index] if state == :exception

      if @options[:flatten]
        @outputs[next_index].each { |entry| yield entry }
      else
        yield @outputs[next_index]
      end
      next_index += 1
    end

    # Pick up a result and process it, if there is one.  This ensures we
    # move forward even if there are *zero* worker threads available.
    next if process_input

    # Nothing left to claim: we are done once everything has been reported.
    break if next_index >= @status.length

    # Every input has been claimed but the next one in order is not finished
    # yet (presumably another thread is still running it). Poll again shortly.
    sleep(0.05)
  end
end
process_input()
# File lib/chef_fs/parallelizer.rb, line 81
# Claims the next unprocessed input, runs the block on it and records the
# outcome in @outputs / @status.
#
# @return [Integer] index of the input that was just processed
# @return [nil] when every input has already been claimed
def process_input
  # Claiming happens under the mutex so each index is handed out only once.
  slot, item = @mutex.synchronize do
    position = @status.length
    # `return` leaves the whole method; synchronize still releases the lock.
    return nil unless position < @inputs.length

    @status[position] = :started
    [position, @inputs[position]]
  end

  begin
    # The output is written before the status flips, so a reader that sees
    # :finished / :exception also sees the matching output.
    @outputs[slot] = @block.call(item)
    @status[slot] = :finished
  rescue Exception => error
    # Deliberately broad: the failure is stored here and re-raised by #each.
    @outputs[slot] = error
    @status[slot] = :exception
  end
  slot
end