module Parallel

Constants

Stop
VERSION

Public Class Methods

all?(*args, &block) click to toggle source
# File lib/parallel.rb, line 243
# Reports whether the block returns a truthy value for every item.
#
# All arguments are forwarded unchanged to `each`. When the block returns a
# falsy value for an item, `Kill` is raised from inside the worker block.
#
# Returns the result of `each` coerced to a boolean.
# Raises RuntimeError when called without a block.
def all?(*args, &block)
  raise "You must provide a block when calling #all?" unless block

  outcome = each(*args) do |*item_args|
    raise Kill unless block.call(*item_args)
  end
  !!outcome
end
any?(*args, &block) click to toggle source
# File lib/parallel.rb, line 238
# Reports whether the block returns a truthy value for at least one item.
#
# All arguments are forwarded unchanged to `each`. When the block returns a
# truthy value for an item, `Kill` is raised from inside the worker block.
#
# Returns the negation of whatever `each` returns.
# Raises RuntimeError when called without a block.
def any?(*args, &block)
  raise "You must provide a block when calling #any?" unless block

  finished = each(*args) do |*item_args|
    raise Kill if block.call(*item_args)
  end
  !finished
end
each(array, options = {}, &block) click to toggle source
# File lib/parallel.rb, line 234
# Runs the block for every item via `map`, forcing `preserve_results: false`
# so that block results are not kept (any caller-supplied value for that key
# is overridden).
def each(array, options = {}, &block)
  each_options = options.merge(preserve_results: false)
  map(array, each_options, &block)
end
each_with_index(array, options = {}, &block) click to toggle source
# File lib/parallel.rb, line 248
# Same as `each`, but sets `with_index: true` so the block is also handed the
# item's index.
def each_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  each(array, indexed_options, &block)
end
filter_map(...) click to toggle source
# File lib/parallel.rb, line 307
# Forwards all arguments to `map` and removes nil entries from the result.
# Only nil is dropped (`compact`); a `false` result stays in the output.
def filter_map(...)
  mapped = map(...)
  mapped.compact
end
flat_map(...) click to toggle source
# File lib/parallel.rb, line 303
# Forwards all arguments to `map` and flattens the result by exactly one level.
def flat_map(...)
  mapped = map(...)
  mapped.flatten(1)
end
in_processes(options = {}, &block) click to toggle source
# File lib/parallel.rb, line 228
# Runs the block once per worker process, yielding each index in 0...count.
#
# options - an Integer count, or a Hash that may contain :count. When no
#           count is given, `processor_count` is used.
#
# Returns whatever `map` returns for the range 0...count.
def in_processes(options = {}, &block)
  requested, remaining_options = extract_count_from_options(options)
  process_count = requested || processor_count
  map(0...process_count, remaining_options.merge(in_processes: process_count), &block)
end
in_threads(options = { count: 2 }) { |i| ... } click to toggle source
# File lib/parallel.rb, line 212
# Runs the block in `count` threads, yielding each thread's index, and waits
# for all of them.
#
# options - an Integer count, or a Hash that may contain :count. When no
#           count is given, 2 is used (the same value as the default
#           argument).
#
# Returns an Array with each thread's value, in thread-index order.
# All started threads are killed on the way out, including when an error or
# interrupt aborts the wait.
def in_threads(options = { count: 2 })
  threads = []
  count, = extract_count_from_options(options)
  # A Hash without :count yields nil here; fall back to the documented default
  # instead of failing with NoMethodError on `nil.times`.
  count ||= 2

  # Outer block: defer asynchronous interrupts so the `ensure` cleanup is not
  # itself interrupted. Inner block: deliver them immediately while working.
  Thread.handle_interrupt(Exception => :never) do
    Thread.handle_interrupt(Exception => :immediate) do
      count.times do |i|
        threads << Thread.new { yield(i) }
      end
      threads.map(&:value)
    end
  ensure
    threads.each(&:kill)
  end
