class SimpleThreadPool

Simple thread pool for executing blocks in parallel in a controlled manner. Threads are not re-used by the pool to prevent any thread local variables from leaking out.

Public Class Methods

new(max_threads) click to toggle source
# File lib/simple_thread_pool.rb, line 11
def initialize(max_threads)
  @max_threads = max_threads
  @lock = Mutex.new
  @threads = []
  @processing_ids = []
end

Public Instance Methods

execute(id = nil, &block) click to toggle source

Call this method to spawn a thread to run the block. If the thread pool is already full, this method will block until a thread is free. The block is responsible for handling any exceptions that could be raised.

The optional id argument can be used to provide an identifier for a block. If one is provided, processing will be blocked if the same id is already being processed. This ensures that each unique id is executed one at a time sequentially.

# File lib/simple_thread_pool.rb, line 26
def execute(id = nil, &block)
  loop do
    # Check if a new thread can be added without blocking.
    while !can_add_thread?(id)
      sleep(0.001)
    end
    
    @lock.synchronize do
      # Check again inside a synchronized block if the thread can still be added.
      if can_add_thread?(id)
        @processing_ids << id unless id.nil?
        add_thread(id, block)
        return
      end
    end
  end
end
finish() click to toggle source

Call this method to block until all current threads have finished executing.

# File lib/simple_thread_pool.rb, line 45
def finish
  active_threads = @lock.synchronize { @threads.select(&:alive?) }
  active_threads.each(&:join)
  nil
end
synchronize(&block) click to toggle source

Synchronize data access across the thread pool. This method will block waiting on the same internal Mutex the thread pool uses to manage scheduling threads.

# File lib/simple_thread_pool.rb, line 54
def synchronize(&block)
  @lock.synchronize(&block)
end

Private Instance Methods

add_thread(id, block) click to toggle source

Spawn a thread in this method to ensure that it doesn't accidentally pick up any local variables.

# File lib/simple_thread_pool.rb, line 65
def add_thread(id, block)
  main_thread = Thread.current
  
  @threads << Thread.new do
    begin
      block.call
      # Return nil to ensure no objects are leaked.
      nil
    ensure
      @lock.synchronize do
        @processing_ids.delete(id) unless id.nil?
        @threads.delete(Thread.current)
      end
      main_thread.wakeup if main_thread.alive?
    end
  end

  nil
end
can_add_thread?(id) click to toggle source
# File lib/simple_thread_pool.rb, line 60
def can_add_thread?(id)
  @threads.size < @max_threads && (id.nil? || !@processing_ids.include?(id))
end