module Enumerable

Public Instance Methods

forkoff(options = {})
# File lib/forkoff.rb, line 144
# Runs the given block over every element of the receiver through one of
# Forkoff's strategy methods, using a fixed number of worker threads, and
# returns the results in the receiver's iteration order.
#
# @param options [Hash, Integer, #to_int] either an options hash or a bare
#   worker count. Recognised keys (String or Symbol form):
#   'processes' - number of worker threads (falls back to
#                 Forkoff.default['processes'])
#   'strategy'  - key looked up in Forkoff::STRATEGIES (defaults to :pipe)
# @yield [element] forwarded to the selected Forkoff strategy method for
#   each element; what the strategy does with it is defined outside this
#   method (presumably it runs the block in a child process - confirm
#   against Forkoff)
# @return [Array] one result per element, positioned by element index
def forkoff(options = {}, &block)
  # A non-Hash argument is shorthand for the worker count.
  opts =
    case options
    when Hash then options
    else { 'processes' => Integer(options) }
    end

  worker_count = Integer(opts['processes'] || opts[:processes] || Forkoff.default['processes'])
  strategy_key = opts['strategy'] || opts[:strategy] || :pipe
  runner = Forkoff::STRATEGIES[strategy_key]

  # Bounded queue: the feeder blocks once worker_count jobs are pending.
  jobs = SizedQueue.new(worker_count)
  # One private result bucket per worker, so workers never share an array.
  buckets = Array.new(worker_count) { [] }

  # Workers: pull [element, position] pairs until the :done sentinel shows up.
  workers = buckets.map do |bucket|
    Thread.new do
      Thread.current.abort_on_exception = true

      loop do
        item, position = jobs.pop
        # The :done sentinel is not an array, so it destructures to a nil position.
        break if position.nil?

        bucket.push [Forkoff.send(runner, item, &block), position]
      end

      bucket.push(:done)
    end
  end

  # Feeder: enqueue every element with its position, then one sentinel per worker.
  feeder = Thread.new do
    Thread.current.abort_on_exception = true
    each_with_index { |item, position| jobs.push [item, position] }
    worker_count.times { jobs.push(:done) }
  end

  # Join the workers first, then the feeder.
  workers.each(&:value)
  feeder.value

  # Merge the buckets, placing each result at its element's original position.
  buckets.each_with_object([]) do |bucket, ordered|
    bucket.each do |result, position|
      # Stop at the trailing :done marker each worker appended.
      break if position.nil?

      ordered[position] = result
    end
  end
end
Also aliased as: forkoff!
forkoff!(options = {})
Alias for: forkoff