class Airbrake::ThreadPool
ThreadPool implements a simple thread pool with a configurable number of worker threads and a configurable queue size.
@example
  # Initialize a new thread pool with 5 workers and a queue size of 100. Set
  # the block to be run concurrently.
  thread_pool = ThreadPool.new(
    name: 'performance-notifier',
    worker_size: 5,
    queue_size: 100,
    block: proc { |message| print "ECHO: #{message}..." }
  )

  # Send work.
  10.times { |i| thread_pool << i }
  #=> ECHO: 0...ECHO: 1...ECHO: 2...

@api private
@since v4.6.1
Attributes
@return [ThreadGroup] the list of workers
@note This is exposed for easier unit testing
Public Class Methods
  # File lib/airbrake-ruby/thread_pool.rb, line 28
  def initialize(worker_size:, queue_size:, block:, name: nil)
    @name = name
    @worker_size = worker_size
    @queue_size = queue_size
    @block = block

    @queue = SizedQueue.new(queue_size)
    @workers = ThreadGroup.new
    @mutex = Mutex.new
    @pid = nil
    @closed = false

    has_workers?
  end
Public Instance Methods
Adds a new message to the thread pool. Rejects messages if the queue is at capacity.
@param [Object] message The message that gets passed to the block
@return [Boolean] true if the message was successfully sent to the pool, false if the queue is full
  # File lib/airbrake-ruby/thread_pool.rb, line 49
  def <<(message)
    if backlog >= @queue_size
      logger.info do
        "#{LOG_LABEL} ThreadPool has reached its capacity of " \
        "#{@queue_size} and the following message will not be " \
        "processed: #{message.inspect}"
      end
      return false
    end

    @queue << message
    true
  end
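The rejection behaviour described above can be tried without loading Airbrake at all. The following is a minimal sketch using only Ruby's built-in `SizedQueue`; the `BoundedInbox` class is a hypothetical stand-in written for illustration and is not part of airbrake-ruby. It has no workers, so nothing drains the queue and the third message is turned away.

```ruby
# Hypothetical stand-in for illustration only; this is not Airbrake's class.
class BoundedInbox
  def initialize(capacity)
    @capacity = capacity
    @queue = SizedQueue.new(capacity)
  end

  # Returns false instead of blocking when the queue is already at capacity.
  def <<(message)
    return false if backlog >= @capacity

    @queue << message
    true
  end

  # Number of messages currently waiting in the queue.
  def backlog
    @queue.size
  end
end

inbox = BoundedInbox.new(2)

# No worker is draining the queue, so only the first two messages fit.
results = [1, 2, 3].map { |i| inbox << i }

puts results.inspect
puts inbox.backlog
```

Checking the size before pushing is what keeps the caller from blocking: a plain `SizedQueue#<<` on a full queue would wait until space frees up. In this sketch the check and the push are separate steps, so it assumes a single producer.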
@return [Integer] how big the queue is at the moment
  # File lib/airbrake-ruby/thread_pool.rb, line 64
  def backlog
    @queue.size
  end
Closes the thread pool, making it a no-op (it shuts down all worker threads). Before closing, waits for all unprocessed tasks to be processed.
@return [void]
@raise [Airbrake::Error] when invoked more than one time
  # File lib/airbrake-ruby/thread_pool.rb, line 101
  def close
    threads = @mutex.synchronize do
      raise Airbrake::Error, 'this thread pool is closed already' if @closed

      unless @queue.empty?
        msg = "#{LOG_LABEL} waiting to process #{@queue.size} task(s)..."
        logger.debug("#{msg} (Ctrl-C to abort)")
      end

      @worker_size.times { @queue << :stop }
      @closed = true
      @workers.list.dup
    end

    threads.each(&:join)
    logger.debug("#{LOG_LABEL} #{@name} thread pool closed")
  end
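The shutdown technique used by `close` (one `:stop` sentinel per worker, then joining every thread) can be sketched in isolation. The snippet below is an illustration built on plain `Thread`, `Queue` and `SizedQueue`; the variable names and the doubling "work" are assumptions made for the example, not Airbrake code.

```ruby
results = Queue.new            # thread-safe collector for processed items
queue = SizedQueue.new(10)     # bounded work queue, as in the pool
worker_count = 3

workers = Array.new(worker_count) do
  Thread.new do
    # Same loop shape as spawn_worker: pop until a :stop sentinel arrives.
    while (message = queue.pop)
      break if message == :stop

      results << (message * 2) # illustrative work: double the number
    end
  end
end

# Enqueue the real work first...
5.times { |i| queue << i }

# ...then one sentinel per worker, so every worker sees exactly one :stop.
worker_count.times { queue << :stop }

# Joining guarantees that all work queued before the sentinels is finished.
workers.each(&:join)

processed = []
processed << results.pop until results.empty?
processed.sort!

puts processed.inspect
```

Because the queue is first-in, first-out, every task enqueued before the sentinels is picked up before any worker stops, which is why waiting on the threads is enough to drain the backlog.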
  # File lib/airbrake-ruby/thread_pool.rb, line 119
  def closed?
    @closed
  end
Checks if a thread pool has any workers. A thread pool has no workers in only two cases: when it was closed or when all workers crashed. An active thread pool has no workers only when something went wrong.
Workers are expected to crash when you fork the process the workers are living in. In this case we detect a fork and try to revive them here.
Another possible scenario that crashes workers is when you close the instance on at_exit, but some other at_exit hook prevents the process from exiting.
@return [Boolean] true if the instance isn’t closed and has at least one live worker
@see goo.gl/oydz8h Example of at_exit that prevents exit
  # File lib/airbrake-ruby/thread_pool.rb, line 82
  def has_workers?
    @mutex.synchronize do
      return false if @closed

      if @pid != Process.pid && @workers.list.empty?
        @pid = Process.pid
        @workers = ThreadGroup.new
        spawn_workers
      end

      !@closed && @workers.list.any?
    end
  end
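The PID comparison above doubles as lazy initialization: `@pid` starts as `nil`, so the first call always spawns workers, and later calls in the same process do nothing. A minimal sketch of that check, using a hypothetical `ForkAwareWorkers` class with idle placeholder threads (the class name and the `spawn_count` counter are assumptions added for illustration):

```ruby
# Hypothetical illustration of the PID check; not part of airbrake-ruby.
class ForkAwareWorkers
  attr_reader :spawn_count

  def initialize(size)
    @size = size
    @pid = nil                  # nil never equals Process.pid: first call spawns
    @workers = ThreadGroup.new
    @mutex = Mutex.new
    @spawn_count = 0            # counts how many times workers were (re)spawned
  end

  # Spawns workers if the process changed and no live worker remains.
  # Returns true when at least one worker thread is alive.
  def ensure_workers
    @mutex.synchronize do
      if @pid != Process.pid && @workers.list.empty?
        @pid = Process.pid
        @workers = ThreadGroup.new
        @size.times { @workers.add(Thread.new { sleep }) } # idle placeholders
        @spawn_count += 1
      end

      @workers.list.any?
    end
  end
end

pool = ForkAwareWorkers.new(2)
first = pool.ensure_workers   # spawns workers, since @pid was nil
second = pool.ensure_workers  # same PID and live workers: nothing to do

puts [first, second, pool.spawn_count].inspect
```

In a forked child only the thread that called `fork` survives, so the group's list comes back empty and the stored PID no longer matches; the same branch would then run again and respawn the workers.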
  # File lib/airbrake-ruby/thread_pool.rb, line 123
  def spawn_workers
    @worker_size.times { @workers.add(spawn_worker) }
  end
Private Instance Methods
  # File lib/airbrake-ruby/thread_pool.rb, line 129
  def spawn_worker
    Thread.new do
      while (message = @queue.pop)
        break if message == :stop

        @block.call(message)
      end
    end
  end