end
map(source, options = {}, &block) click to toggle source
# File lib/parallel.rb, line 252
# Runs the block for every item of `source` using threads, ractors or
# processes, and collects the results.
#
# source  - the items to work on (wrapped in a JobFactory).
# options - Hash; keys read here:
#           :in_threads / :in_processes / :in_ractors - worker count for the
#             chosen strategy (threads and processes are mutually exclusive)
#           :preserve_results - pass false to not keep the results
#           :finish - when present, results are kept so they can be handed to it
#           :progress - handled by `add_progress_bar!`
#
# Returns the results when they are kept, otherwise `source`. When the work
# ends with a Break, its `value` is returned instead.
# Raises ArgumentError when both :in_processes and :in_threads are given, and
# re-raises any Exception the workers returned.
def map(source, options = {}, &block)
  options = options.dup
  options[:mutex] = Mutex.new

  if options[:in_processes] && options[:in_threads]
    raise ArgumentError, "Please specify only one of `in_processes` or `in_threads`."
  end

  # Choose the strategy and the requested worker count in one step.
  strategy, size =
    if RUBY_PLATFORM =~ /java/ && !options[:in_processes]
      [:in_threads, options[:in_threads] || processor_count]
    elsif options[:in_threads]
      [:in_threads, options[:in_threads]]
    elsif options[:in_ractors]
      [:in_ractors, options[:in_ractors]]
    elsif Process.respond_to?(:fork)
      [:in_processes, options[:in_processes] || processor_count]
    else
      warn "Process.fork is not supported by this Ruby"
      [:in_processes, 0]
    end

  job_factory = JobFactory.new(source, options[:mutex])
  # never start more workers than there are jobs
  size = [job_factory.size, size].min

  # decided before the progress bar installs its own :finish hook
  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(job_factory, options)

  result =
    if size == 0
      work_direct(job_factory, options, &block)
    else
      sized_options = options.merge(count: size)
      case strategy
      when :in_threads then work_in_threads(job_factory, sized_options, &block)
      when :in_ractors then work_in_ractors(job_factory, sized_options, &block)
      else work_in_processes(job_factory, sized_options, &block)
      end
    end

  case result
  when Break then return result.value
  when Exception then raise result
  end

  options[:return_results] ? result : source
end
map_with_index(array, options = {}, &block) click to toggle source
# File lib/parallel.rb, line 299
# Same as `map`, but sets `with_index: true` so the block is also handed the
# item's index.
def map_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  map(array, indexed_options, &block)
end
physical_processor_count() click to toggle source

Number of physical processor cores on the current system.

# File lib/parallel.rb, line 312
# Number of physical processor cores on the current system (memoized).
#
# The lookup depends on RbConfig's "target_os":
#   darwin  - asks sysctl for hw.physicalcpu
#   linux   - counts unique "physical id"/"core id" pairs in /proc/cpuinfo
#   windows - delegates to `physical_processor_count_windows`
#   other   - uses `processor_count`
#
# Returns a positive Integer; falls back to `processor_count` when the
# platform-specific lookup yields zero or less.
def physical_processor_count
  @physical_processor_count ||= begin
    ppc =
      case RbConfig::CONFIG["target_os"]
      when /darwin[12]/
        # block form closes the pipe and reaps the child once the output is read
        IO.popen("/usr/sbin/sysctl -n hw.physicalcpu", &:read).to_i
      when /linux/
        cores = {} # unique physical ID / core ID combinations
        phy = 0
        File.read("/proc/cpuinfo").scan(/^physical id.*|^core id.*/) do |ln|
          if ln.start_with?("physical")
            phy = ln[/\d+/]
          elsif ln.start_with?("core")
            cores["#{phy}:#{ln[/\d+/]}"] = true
          end
        end
        cores.count
      when /mswin|mingw/
        physical_processor_count_windows
      else
        processor_count
      end
    # fall back to logical count if physical info is invalid
    ppc > 0 ? ppc : processor_count
  end
end
processor_count() click to toggle source

Number of processors seen by the OS, or a value that takes the CPU quota into account if the process is inside a cgroup. Used for process scheduling.

# File lib/parallel.rb, line 342
# Number of processors to schedule work on (memoized).
#
# The PARALLEL_PROCESSOR_COUNT environment variable wins when set; otherwise
# `available_processor_count` is consulted. The value is passed through
# Integer(), so a non-numeric setting raises.
def processor_count
  return @processor_count if @processor_count

  configured = ENV['PARALLEL_PROCESSOR_COUNT']
  @processor_count = Integer(configured || available_processor_count)
end
worker_number() click to toggle source
# File lib/parallel.rb, line 346
# Reads the worker number stored on the current thread under
# :parallel_worker_number (nil when none has been set).
def worker_number
  current_thread = Thread.current
  current_thread[:parallel_worker_number]
end
worker_number=(worker_num) click to toggle source

TODO: this does not work when doing threads in forks, so should remove and yield the number instead if needed

