class Garcon::ThreadPoolExecutor
Constants
- DEFAULT_MAX_POOL_SIZE
Default maximum number of threads that will be created in the pool.
- DEFAULT_MAX_QUEUE_SIZE
Default maximum number of tasks that may be added to the task queue.
- DEFAULT_MIN_POOL_SIZE
Default minimum number of threads that will be retained in the pool.
- DEFAULT_THREAD_IDLETIMEOUT
Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.
Attributes
The number of tasks that have been completed by the pool since construction.
The number of seconds that a thread may be idle before being reclaimed.
The largest number of threads that have been created in the pool since construction.
The maximum number of threads that may be created in the pool.
The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches ‘max_queue` subsequent tasks will be rejected in accordance with the configured `fallback`.
The minimum number of threads that may be retained in the pool.
The number of tasks that have been scheduled for execution on the pool since construction.
Public Class Methods
Create a new thread pool.
@param [Hash] opts
The options which configure the thread pool.
@option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE
)
The maximum number of threads to be created.
@option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE
)
The minimum number of threads to be retained.
@option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT
)
Maximum number of seconds a thread may be idle before being reclaimed.
@option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE
)
The maximum number of tasks allowed in the work queue at any one time; a value of zero means the queue may grow without bound.
@option opts [Symbol] :fallback (:abort)
The policy for handling new tasks that are received when the queue size has reached `max_queue` or the executor has shut down.
@raise [ArgumentError] if ‘:max_threads` is less than one
@raise [ArgumentError] if ‘:min_threads` is less than zero
@raise [ArgumentError] if ‘:fallback` is not one of the values specified
in `FALLBACK_POLICY`
# File lib/garcon/task/thread_pool/executor.rb, line 99 def initialize(opts = {}) @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i @idletime = opts.fetch(:idletime, DEFAULT_THREAD_IDLETIMEOUT).to_i @max_queue = opts.fetch(:max_queue, DEFAULT_MAX_QUEUE_SIZE).to_i @fallback = opts.fetch(:fallback, :abort) if @max_length <= 0 raise ArgumentError, 'max_threads must be greater than zero' elsif @min_length < 0 raise ArgumentError, 'min_threads cannot be less than zero' elsif min_length > max_length raise ArgumentError, 'min_threads cannot be more than max_threads' elsif !FALLBACK_POLICY.include?(@fallback) raise ArgumentError, "#{fallback} is not a valid fallback policy" end init_executor enable_at_exit_handler!(opts) @pool = [] @queue = Queue.new @scheduled_task_count = 0 @completed_task_count = 0 @largest_length = 0 @gc_interval = opts.fetch(:gc_interval, 1).to_i @last_gc_time = Garcon.monotonic_time - [1.0, (@gc_interval * 2.0)].max end
Public Instance Methods
@!macro executor_module_method_can_overflow_question
# File lib/garcon/task/thread_pool/executor.rb, line 130 def can_overflow? @max_queue != 0 end
The number of threads currently in the pool.
@return [Integer] the length
# File lib/garcon/task/thread_pool/executor.rb, line 137 def length mutex.synchronize { running? ? @pool.length : 0 } end
Run on task completion.
@!visibility private
# File lib/garcon/task/thread_pool/executor.rb, line 168 def on_end_task mutex.synchronize do @completed_task_count += 1 #if success break unless running? end end
Run when a thread worker exits.
@!visibility private
# File lib/garcon/task/thread_pool/executor.rb, line 178 def on_worker_exit(worker) mutex.synchronize do @pool.delete(worker) if @pool.empty? && !running? stop_event.set stopped_event.set end end end
The number of tasks in the queue awaiting execution.
@return [Integer] the queue_length
# File lib/garcon/task/thread_pool/executor.rb, line 146 def queue_length mutex.synchronize { running? ? @queue.length : 666 } end
Number of tasks that may be enqueued before reaching ‘max_queue` and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.
@return [Integer] the remaining_capacity
# File lib/garcon/task/thread_pool/executor.rb, line 161 def remaining_capacity mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length } end
Returns an array with the status of each thread in the pool
# File lib/garcon/task/thread_pool/executor.rb, line 152 def status mutex.synchronize { @pool.collect { |worker| worker.status } } end