class Parallel::JobFactory

Public Class Methods

new(source, mutex) click to toggle source
# File lib/parallel.rb, line 99
def initialize(source, mutex)
  @lambda = (source.respond_to?(:call) && source) || queue_wrapper(source)
  @source = source.to_a unless @lambda # turn Range and other Enumerable-s into an Array
  @mutex = mutex
  @index = -1
  @stopped = false
end

Public Instance Methods

next() click to toggle source
# File lib/parallel.rb, line 107
def next
  if producer?
    # - index and item stay in sync
    # - do not call lambda after it has returned Stop
    item, index = @mutex.synchronize do
      return if @stopped
      item = @lambda.call
      @stopped = (item == Stop)
      return if @stopped
      [item, @index += 1]
    end
  else
    index = @mutex.synchronize { @index += 1 }
    return if index >= size
    item = @source[index]
  end
  [item, index]
end
pack(item, index) click to toggle source

generate item that is sent to workers just index is faster + less likely to blow up with unserializable errors

# File lib/parallel.rb, line 136
def pack(item, index)
  producer? ? [item, index] : index
end
size() click to toggle source
# File lib/parallel.rb, line 126
def size
  if producer?
    Float::INFINITY
  else
    @source.size
  end
end
unpack(data) click to toggle source

unpack item that is sent to workers

# File lib/parallel.rb, line 141
def unpack(data)
  producer? ? data : [@source[data], data]
end

Private Instance Methods

producer?() click to toggle source
# File lib/parallel.rb, line 147
def producer?
  @lambda
end
queue_wrapper(array) click to toggle source
# File lib/parallel.rb, line 151
def queue_wrapper(array)
  array.respond_to?(:num_waiting) && array.respond_to?(:pop) && -> { array.pop(false) }
end