module Parallel
Constants
- Stop
- VERSION
Public Class Methods
all?(*args, &block)
click to toggle source
# File lib/parallel.rb, line 243
#
# Runs the block for the items via `each` and reports whether the run
# completed. The run is aborted by raising Kill as soon as the block
# returns a falsy value.
#
# Accepts the same arguments as `each`. Raises a RuntimeError when no
# block is given. Always returns true or false.
#
# NOTE(review): relies on `each` returning a falsy value after a Kill;
# that handling is not visible here — confirm.
def all?(*args, &block)
  raise "You must provide a block when calling #all?" unless block

  completed = each(*args) do |*item_args|
    raise Kill unless block.call(*item_args)
  end
  completed ? true : false
end
any?(*args, &block)
click to toggle source
# File lib/parallel.rb, line 238
#
# Runs the block for the items via `each` and returns the negation of
# what `each` returned. The run is aborted by raising Kill as soon as the
# block returns a truthy value.
#
# Accepts the same arguments as `each`. Raises a RuntimeError when no
# block is given.
#
# NOTE(review): relies on `each` returning a falsy value after a Kill;
# that handling is not visible here — confirm.
def any?(*args, &block)
  raise "You must provide a block when calling #any?" unless block

  completed = each(*args) do |*item_args|
    raise Kill if block.call(*item_args)
  end
  !completed
