module Enumerable
Public Instance Methods
forkoff(options = {})
click to toggle source
# File lib/forkoff.rb, line 144 def forkoff options = {}, &block # options = { 'processes' => Integer(options) } unless Hash === options n = Integer( options['processes'] || options[:processes] || Forkoff.default['processes'] ) strategy = options['strategy'] || options[:strategy] || :pipe strategy_method = Forkoff::STRATEGIES[strategy] q = SizedQueue.new(n) result_sets = Array.new(n){ [] } # consumers # consumers = result_sets.map do |set| Thread.new do Thread.current.abort_on_exception = true loop do arg, index = q.pop break if index.nil? result = Forkoff.send( strategy_method, arg, &block ) set.push [result, index] end set.push( :done ) end end # producers # producer = Thread.new do Thread.current.abort_on_exception = true each_with_index do |arg, i| q.push [arg, i] end n.times do |i| q.push( :done ) end end # wait for all consumers to complete # consumers.each do |t| t.value end # wait for the producer to complete # producer.value # gather results # returned = [] result_sets.each do |set| set.each do |result, index| break if index.nil? returned[index] = result end end returned end
Also aliased as: forkoff!