class Garcon::ThreadPoolExecutor

Constants

DEFAULT_MAX_POOL_SIZE

Default maximum number of threads that will be created in the pool.

DEFAULT_MAX_QUEUE_SIZE

Default maximum number of tasks that may be added to the task queue.

DEFAULT_MIN_POOL_SIZE

Default minimum number of threads that will be retained in the pool.

DEFAULT_THREAD_IDLETIMEOUT

Default maximum number of seconds a thread in the pool may remain idle before being reclaimed.

Attributes

completed_task_count[R]

The number of tasks that have been completed by the pool since construction.

idletime[R]

The number of seconds that a thread may be idle before being reclaimed.

largest_length[R]

The largest number of threads that have been created in the pool since construction.

max_length[R]

The maximum number of threads that may be created in the pool.

max_queue[R]

The maximum number of tasks that may be waiting in the work queue at any one time. When the queue size reaches `max_queue`, subsequent tasks will be rejected in accordance with the configured `fallback`.

min_length[R]

The minimum number of threads that may be retained in the pool.

scheduled_task_count[R]

The number of tasks that have been scheduled for execution on the pool since construction.
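The interaction between `max_queue` and `fallback` described above can be illustrated with a toy model. This is a sketch under stated assumptions, not Garcon's implementation: `BoundedQueueSketch`, `RejectedError`, and the `:discard` policy are hypothetical names, and the idea that `:abort` raises an error is an assumption, since the documentation only says that tasks are rejected according to the policy.

```ruby
# Toy model of a bounded task queue; it is not Garcon's implementation.
class BoundedQueueSketch
  # Hypothetical error type used by this sketch only.
  RejectedError = Class.new(StandardError)

  attr_reader :scheduled_task_count

  def initialize(max_queue:, fallback: :abort)
    @max_queue = max_queue
    @fallback  = fallback
    @tasks     = []
    @scheduled_task_count = 0
  end

  # Queue a task. Returns true if it was accepted, false if it was dropped.
  def post(&task)
    # A max_queue of zero means "no limit", as documented for the option.
    return reject if @max_queue != 0 && @tasks.length >= @max_queue

    @tasks << task
    @scheduled_task_count += 1
    true
  end

  def queue_length
    @tasks.length
  end

  private

  # Apply the configured fallback to a task that does not fit in the queue.
  def reject
    case @fallback
    when :abort   then raise RejectedError, 'task queue is full' # assumed behaviour
    when :discard then false                                     # hypothetical policy
    else raise ArgumentError, "#{@fallback} is not a valid fallback policy"
    end
  end
end

queue   = BoundedQueueSketch.new(max_queue: 2, fallback: :discard)
results = 3.times.map { queue.post { :work } }
p results            # [true, true, false]
p queue.queue_length # 2
```

The third task is turned away because the queue already holds `max_queue` entries; with `max_queue: 0` the same loop would accept every task.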

Public Class Methods

new(opts = {})

Create a new thread pool.

@param [Hash] opts

The options which configure the thread pool.

@option opts [Integer] :max_threads (DEFAULT_MAX_POOL_SIZE)

The maximum number of threads to be created.

@option opts [Integer] :min_threads (DEFAULT_MIN_POOL_SIZE)

The minimum number of threads to be retained.

@option opts [Integer] :idletime (DEFAULT_THREAD_IDLETIMEOUT)

Maximum number of seconds a thread may be idle before being reclaimed.

@option opts [Integer] :max_queue (DEFAULT_MAX_QUEUE_SIZE)

The maximum number of tasks allowed in the work queue at any one time;
a value of zero means the queue may grow without bound.

@option opts [Symbol] :fallback (:abort)

The policy for handling new tasks that are received when the queue size
has reached `max_queue` or the executor has shut down.

@raise [ArgumentError] if `:max_threads` is less than one

@raise [ArgumentError] if `:min_threads` is less than zero

@raise [ArgumentError] if `:min_threads` is greater than `:max_threads`

@raise [ArgumentError] if `:fallback` is not one of the values specified in `FALLBACK_POLICY`

# File lib/garcon/task/thread_pool/executor.rb, line 99
def initialize(opts = {})
  @min_length = opts.fetch(:min_threads, DEFAULT_MIN_POOL_SIZE).to_i
  @max_length = opts.fetch(:max_threads, DEFAULT_MAX_POOL_SIZE).to_i
  @idletime   = opts.fetch(:idletime,    DEFAULT_THREAD_IDLETIMEOUT).to_i
  @max_queue  = opts.fetch(:max_queue,   DEFAULT_MAX_QUEUE_SIZE).to_i
  @fallback   = opts.fetch(:fallback,    :abort)

  if @max_length <= 0
    raise ArgumentError, 'max_threads must be greater than zero'
  elsif @min_length < 0
    raise ArgumentError, 'min_threads cannot be less than zero'
  elsif @min_length > @max_length
    raise ArgumentError, 'min_threads cannot be more than max_threads'
  elsif !FALLBACK_POLICY.include?(@fallback)
    raise ArgumentError, "#{@fallback} is not a valid fallback policy"
  end

  init_executor
  enable_at_exit_handler!(opts)

  @pool                 = []
  @queue                = Queue.new
  @scheduled_task_count = 0
  @completed_task_count = 0
  @largest_length       = 0

  @gc_interval  = opts.fetch(:gc_interval, 1).to_i
  @last_gc_time = Garcon.monotonic_time - [1.0, (@gc_interval * 2.0)].max
end
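To see which option combinations pass the constructor's checks, the validation can be reproduced in isolation. The following is a minimal sketch, not the library's code: `check_pool_options`, `rejected?`, `SKETCH_DEFAULTS`, and `SKETCH_FALLBACK_POLICY` are hypothetical names, the default values are placeholders rather than Garcon's real constants, and `:discard` is an assumed policy (only `:abort` appears in the documentation).

```ruby
# Stand-alone sketch of the option checks performed by the constructor.
# The values below are placeholders, not Garcon's real defaults.
SKETCH_DEFAULTS = {
  min_threads: 0,
  max_threads: 16,
  fallback:    :abort
}.freeze

# Hypothetical policy list; only :abort is confirmed by the documentation.
SKETCH_FALLBACK_POLICY = %i[abort discard].freeze

# Returns the normalized options, or raises ArgumentError like the constructor.
def check_pool_options(opts = {})
  min      = opts.fetch(:min_threads, SKETCH_DEFAULTS[:min_threads]).to_i
  max      = opts.fetch(:max_threads, SKETCH_DEFAULTS[:max_threads]).to_i
  fallback = opts.fetch(:fallback,    SKETCH_DEFAULTS[:fallback])

  raise ArgumentError, 'max_threads must be greater than zero'       if max <= 0
  raise ArgumentError, 'min_threads cannot be less than zero'        if min < 0
  raise ArgumentError, 'min_threads cannot be more than max_threads' if min > max
  unless SKETCH_FALLBACK_POLICY.include?(fallback)
    raise ArgumentError, "#{fallback} is not a valid fallback policy"
  end

  { min_threads: min, max_threads: max, fallback: fallback }
end

# Convenience helper: true when the options would be refused.
def rejected?(opts)
  check_pool_options(opts)
  false
rescue ArgumentError
  true
end

p rejected?(min_threads: 2, max_threads: 8)  # false
p rejected?(max_threads: 0)                  # true
p rejected?(min_threads: 9, max_threads: 8)  # true
p rejected?(fallback: :bogus)                # true
```

Because the numeric options are coerced with `to_i`, a `nil` value becomes `0`; passing `max_threads: nil` is therefore refused in the same way as `max_threads: 0`.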

Public Instance Methods

can_overflow?()

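Whether the task queue has a maximum size, that is, whether `max_queue` is non-zero.

@return [Boolean] true if the task queue is bounded, false otherwise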

# File lib/garcon/task/thread_pool/executor.rb, line 130
def can_overflow?
  @max_queue != 0
end

current_length()

Alias for: length

length()

The number of threads currently in the pool.

@return [Integer] the number of threads in the pool, or 0 if the executor is not running

# File lib/garcon/task/thread_pool/executor.rb, line 137
def length
  mutex.synchronize { running? ? @pool.length : 0 }
end

Also aliased as: current_length

on_end_task()

Run on task completion.

@!visibility private

# File lib/garcon/task/thread_pool/executor.rb, line 168
def on_end_task
  mutex.synchronize do
    @completed_task_count += 1 #if success
    break unless running?
  end
end

on_worker_exit(worker)

Run when a thread worker exits.

@!visibility private

# File lib/garcon/task/thread_pool/executor.rb, line 178
def on_worker_exit(worker)
  mutex.synchronize do
    @pool.delete(worker)
    if @pool.empty? && !running?
      stop_event.set
      stopped_event.set
    end
  end
end

queue_length()

The number of tasks in the queue awaiting execution.

@return [Integer] the number of queued tasks, or 0 if the executor is not running

# File lib/garcon/task/thread_pool/executor.rb, line 146
def queue_length
  mutex.synchronize { running? ? @queue.length : 0 }
end

remaining_capacity()

Number of tasks that may be enqueued before reaching `max_queue` and rejecting new tasks. A value of -1 indicates that the queue may grow without bound.

@return [Integer] the remaining capacity of the queue, or -1 if the queue is unbounded

# File lib/garcon/task/thread_pool/executor.rb, line 161
def remaining_capacity
  mutex.synchronize { @max_queue == 0 ? -1 : @max_queue - @queue.length }
end
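
The arithmetic behind `can_overflow?` and `remaining_capacity` can be checked with a small stand-alone model. This is only a sketch: `QueueCapacity` is a hypothetical struct that takes the queue limit and the current queue length as plain numbers, whereas the real methods read `@max_queue` and `@queue` under the pool's mutex.

```ruby
# Stand-alone model of the two queue-capacity queries; not Garcon's class.
QueueCapacity = Struct.new(:max_queue, :queued) do
  # True when the queue has an upper bound (max_queue is non-zero).
  def can_overflow?
    max_queue != 0
  end

  # Slots left before new tasks are rejected, or -1 when unbounded.
  def remaining_capacity
    max_queue == 0 ? -1 : max_queue - queued
  end
end

bounded   = QueueCapacity.new(10, 4)
unbounded = QueueCapacity.new(0, 4)

puts bounded.can_overflow?        # true
puts bounded.remaining_capacity   # 6
puts unbounded.can_overflow?      # false
puts unbounded.remaining_capacity # -1
```

Callers that want to throttle submissions should therefore treat -1 as "no limit" rather than as a negative number of free slots.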
status()

Returns an array with the status of each worker thread in the pool.

# File lib/garcon/task/thread_pool/executor.rb, line 152
def status
  mutex.synchronize { @pool.collect { |worker| worker.status } }
end