end
each(array, options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 234
#
# Delegates to `map` with `preserve_results: false` merged into a copy of
# `options`, forwarding the block unchanged. Returns whatever `map`
# returns for that configuration.
def each(array, options = {}, &block)
  each_options = options.merge(preserve_results: false)
  map(array, each_options, &block)
end
each_with_index(array, options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 248
#
# Same as `each`, but with `with_index: true` merged into a copy of
# `options` so the block also receives the item's index.
def each_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  each(array, indexed_options, &block)
end
filter_map(...)
click to toggle source
# File lib/parallel.rb, line 307
#
# Forwards all arguments (and the block) to `map`, then removes nil
# entries from the result with `compact`. Note that only nil is dropped;
# `false` results are kept.
def filter_map(...)
  mapped = map(...)
  mapped.compact
end
flat_map(...)
click to toggle source
# File lib/parallel.rb, line 303
#
# Forwards all arguments (and the block) to `map`, then flattens the
# result by exactly one level.
def flat_map(...)
  mapped = map(...)
  mapped.flatten(1)
end
in_processes(options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 228
#
# Runs the block once per worker number by mapping over `0...count` with
# `in_processes: count`.
#
# `options` is either a count or a Hash that may carry :count (see
# extract_count_from_options). When no count is given, `processor_count`
# is used.
def in_processes(options = {}, &block)
  count, options = extract_count_from_options(options)
  count = processor_count unless count

  process_options = options.merge(in_processes: count)
  map(0...count, process_options, &block)
end
in_threads(options = { count: 2 }) { |i| ... }
click to toggle source
# File lib/parallel.rb, line 212
#
# Starts `count` threads, yields each thread's number (0-based) to the
# given block and returns the array of the threads' values.
#
# `options` is either a count or a Hash with :count (defaults to 2
# threads when the argument is omitted).
#
# Interrupt handling: the outer `handle_interrupt(... => :never)` defers
# asynchronous exceptions around the `ensure`, while the inner
# `:immediate` section lets them through while threads are started and
# joined. Whatever happens, every started thread is killed afterwards.
def in_threads(options = { count: 2 })
  spawned = []
  thread_count = extract_count_from_options(options).first

  Thread.handle_interrupt(Exception => :never) do
    Thread.handle_interrupt(Exception => :immediate) do
      thread_count.times { |number| spawned << Thread.new { yield(number) } }
      spawned.map(&:value)
    end
  ensure
    spawned.each(&:kill)
  end
end
map(source, options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 252
#
# Central entry point: picks an execution strategy from `options`, runs
# the block for every item of `source` and returns the outcome.
#
# Strategy selection (first match wins):
# * both :in_processes and :in_threads given -> ArgumentError
# * JRuby platform without :in_processes     -> threads
# * :in_threads                              -> threads
# * :in_ractors                              -> ractors
# * otherwise                                -> processes; when
#   Process.fork is unavailable a warning is printed and work is done
#   directly in the calling process (size 0)
#
# The worker count never exceeds the number of jobs. `options` is
# duplicated, so the caller's hash is not modified.
#
# Returns `result.value` when the work ended with a Break, re-raises when
# the work produced an exception, otherwise returns the results (or
# `source` when results are not to be returned).
def map(source, options = {}, &block)
  options = options.dup
  options[:mutex] = Mutex.new

  if options[:in_processes] && options[:in_threads]
    raise ArgumentError, "Please specify only one of `in_processes` or `in_threads`."
  end

  if RUBY_PLATFORM =~ (/java/) && !options[:in_processes]
    method = :in_threads
    size = options[method] || processor_count
  elsif options[:in_threads]
    method = :in_threads
    size = options[method]
  elsif options[:in_ractors]
    method = :in_ractors
    size = options[method]
  else
    method = :in_processes
    if Process.respond_to?(:fork)
      size = options[method] || processor_count
    else
      warn "Process.fork is not supported by this Ruby"
      size = 0
    end
  end

  job_factory = JobFactory.new(source, options[:mutex])
  size = [job_factory.size, size].min

  # results are needed by the caller, or by a :finish callback
  options[:return_results] = (options[:preserve_results] != false || !!options[:finish])
  add_progress_bar!(job_factory, options)

  result =
    if size == 0
      work_direct(job_factory, options, &block)
    else
      sized_options = options.merge(count: size)
      case method
      when :in_threads then work_in_threads(job_factory, sized_options, &block)
      when :in_ractors then work_in_ractors(job_factory, sized_options, &block)
      else work_in_processes(job_factory, sized_options, &block)
      end
    end

  return result.value if result.is_a?(Break)
  raise result if result.is_a?(Exception)

  options[:return_results] ? result : source
end
map_with_index(array, options = {}, &block)
click to toggle source
# File lib/parallel.rb, line 299
#
# Same as `map`, but with `with_index: true` merged into a copy of
# `options` so the block also receives the item's index.
def map_with_index(array, options = {}, &block)
  indexed_options = options.merge(with_index: true)
  map(array, indexed_options, &block)
end
physical_processor_count()
click to toggle source
Number of physical processor cores on the current system.
# File lib/parallel.rb, line 312
#
# Number of physical processor cores on the current system. The value is
# computed once and memoized in @physical_processor_count.
#
# Detection depends on RbConfig's "target_os":
# * darwin  -> `sysctl -n hw.physicalcpu`
# * linux   -> unique "physical id"/"core id" pairs in /proc/cpuinfo
# * windows -> physical_processor_count_windows
# * other   -> processor_count
#
# Falls back to processor_count whenever the detected value is missing
# or not positive.
def physical_processor_count
  @physical_processor_count ||= begin
    ppc =
      case RbConfig::CONFIG["target_os"]
      when /darwin[12]/
        # block form closes the pipe as soon as the output has been read
        IO.popen("/usr/sbin/sysctl -n hw.physicalcpu", &:read).to_i
      when /linux/
        cores = {} # unique physical ID / core ID combinations
        phy = 0
        File.read("/proc/cpuinfo").scan(/^physical id.*|^core id.*/) do |ln|
          if ln.start_with?("physical")
            phy = ln[/\d+/]
          elsif ln.start_with?("core")
            cores["#{phy}:#{ln[/\d+/]}"] = true
          end
        end
        cores.count
      when /mswin|mingw/
        physical_processor_count_windows
      else
        processor_count
      end

    # fall back to logical count if physical info is invalid; `to_i` also
    # covers a nil result from the platform-specific helpers
    ppc.to_i > 0 ? ppc : processor_count
  end
end
processor_count()
click to toggle source
Number of processors seen by the OS, or a value that accounts for the CPU quota if the process runs inside a cgroup. Used for process scheduling.
# File lib/parallel.rb, line 342
#
# Processor count used as the default number of workers. Memoized in
# @processor_count.
#
# The PARALLEL_PROCESSOR_COUNT environment variable takes precedence;
# otherwise available_processor_count is used. The value is converted
# with Integer(), so a non-numeric environment value raises.
def processor_count
  return @processor_count if @processor_count

  configured = ENV['PARALLEL_PROCESSOR_COUNT']
  @processor_count = Integer(configured || available_processor_count)
end
worker_number()
click to toggle source
# File lib/parallel.rb, line 346
#
# Reads the worker number stored under :parallel_worker_number on the
# current thread (nil when none has been set).
def worker_number
  current_thread = Thread.current
  current_thread[:parallel_worker_number]
end
worker_number=(worker_num)
click to toggle source
TODO: this does not work when doing threads in forks, so should remove and yield the number instead if needed
# File lib/parallel.rb, line 351
#
# Stores `worker_num` under :parallel_worker_number on the current
# thread, where `worker_number` reads it back.
def worker_number=(worker_num)
  current_thread = Thread.current
  current_thread[:parallel_worker_number] = worker_num
end
Private Class Methods
add_progress_bar!(job_factory, options)
click to toggle source
# File lib/parallel.rb, line 384
#
# When `options[:progress]` is set, creates a ProgressBar and wraps
# `options[:finish]` so that the bar is incremented after every finished
# item (any previously configured :finish callback is still called
# first). Does nothing when :progress is not set.
#
# `options[:progress]` may be `true` (title "Progress"), a string-like
# object (used as the title) or a Hash of ProgressBar options, which is
# merged over the defaults for :total and :format.
#
# Raises when the job factory reports an infinite size.
def add_progress_bar!(job_factory, options)
  progress_options = options[:progress]
  return unless progress_options

  raise "Progressbar can only be used with array like items" if job_factory.size == Float::INFINITY

  require 'ruby-progressbar'

  if progress_options == true
    progress_options = { title: "Progress" }
  elsif progress_options.respond_to?(:to_str)
    progress_options = { title: progress_options.to_str }
  end

  defaults = { total: job_factory.size, format: '%t |%E | %B | %a' }
  progress = ProgressBar.create(defaults.merge(progress_options))

  previous_finish = options[:finish]
  options[:finish] = lambda do |item, i, result|
    previous_finish&.call(item, i, result)
    progress.increment
  end
end
available_processor_count()
click to toggle source
# File lib/parallel.rb, line 699
#
# Asks concurrent-ruby (>= 1.3.4) for the available processor count and
# rounds it down. When that gem cannot be activated or loaded, falls back
# to Etc.nprocessors from the standard library.
def available_processor_count
  begin
    gem 'concurrent-ruby', '>= 1.3.4'
    require 'concurrent-ruby'
    return Concurrent.available_processor_count.floor
  rescue LoadError
    require 'etc'
  end
  Etc.nprocessors
end
call_with_index(item, index, options, &block)
click to toggle source
# File lib/parallel.rb, line 647
#
# Calls the block with `item` (plus `index` when options[:with_index] is
# set). Returns the block's value only when options[:return_results] is
# set; otherwise returns nil to avoid the GC overhead of passing large
# results around.
def call_with_index(item, index, options, &block)
  block_args = options[:with_index] ? [item, index] : [item]
  outcome = block.call(*block_args)
  options[:return_results] ? outcome : nil
end
create_workers(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 579
#
# Builds `options[:count]` workers via `worker` and returns them as an
# array. Each worker is created with its 0-based :worker_number and with
# :started_workers pointing at the array being filled, i.e. the workers
# created before it.
def create_workers(job_factory, options, &block)
  slots = Array.new(options[:count])
  slots.each_index.with_object([]) do |number, started|
    worker_options = options.merge(started_workers: started, worker_number: number)
    started << worker(job_factory, worker_options, &block)
  end
end
extract_count_from_options(options)
click to toggle source
`options` is either an Integer or a Hash with :count
# File lib/parallel.rb, line 637
#
# Normalizes the "count or options" argument into a `[count, options]`
# pair:
# * a Hash yields `[hash[:count], hash]` (the same hash object)
# * anything else is taken as the count and paired with an empty Hash
def extract_count_from_options(options)
  return [options[:count], options] if options.is_a?(Hash)

  [options, {}]
end
instrument_finish(item, index, result, options)
click to toggle source
# File lib/parallel.rb, line 665
#
# Invokes the :finish callback for a completed item, if one is
# configured. With options[:finish_in_order] the call is delegated to
# instrument_finish_in_order; otherwise the callback runs immediately
# while holding options[:mutex].
def instrument_finish(item, index, result, options)
  on_finish = options[:finish]
  return unless on_finish

  if options[:finish_in_order]
    instrument_finish_in_order(item, index, result, options)
  else
    options[:mutex].synchronize { on_finish.call(item, index, result) }
  end
end
instrument_finish_in_order(item, index, result, options)
click to toggle source
Yields results in the order of the input items. Needs to use `options` to store state between executions. Needs to use the `done` index, since a nil result would also be valid.
# File lib/parallel.rb, line 674
#
# Calls options[:finish] for completed items in input order, even when
# items complete out of order. State is kept in `options` between calls:
# * :finish_done      - finished [item, result] pairs, by index
# * :finish_expecting - index of the next item to report (starts at 0)
#
# A finished item is buffered until every item before it has been
# reported. The stored pair (not the result) marks "done", since a nil
# result is also valid. Runs while holding options[:mutex].
def instrument_finish_in_order(item, index, result, options)
  options[:mutex].synchronize do
    buffered = (options[:finish_done] ||= [])
    options[:finish_expecting] ||= 0 # we wait for item at index 0
    buffered[index] = [item, result]

    if index == options[:finish_expecting]
      position = index
      # report this item and every directly following item that is already done
      while position <= buffered.size && (entry = buffered[position])
        buffered[position] = nil # allow GC to free this item and result
        finished_item, finished_result = entry
        options[:finish].call(finished_item, position, finished_result)
        options[:finish_expecting] += 1
        position += 1
      end
    end
    nil
  end
end
instrument_start(item, index, options)
click to toggle source
# File lib/parallel.rb, line 694
#
# Invokes the :start callback with the item and its index, if one is
# configured, while holding options[:mutex].
def instrument_start(item, index, options)
  on_start = options[:start]
  return unless on_start

  options[:mutex].synchronize do
    on_start.call(item, index)
  end
end
physical_processor_count_windows()
click to toggle source
# File lib/parallel.rb, line 357
#
# Physical core count on Windows: the sum of NumberOfCores over all
# processors reported by PowerShell's Get-CimInstance, falling back to
# the deprecated `wmic` for older systems.
#
# When neither command succeeds, or the output contains no usable
# number, a warning is printed and processor_count is returned instead,
# so the result is always usable as a count.
def physical_processor_count_windows
  # Get-CimInstance introduced in PowerShell 3 or earlier: https://learn.microsoft.com/en-us/previous-versions/powershell/module/cimcmdlets/get-ciminstance?view=powershell-3.0
  result = run(
    'powershell -command "Get-CimInstance -ClassName Win32_Processor -Property NumberOfCores ' \
    '| Select-Object -Property NumberOfCores"'
  )

  # fallback to deprecated wmic for older systems
  result = run("wmic cpu get NumberOfCores") if !result || $?.exitstatus != 0

  if result && $?.exitstatus == 0
    # powershell: "\nNumberOfCores\n-------------\n 4\n\n\n"
    # wmic: "NumberOfCores \n\n4 \n\n\n\n"
    # `sum` yields 0 (not nil) when the output contains no digits
    cores = result.scan(/\d+/).sum(&:to_i)
    return cores if cores > 0
  end

  # Bail out if both commands returned something unexpected
  warn "guessing physical processor count"
  processor_count
end
process_incoming_jobs(read, write, job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 613
#
# Worker-side job loop: reads marshalled jobs from `read` until EOF, runs
# the block for each one and writes the marshalled result to `write`.
#
# Exceptions raised by the block are sent back wrapped in an
# ExceptionWrapper, except for NoMemoryError, SignalException, Interrupt
# and SystemExit, which are re-raised. The loop ends quietly when the
# other end of `write` is gone (EPIPE).
def process_incoming_jobs(read, write, job_factory, options, &block)
  loop do
    break if read.eof?

    payload = Marshal.load(read)
    item, index = job_factory.unpack(payload)

    outcome =
      begin
        call_with_index(item, index, options, &block)
      # https://github.com/rspec/rspec-support/blob/673133cdd13b17077b3d88ece8d7380821f8d7dc/lib/rspec/support.rb#L132-L140
      rescue NoMemoryError, SignalException, Interrupt, SystemExit => fatal # rubocop:disable Lint/ShadowedException
        raise fatal
      rescue Exception => e # rubocop:disable Lint/RescueException
        ExceptionWrapper.new(e)
      end

    begin
      Marshal.dump(outcome, write)
    rescue Errno::EPIPE
      return # parent thread already dead
    end
  end
  nil
end
replace_worker(job_factory, workers, index, options, blk)
click to toggle source
# File lib/parallel.rb, line 567
#
# Stops the worker at `workers[index]` and puts a freshly created worker
# in its place, all while holding options[:mutex]. The new worker is told
# about the remaining workers via :started_workers and reuses `index` as
# its :worker_number. Returns the new worker.
def replace_worker(job_factory, workers, index, options, blk)
  options[:mutex].synchronize do
    # old worker is no longer used ... stop it
    retired = workers[index]
    retired.stop

    # create a new replacement worker
    still_running = workers - [retired]
    replacement_options = options.merge(started_workers: still_running, worker_number: index)
    workers[index] = worker(job_factory, replacement_options, &blk)
  end
end
run(command)
click to toggle source
# File lib/parallel.rb, line 378
#
# Runs `command` through IO.popen and returns everything it wrote to
# stdout. Returns nil when the executable cannot be found
# (Errno::ENOENT is deliberately ignored).
def run(command)
  begin
    IO.popen(command) { |pipe| pipe.read }
  rescue Errno::ENOENT
    nil # Ignore
  end
end
with_instrumentation(item, index, options) { || ... }
click to toggle source
# File lib/parallel.rb, line 658
#
# Wraps the execution of one job with the :start and :finish callbacks:
# signals the start, yields to run the job, then signals the finish with
# the job's result. Returns that result, or nil when
# options[:preserve_results] is false.
def with_instrumentation(item, index, options)
  instrument_start(item, index, options)
  outcome = yield
  instrument_finish(item, index, outcome, options)

  options[:preserve_results] == false ? nil : outcome
end
work_direct(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 409
#
# Processes every job sequentially in the calling thread (worker number
# 0), with instrumentation around each job.
#
# Returns the array of results, or the StandardError that stopped the
# run (returned, not raised). The worker number is reset to nil
# afterwards in any case.
def work_direct(job_factory, options, &block)
  self.worker_number = 0
  collected = []

  begin
    while (job = job_factory.next)
      item, index = job
      collected << with_instrumentation(item, index, options) { call_with_index(item, index, options, &block) }
    end
    collected
  rescue StandardError => e
    e
  end
ensure
  self.worker_number = nil
end
work_in_processes(job_factory, options, &blk)
click to toggle source
# File lib/parallel.rb, line 519
#
# Distributes the jobs over worker processes. One thread per worker
# pulls jobs from the job factory, hands them to its worker and stores
# the result at the job's index.
#
# * With options[:isolation] a worker is replaced by a fresh one before
#   every job after its first.
# * The first StandardError stops all threads from taking further jobs.
#   When it is a Kill, the threads and processes of all other workers
#   are killed as well.
# * Every thread stops its worker when it is done.
#
# Returns the results array, or the exception that stopped the run
# (returned, not raised).
def work_in_processes(job_factory, options, &blk)
  workers = create_workers(job_factory, options, &blk)
  results = []
  results_lock = Mutex.new # arrays are not thread-safe
  failure = nil

  UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
    in_threads(options) do |slot|
      current = workers[slot]
      current.thread = Thread.current
      used = false

      begin
        loop do
          break if failure

          item, index = job_factory.next
          break unless index

          if options[:isolation]
            current = replace_worker(job_factory, workers, slot, options, blk) if used
            used = true
            current.thread = Thread.current
          end

          begin
            outcome = with_instrumentation(item, index, options) do
              current.work(job_factory.pack(item, index))
            end
            results_lock.synchronize { results[index] = outcome } # arrays are not threads safe on jRuby
          rescue StandardError => e
            failure = e
            next unless failure.is_a?(Kill)

            (workers - [current]).each do |other|
              other.thread&.kill
              UserInterruptHandler.kill(other.pid)
            end
          end
        end
      ensure
        current.stop
      end
    end
  end

  failure || results
end
work_in_ractors(job_factory, options)
click to toggle source
# File lib/parallel.rb, line 453
#
# Distributes the jobs over `options[:count]` ractors. The code to run is
# given as `ractor: [ClassName, :method_name]`; passing a block raises an
# ArgumentError, as does a missing :ractor option.
#
# Phases:
# 1. build   - create ractors that loop over received jobs and yield
#              `[exception, result, item, index]`
# 2. start   - give every ractor a first job; surplus ractors are stopped
# 3. replace - whenever a ractor finishes, record its result and hand it
#              the next job; stop handing out jobs on the first exception
# 4. finish  - collect the outstanding result of every remaining ractor
#              and stop it
#
# Returns the results array, or the first exception seen (returned, not
# raised).
def work_in_ractors(job_factory, options)
  failure = nil
  results = []
  results_lock = Mutex.new # arrays are not thread-safe on jRuby

  callback = options[:ractor]
  if block_given? || !callback
    raise ArgumentError, "pass the code you want to execute as `ractor: [ClassName, :method_name]`"
  end

  # tells a ractor to leave its receive loop
  stop = ->(ractor) { ractor.send([[nil, nil], nil, :break]) }

  # reports a finished job and stores its result
  record = lambda do |done_item, done_index, done_result|
    instrument_finish(done_item, done_index, done_result, options)
    results_lock.synchronize do
      results[done_index] = (options[:preserve_results] == false ? nil : done_result)
    end
  end

  # build
  pool = Array.new(options.fetch(:count)) do
    Ractor.new do
      loop do
        (klass, method_name), item, index = receive
        break if index == :break

        begin
          Ractor.yield [nil, klass.send(method_name, item), item, index]
        rescue StandardError => e
          Ractor.yield [e, nil, item, index]
        end
      end
    end
  end

  # start
  pool.dup.each do |ractor|
    first_job = job_factory.next
    if first_job
      item, index = first_job
      instrument_start(item, index, options)
      ractor.send([callback, item, index])
    else
      stop.call(ractor)
      pool.delete(ractor)
    end
  end

  # replace with new items
  while (upcoming = job_factory.next)
    next_item, next_index = upcoming
    finished, (failure, result, item, index) = Ractor.select(*pool)
    if failure
      pool.delete(finished)
      break
    end
    record.call(item, index, result)

    instrument_start(next_item, next_index, options)
    finished.send([callback, next_item, next_index])
  end

  # finish
  pool.each do |ractor|
    late_failure, result, item, index = ractor.take
    failure ||= late_failure
    next if late_failure

    record.call(item, index, result)
    stop.call(ractor)
  end

  failure || results
end
work_in_threads(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 428
#
# Distributes the jobs over threads started by `in_threads`. Each thread
# records its worker number and keeps taking jobs until none are left or
# any thread has hit a StandardError.
#
# Raises when options[:interrupt_signal] is given (no longer supported
# for threads). Returns the results array, or the exception that stopped
# the run (returned, not raised).
def work_in_threads(job_factory, options, &block)
  raise "interrupt_signal is no longer supported for threads" if options[:interrupt_signal]

  results = []
  results_lock = Mutex.new # arrays are not thread-safe on jRuby
  failure = nil

  in_threads(options) do |worker_num|
    self.worker_number = worker_num

    # as long as there are more jobs, work on one of them
    while failure.nil? && (job = job_factory.next)
      begin
        item, index = job
        outcome = with_instrumentation(item, index, options) do
          call_with_index(item, index, options, &block)
        end
        results_lock.synchronize { results[index] = outcome }
      rescue StandardError => e
        failure = e
      end
    end
  end

  failure || results
end
worker(job_factory, options, &block)
click to toggle source
# File lib/parallel.rb, line 587
#
# Forks one worker process connected to the parent through two pipes
# (jobs: parent -> child, results: child -> parent) and returns a Worker
# wrapping the parent's pipe ends and the child's pid.
#
# In the child: the worker number is recorded, the pipes of previously
# started workers and the parent's pipe ends are closed, and jobs are
# processed until the job pipe reaches EOF. The child's own pipe ends are
# closed when it is done. In the parent: the child's pipe ends are closed
# right after forking.
def worker(job_factory, options, &block)
  job_reader, job_writer = IO.pipe
  result_reader, result_writer = IO.pipe

  pid = Process.fork do
    self.worker_number = options[:worker_number]

    begin
      # the child inherited these descriptors but must not keep them open
      options.delete(:started_workers).each(&:close_pipes)
      [job_writer, result_reader].each(&:close)

      process_incoming_jobs(job_reader, result_writer, job_factory, options, &block)
    ensure
      [job_reader, result_writer].each(&:close)
    end
  end

  [job_reader, result_writer].each(&:close)

  Worker.new(result_reader, job_writer, pid)
end