# File lib/parallel.rb, line 351
# Stores the worker number on the current thread under
# :parallel_worker_number, where `worker_number` reads it back.
def worker_number=(worker_num)
  current_thread = Thread.current
  current_thread[:parallel_worker_number] = worker_num
end

Private Class Methods

add_progress_bar!(job_factory, options) click to toggle source
# File lib/parallel.rb, line 384
# Installs a progress bar when `options[:progress]` is set.
#
# options[:progress] may be `true` (title "Progress"), something string-like
# (used as the title), or anything else, which is merged over the defaults as
# the progress bar's options. The existing :finish callback, if any, is
# wrapped so that it still runs before the bar is incremented.
#
# Mutates `options` (replaces :finish). Does nothing when :progress is unset.
# Raises RuntimeError when the job factory reports an infinite size.
def add_progress_bar!(job_factory, options)
  return unless (progress_options = options[:progress])

  raise "Progressbar can only be used with array like items" if job_factory.size == Float::INFINITY
  require 'ruby-progressbar'

  # normalize the shorthand forms into an options hash
  progress_options =
    if progress_options == true
      { title: "Progress" }
    elsif progress_options.respond_to?(:to_str)
      { title: progress_options.to_str }
    else
      progress_options
    end

  defaults = { total: job_factory.size, format: '%t |%E | %B | %a' }
  progress = ProgressBar.create(defaults.merge(progress_options))

  previous_finish = options[:finish]
  options[:finish] = lambda do |item, i, result|
    previous_finish.call(item, i, result) if previous_finish
    progress.increment
  end
end
available_processor_count() click to toggle source
# File lib/parallel.rb, line 699
# Number of processors available to this process.
#
# Uses concurrent-ruby (>= 1.3.4) when that gem can be activated and loaded,
# rounding its answer down; otherwise falls back to Etc.nprocessors.
def available_processor_count
  begin
    gem 'concurrent-ruby', '>= 1.3.4'
    require 'concurrent-ruby'
    available = Concurrent.available_processor_count
    available.floor
  rescue LoadError
    # gem missing or too old
    require 'etc'
    Etc.nprocessors
  end
end
call_with_index(item, index, options, &block) click to toggle source
# File lib/parallel.rb, line 647
# Calls the block with the item, plus its index when `options[:with_index]`
# is set.
#
# Returns the block's result only when `options[:return_results]` is truthy;
# otherwise nil, to avoid the GC overhead of passing large results around.
def call_with_index(item, index, options, &block)
  block_args = options[:with_index] ? [item, index] : [item]
  value = block.call(*block_args)
  options[:return_results] ? value : nil
