class ChefFS::Parallelizer::ParallelizedResults
Public Class Methods
new(enumerator, options, &block)
click to toggle source
# File lib/chef_fs/parallelizer.rb, line 38 def initialize(enumerator, options, &block) @inputs = enumerator.to_a @options = options @block = block @mutex = Mutex.new @outputs = [] @status = [] end
Public Instance Methods
each() { |entry| ... }
click to toggle source
# File lib/chef_fs/parallelizer.rb, line 48 def each next_index = 0 while true # Report any results that already exist while @status.length > next_index && ([:finished, :exception].include?(@status[next_index])) if @status[next_index] == :finished if @options[:flatten] @outputs[next_index].each do |entry| yield entry end else yield @outputs[next_index] end else raise @outputs[next_index] end next_index = next_index + 1 end # Pick up a result and process it, if there is one. This ensures we # move forward even if there are *zero* worker threads available. if !process_input # Exit if we're done. if next_index >= @status.length break else # Ruby 1.8 threading sucks. Wait till we process more things. sleep(0.05) end end end end
process_input()
click to toggle source
# File lib/chef_fs/parallelizer.rb, line 81 def process_input # Grab the next one to process index, input = @mutex.synchronize do index = @status.length if index >= @inputs.length return nil end input = @inputs[index] @status[index] = :started [ index, input ] end begin @outputs[index] = @block.call(input) @status[index] = :finished rescue Exception @outputs[index] = $! @status[index] = :exception end index end