end
create_workers(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 579
# Builds `options[:count]` workers, numbered from 0.
#
# Each call to `worker` receives the growing list itself as :started_workers,
# so at creation time it holds exactly the workers created before it.
#
# Returns the Array of workers.
def create_workers(job_factory, options, &block)
  workers = []
  Array.new(options[:count]).each_index do |number|
    worker_options = options.merge(started_workers: workers, worker_number: number)
    workers.push(worker(job_factory, worker_options, &block))
  end
  workers
end
extract_count_from_options(options) click to toggle source

options is either an Integer or a Hash with :count

# File lib/parallel.rb, line 637
# Splits the flexible `options` argument into a count and an options Hash.
#
# options - either a Hash (the count is read from :count and the Hash is
#           returned as-is) or any other value, which is taken to be the count
#           itself and paired with an empty Hash.
#
# Returns a two-element Array: [count, options_hash].
def extract_count_from_options(options)
  return [options[:count], options] if options.is_a?(Hash)

  [options, {}]
end
instrument_finish(item, index, result, options) click to toggle source
# File lib/parallel.rb, line 665
# Invokes the :finish callback for a completed item, if one is configured.
#
# With :finish_in_order set, delivery is delegated to
# `instrument_finish_in_order`; otherwise the callback is called right away
# while holding `options[:mutex]`.
def instrument_finish(item, index, result, options)
  on_finish = options[:finish]
  return unless on_finish

  if options[:finish_in_order]
    instrument_finish_in_order(item, index, result, options)
  else
    options[:mutex].synchronize { on_finish.call(item, index, result) }
  end
end
instrument_finish_in_order(item, index, result, options) click to toggle source

Yields results in the order of the input items. Needs to use `options` to store state between executions, and needs to use the `done` index since a nil result would also be valid.

# File lib/parallel.rb, line 674
# Delivers :finish callbacks in input order, buffering results that arrive
# early.
#
# State lives in `options` between calls: :finish_done holds
# [item, result] pairs by index, and :finish_expecting is the next index to
# deliver. Buffered entries are arrays, so a nil result is still
# distinguishable from "not finished yet".
#
# Runs entirely under `options[:mutex]`. Returns nil.
def instrument_finish_in_order(item, index, result, options)
  options[:mutex].synchronize do
    # initialize our state on first use
    buffer = (options[:finish_done] ||= [])
    options[:finish_expecting] ||= 0 # we wait for the item at index 0

    buffer[index] = [item, result]

    # only the item we are waiting for can unblock delivery
    next unless index == options[:finish_expecting]

    cursor = index
    while (entry = buffer[cursor])
      buffer[cursor] = nil # allow GC to free this item and result
      options[:finish].call(entry[0], cursor, entry[1])
      options[:finish_expecting] += 1
      cursor += 1
    end
    nil
  end
end
instrument_start(item, index, options) click to toggle source
# File lib/parallel.rb, line 694
# Invokes the :start callback with the item and its index, if one is
# configured, while holding `options[:mutex]`.
def instrument_start(item, index, options)
  on_start = options[:start]
  options[:mutex].synchronize { on_start.call(item, index) } if on_start
end
physical_processor_count_windows() click to toggle source
# File lib/parallel.rb, line 357
# Number of physical cores on Windows, summed over all processors.
#
# Asks PowerShell's Get-CimInstance first and falls back to the deprecated
# `wmic` tool when that command is missing or exits non-zero.
#
# Returns the sum of all numbers found in the command output (0 when the
# output has none), or `processor_count`, with a warning, when both commands
# fail.
def physical_processor_count_windows
  # Get-CimInstance introduced in PowerShell 3 or earlier: https://learn.microsoft.com/en-us/previous-versions/powershell/module/cimcmdlets/get-ciminstance?view=powershell-3.0
  result = run(
    'powershell -command "Get-CimInstance -ClassName Win32_Processor -Property NumberOfCores ' \
    '| Select-Object -Property NumberOfCores"'
  )
  if !result || $?.exitstatus != 0
    # fallback to deprecated wmic for older systems
    result = run("wmic cpu get NumberOfCores")
  end
  if !result || $?.exitstatus != 0
    # Bail out if both commands returned something unexpected
    warn "guessing physical processor count"
    processor_count
  else
    # powershell: "\nNumberOfCores\n-------------\n            4\n\n\n"
    # wmic:       "NumberOfCores  \n\n4              \n\n\n\n"
    # `sum` gives 0 for output without digits, where `reduce(:+)` gave nil
    result.scan(/\d+/).sum(&:to_i)
  end
end
process_incoming_jobs(read, write, job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 613
# Worker loop: reads marshalled jobs from `read` until EOF, runs each one and
# writes the marshalled outcome to `write`.
#
# Errors raised by the job are sent back wrapped in an ExceptionWrapper,
# except NoMemoryError, SignalException, Interrupt and SystemExit, which are
# re-raised. The loop ends quietly when writing fails with EPIPE.
#
# Returns nil.
def process_incoming_jobs(read, write, job_factory, options, &block)
  until read.eof?
    item, index = job_factory.unpack(Marshal.load(read))

    outcome =
      begin
        call_with_index(item, index, options, &block)
      # https://github.com/rspec/rspec-support/blob/673133cdd13b17077b3d88ece8d7380821f8d7dc/lib/rspec/support.rb#L132-L140
      rescue NoMemoryError, SignalException, Interrupt, SystemExit => fatal # rubocop:disable Lint/ShadowedException
        raise fatal
      rescue Exception => error # rubocop:disable Lint/RescueException
        ExceptionWrapper.new(error)
      end

    begin
      Marshal.dump(outcome, write)
    rescue Errno::EPIPE
      break # parent thread already dead
    end
  end
end
replace_worker(job_factory, workers, index, options, blk) click to toggle source
# File lib/parallel.rb, line 567
# Swaps the worker at `index` for a freshly created one, under
# `options[:mutex]`.
#
# The old worker is stopped first. The replacement is created with the same
# worker number and is told about all the other current workers via
# :started_workers.
#
# Returns the new worker, which is also stored in `workers[index]`.
def replace_worker(job_factory, workers, index, options, blk)
  options[:mutex].synchronize do
    # old worker is no longer used ... stop it
    retired = workers[index]
    retired.stop

    # create a new replacement worker
    still_running = workers - [retired]
    replacement_options = options.merge(started_workers: still_running, worker_number: index)
    workers[index] = worker(job_factory, replacement_options, &blk)
  end
end
run(command) click to toggle source
# File lib/parallel.rb, line 378
# Runs `command` and returns everything it wrote to stdout.
# Returns nil when the executable cannot be found (Errno::ENOENT).
def run(command)
  begin
    IO.popen(command) { |io| io.read }
  rescue Errno::ENOENT
    nil # Ignore
  end
end
with_instrumentation(item, index, options) { || ... } click to toggle source
# File lib/parallel.rb, line 658
# Wraps one unit of work with the start and finish instrumentation hooks.
#
# Yields to do the work, passes the outcome to `instrument_finish`, and
# returns it, or nil when `options[:preserve_results]` is exactly false.
def with_instrumentation(item, index, options)
  instrument_start(item, index, options)
  outcome = yield
  instrument_finish(item, index, outcome, options)
  return nil if options[:preserve_results] == false

  outcome
end
work_direct(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 409
# Works through all jobs in the calling thread, without extra workers.
#
# The worker number is set to 0 for the duration and reset to nil afterwards,
# even on failure.
#
# Returns the Array of results, or the StandardError raised by a job (the
# error is returned, not raised; remaining jobs are skipped).
def work_direct(job_factory, options, &block)
  self.worker_number = 0
  collected = []
  begin
    while (job = job_factory.next)
      item, index = job
      collected << with_instrumentation(item, index, options) do
        call_with_index(item, index, options, &block)
      end
    end
    collected
  rescue StandardError => e
    e
  end
ensure
  self.worker_number = nil
end
work_in_processes(job_factory, options, &blk) click to toggle source
# File lib/parallel.rb, line 519
# Distributes the jobs over worker processes, driving each worker from its
# own thread in the parent.
#
# Workers are created up front with `create_workers`; `in_threads(options)`
# then runs one thread per worker. Each thread keeps pulling jobs from
# `job_factory` and handing them to its worker until the jobs run out or some
# thread has recorded an exception.
#
# options - Hash; keys read here: :isolation (use a fresh worker for every
#           job after the first). The Hash is also passed on to
#           `UserInterruptHandler.kill_on_ctrl_c`, `in_threads`,
#           `replace_worker` and `with_instrumentation`.
#
# Returns the exception recorded by a thread if there is one, otherwise the
# Array of results indexed by job index.
def work_in_processes(job_factory, options, &blk)
  workers = create_workers(job_factory, options, &blk)
  results = []
  results_mutex = Mutex.new # arrays are not thread-safe
  exception = nil # shared by all threads; once set, the others stop taking jobs

  UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
    in_threads(options) do |i|
      worker = workers[i]
      worker.thread = Thread.current
      worked = false # whether this thread's worker has already run a job

      begin
        loop do
          break if exception
          item, index = job_factory.next
          break unless index # no more jobs

          if options[:isolation]
            # swap in a fresh worker for every job after the first one
            worker = replace_worker(job_factory, workers, i, options, blk) if worked
            worked = true
            worker.thread = Thread.current
          end

          begin
            result = with_instrumentation item, index, options do
              worker.work(job_factory.pack(item, index))
            end
            results_mutex.synchronize { results[index] = result } # arrays are not threads safe on jRuby
          rescue StandardError => e
            exception = e
            if exception.is_a?(Kill)
              # on Kill, also stop every other worker: kill its driving thread
              # (if it has one) and its process
              (workers - [worker]).each do |w|
                w.thread&.kill
                UserInterruptHandler.kill(w.pid)
              end
            end
          end
        end
      ensure
        # always stop this thread's current worker, however the loop ended
        worker.stop
      end
    end
  end

  exception || results
end
work_in_ractors(job_factory, options) click to toggle source
# File lib/parallel.rb, line 453
# Distributes the jobs over Ractors.
#
# The code to run is not a block but `options[:ractor]`, destructured inside
# the ractor as (klass, method_name) and invoked as
# `klass.send(method_name, item)`.
#
# Message protocol, as used below:
#   to a ractor:   [callback, item, index]; an index of :break stops it
#   from a ractor: [exception_or_nil, result_or_nil, item, index]
#
# options - Hash; keys read here: :ractor (required), :count (number of
#           ractors, required), :preserve_results. The Hash is also passed to
#           `instrument_start` and `instrument_finish`.
#
# Returns the first exception reported by a ractor if there is one, otherwise
# the Array of results indexed by job index.
# Raises ArgumentError when a block is given or :ractor is missing.
def work_in_ractors(job_factory, options)
  exception = nil
  results = []
  results_mutex = Mutex.new # arrays are not thread-safe on jRuby

  callback = options[:ractor]
  if block_given? || !callback
    raise ArgumentError, "pass the code you want to execute as `ractor: [ClassName, :method_name]`"
  end

  # build
  ractors = Array.new(options.fetch(:count)) do
    Ractor.new do
      loop do
        got = receive
        (klass, method_name), item, index = got
        break if index == :break
        begin
          Ractor.yield [nil, klass.send(method_name, item), item, index]
        rescue StandardError => e
          # report the failure as data in the first slot instead of raising
          Ractor.yield [e, nil, item, index]
        end
      end
    end
  end

  # start
  # iterate over a copy because idle ractors are removed from `ractors` below
  ractors.dup.each do |ractor|
    if (set = job_factory.next)
      item, index = set
      instrument_start item, index, options
      ractor.send [callback, item, index]
    else
      ractor.send([[nil, nil], nil, :break]) # stop the ractor
      ractors.delete ractor
    end
  end

  # replace with new items
  while (set = job_factory.next)
    item_next, index_next = set
    # wait for any ractor to yield, then reuse it for the next job
    done, (exception, result, item, index) = Ractor.select(*ractors)
    if exception
      # stop handing out jobs; the failed ractor is not drained again below
      ractors.delete done
      break
    end
    instrument_finish item, index, result, options
    results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }

    instrument_start item_next, index_next, options
    done.send([callback, item_next, index_next])
  end

  # finish
  # collect the one outstanding result from every ractor that is still listed
  ractors.each do |ractor|
    (new_exception, result, item, index) = ractor.take
    exception ||= new_exception # keep the first exception seen
    next if new_exception
    instrument_finish item, index, result, options
    results_mutex.synchronize { results[index] = (options[:preserve_results] == false ? nil : result) }
    ractor.send([[nil, nil], nil, :break]) # stop the ractor
  end

  exception || results
end
work_in_threads(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 428
# Distributes the jobs over threads started by `in_threads(options)`.
#
# Every thread records its worker number and then keeps pulling jobs from
# `job_factory` until none are left or some thread has recorded a failure.
#
# Returns the first recorded StandardError if there is one (returned, not
# raised), otherwise the Array of results indexed by job index.
# Raises RuntimeError when `options[:interrupt_signal]` is set.
def work_in_threads(job_factory, options, &block)
  raise "interrupt_signal is no longer supported for threads" if options[:interrupt_signal]

  collected = []
  collected_lock = Mutex.new # arrays are not thread-safe on jRuby
  failure = nil

  in_threads(options) do |worker_num|
    self.worker_number = worker_num
    # as long as nothing failed and there are more jobs, work on one of them
    until failure
      job = job_factory.next
      break unless job

      begin
        item, index = job
        value = with_instrumentation(item, index, options) do
          call_with_index(item, index, options, &block)
        end
        collected_lock.synchronize { collected[index] = value }
      rescue StandardError => e
        failure = e
      end
    end
  end

  failure || collected
end
worker(job_factory, options, &block) click to toggle source
# File lib/parallel.rb, line 587
# Forks one worker process, connected to the parent through two pipes.
#
# The child records its worker number, closes the pipes of the workers listed
# in `options[:started_workers]` as well as the parent's ends of its own
# pipes, and then serves jobs via `process_incoming_jobs` until that returns.
#
# options - Hash; keys read here: :worker_number and :started_workers (which
#           must be present, since it is deleted and iterated in the child).
#
# Returns a Worker built from the parent's read end, write end and the
# child's pid.
def worker(job_factory, options, &block)
  child_read, parent_write = IO.pipe # parent writes, child reads
  parent_read, child_write = IO.pipe # child writes, parent reads

  pid = Process.fork do
    self.worker_number = options[:worker_number]

    begin
      # presumably these are pipe ends inherited from earlier workers through
      # fork, which this child has no use for
      options.delete(:started_workers).each(&:close_pipes)

      # the child only uses its own ends of the two pipes
      parent_write.close
      parent_read.close

      process_incoming_jobs(child_read, child_write, job_factory, options, &block)
    ensure
      child_read.close
      child_write.close
    end
  end

  # the parent only uses its own ends of the two pipes
  child_read.close
  child_write.close

  Worker.new(parent_read, parent_write, pid)
end