class Utilrb::ThreadPool

ThreadPool implementation inspired by github.com/meh/ruby-threadpool

@example Using a thread pool of 10 threads

pool = ThreadPool.new(10)
0.upto(9) do 
   pool.process do 
     sleep 1
     puts "done"
   end
end
pool.shutdown
pool.join

@author Alexander Duda <Alexander.Duda@dfki.de>

Attributes

auto_trim[RW]

When auto trim is enabled, a worker thread that finds no runnable work exits instead of waiting, as long as more than the minimum number of threads are spawned. @return [Boolean]

avg_run_time[R]

The average execution time of a (running) task.

@return [Float]

avg_wait_time[R]

The average waiting time of a task before being executed.

@return [Float]

max[R]

The maximum number of worker threads.

@return [Fixnum]

min[R]

The minimum number of worker threads.

@return [Fixnum]

spawned[R]

The real number of worker threads.

@return [Fixnum]

waiting[R]

The number of worker threads waiting for work.

@return [Fixnum]

Public Class Methods

new(min = 5, max = min) click to toggle source

Creates a new ThreadPool and immediately spawns `min` worker threads.

@param [Fixnum] min the minimum number of threads
@param [Fixnum] max the maximum number of threads

# File lib/utilrb/thread_pool.rb, line 293
# Creates the pool state and starts +min+ worker threads.
#
# @param [Fixnum] min the minimum number of worker threads
# @param [Fixnum] max the maximum number of worker threads
def initialize(min = 5, max = min)
    @min, @max = min, max

    # @mutex guards all pool state below; @cond wakes idle workers and
    # @cond_sync_key wakes threads waiting for a sync key to be released.
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @cond_sync_key = ConditionVariable.new

    # Queued tasks, and tasks currently executed by a worker.
    @tasks_waiting, @tasks_running = [], []

    # Moving-average statistics, in seconds.
    @avg_run_time = @avg_wait_time = 0

    # Worker bookkeeping.
    @workers = []
    @spawned = @waiting = @trim_requests = 0
    @auto_trim = false

    @shutdown = false
    @callback_on_task_finished = nil
    @pipes = nil
    @sync_keys = Set.new

    # spawn_thread must run while the lock is held.
    @mutex.synchronize { min.times { spawn_thread } }
end

Private Class Methods

report_exception(msg, e) click to toggle source
# File lib/utilrb/thread_pool.rb, line 619
# Writes an optional context message, the exception message and the
# exception's backtrace to STDERR.
#
# @param [String, nil] msg optional context printed first
# @param [Exception] e the exception to report
def self.report_exception(msg, e)
    STDERR.puts msg if msg
    STDERR.puts e.message
    # An exception that was never raised has a nil backtrace; Array() keeps
    # the reporter itself from raising inside the caller's rescue handler.
    STDERR.puts "  #{Array(e.backtrace).join("\n  ")}"
end

Public Instance Methods

<<(task) click to toggle source

Processes the given {Task} as soon as the next thread is available

@param [Task] task the task to queue
@return [Task] the queued task

# File lib/utilrb/thread_pool.rb, line 486
# Queues +task+ for execution by the next available worker.
#
# A finished task is reset first; a task that is still running is rejected.
# A new worker is spawned when the number of idle workers does not exceed
# the backlog and the pool is below its maximum size.
#
# @param [Task] task the task to queue
# @return [Task] the queued task
def <<(task)
    raise "cannot add task #{task} it is still running" if task.thread
    task.reset if task.finished?

    @mutex.synchronize do
        raise "unable to add work while shutting down" if shutdown?

        task.queued_at = Time.now
        @tasks_waiting.push(task)

        idle_not_enough = @waiting <= @tasks_waiting.length
        spawn_thread if idle_not_enough && @spawned < @max
        @cond.signal
    end
    task
end
backlog() click to toggle source

Number of tasks waiting for execution

@return [Fixnum] the number of tasks

# File lib/utilrb/thread_pool.rb, line 375
# @return [Fixnum] how many tasks are queued and not yet picked up by a worker
def backlog
    @mutex.synchronize { @tasks_waiting.size }
end
clear() click to toggle source
# File lib/utilrb/thread_pool.rb, line 343
# Shuts the pool down, waits for all workers to exit and then clears the
# shutdown flag so that new work can be queued again.
#
# Errors re-raised by joining a worker that died with an exception are
# ignored (best effort), but interrupts and exits (exceptions outside
# StandardError) propagate to the caller.
#
# @return [ThreadPool, nil] the result of #join, or nil if it raised
def clear
    shutdown
    join
rescue StandardError
    # Best effort: a dead worker must not prevent the pool from being reset.
    nil
ensure
    # Written under the lock like every other change to pool state.
    @mutex.synchronize { @shutdown = false }
end
join() click to toggle source

Blocks until all worker threads have terminated. This does not terminate any thread by itself, so it blocks forever if #shutdown has not been called.

# File lib/utilrb/thread_pool.rb, line 529
# Waits for every worker thread to terminate. Workers remove themselves
# from @workers when they exit, so the first remaining entry is joined
# until the list is empty.
#
# @return [ThreadPool] self
def join
    while (worker = @mutex.synchronize { @workers.first })
        worker.join
    end
    self
end
max=(val) click to toggle source

Sets the maximum number of threads (see #resize).

# File lib/utilrb/thread_pool.rb, line 332
# Changes only the upper pool bound; the minimum is kept as is.
def max=(val)
    current_min = min
    resize(current_min, val)
end
min=(val) click to toggle source

Sets the minimum number of threads (see #resize).

# File lib/utilrb/thread_pool.rb, line 327
# Changes only the lower pool bound; the maximum is kept as is.
def min=(val)
    current_max = max
    resize(val, current_max)
end
on_task_finished(&block) click to toggle source

The given block is called for every finished task, even if the task was terminated.

This can be used, for instance, to hand results over to an event loop.

@yield [Task] the code block

# File lib/utilrb/thread_pool.rb, line 546
# Registers the block that workers call with each finished task.
# Passing no block unregisters the callback.
#
# @yield [Task] the finished task
def on_task_finished(&block)
    callback = block
    @mutex.synchronize { @callback_on_task_finished = callback }
end
process(*args, &block) click to toggle source

Processes the given block as soon as the next thread is available.

@param [Array] args the block arguments
@yield [*args] the block
@return [Task]

# File lib/utilrb/thread_pool.rb, line 395
# Queues +block+ (called with +args+) as a task without options.
#
# @return [Task]
def process(*args, &block)
    no_options = nil
    process_with_options(no_options, *args, &block)
end
process?() click to toggle source

Returns true if a worker thread is currently processing a task or work is still queued.

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 403
# @return [Boolean] true while some worker is not idle or tasks are still queued
def process?
    @mutex.synchronize do
        busy_worker = waiting != spawned
        busy_worker || !@tasks_waiting.empty?
    end
end
process_with_options(options,*args, &block) click to toggle source

Processes the given block as soon as the next thread is available with the given options.

@param (see Task#initialize)
@option (see Task#initialize)
@return [Task]

# File lib/utilrb/thread_pool.rb, line 415
# Wraps +block+ into a Task built from +options+ and +args+ and queues it.
#
# @return [Task] the queued task
def process_with_options(options,*args, &block)
    Task.new(options, *args, &block).tap { |task| self << task }
end
resize(min, max = nil) click to toggle source

Changes the minimum and maximum number of threads

@param [Fixnum] min the minimum number of threads
@param [Fixnum] max the maximum number of threads (defaults to min)

# File lib/utilrb/thread_pool.rb, line 360
# Sets new pool bounds (+max+ defaults to +min+), spawns workers for the
# current backlog without exceeding the new maximum, then issues a
# forced trim request.
#
# @return the result of #trim
def resize(min, max = nil)
    @mutex.synchronize do
        @min = min
        @max = max || min

        headroom = @max - @spawned
        backlog = @tasks_waiting.size
        extra = backlog < headroom ? backlog : headroom
        extra.times { spawn_thread }
    end
    trim true
end
shutdown() click to toggle source

Shuts down all threads.

# File lib/utilrb/thread_pool.rb, line 517
# Flags the pool as shutting down and wakes every idle worker so that it
# notices the flag and exits. Does not wait for the workers (see #join).
def shutdown
    @mutex.synchronize do
        @shutdown = true
        @cond.broadcast
    end
end
shutdown?() click to toggle source

Checks if the thread pool is shutting down all threads.

@return [boolean]

# File lib/utilrb/thread_pool.rb, line 354
# @return [Boolean] whether #shutdown has been requested
def shutdown?
    @shutdown
end
sync(sync_key,*args,&block) click to toggle source

Processes the given block in the current thread but ensures that, during processing, no worker thread is executing a task with the same sync_key.

This is useful for instance member calls which are not thread safe.

@param [Object] sync_key the sync key
@yield [*args] the code block
@return [Object] the result of the code block

# File lib/utilrb/thread_pool.rb, line 431
# Runs the block in the calling thread while holding +sync_key+, so that no
# worker executes a task with the same key at the same time.
#
# @param [Object] sync_key the key to hold (must not be nil/false)
# @yield [*args] the code block
# @return [Object] the result of the code block
# @raise [ArgumentError] if no sync key is given
def sync(sync_key,*args,&block)
    raise ArgumentError,"no sync key" unless sync_key

    @mutex.synchronize do
        # Set#add? returns nil while someone else holds the key.
        @cond_sync_key.wait(@mutex) until @sync_keys.add?(sync_key)
    end
    begin
        block.call(*args)
    ensure
        @mutex.synchronize do
            @sync_keys.delete sync_key
            # Threads waiting for *different* keys share this condition
            # variable; a single signal could wake one whose key is still
            # taken while the waiter for this key keeps sleeping.
            @cond_sync_key.broadcast
            # Idle workers may now be able to start a task with this key.
            @cond.signal
        end
    end
end
sync_keys() click to toggle source

Returns a copy of the sync keys currently in use.

# File lib/utilrb/thread_pool.rb, line 337
# @return [Set] a snapshot of the sync keys currently held
def sync_keys
    snapshot = nil
    @mutex.synchronize { snapshot = @sync_keys.clone }
    snapshot
end
sync_timeout(sync_key,timeout,*args,&block) click to toggle source

Same as #sync, but raises Timeout::Error if sync_key cannot be obtained within the given timeout.

@param [Object] sync_key the sync key
@param [Float] timeout the timeout in seconds
@yield [*args] the code block
@return [Object] the result of the code block

# File lib/utilrb/thread_pool.rb, line 459
# Like #sync, but gives up with Timeout::Error if +sync_key+ cannot be
# acquired within +timeout+ seconds. As with Timeout.timeout, a nil or zero
# timeout waits indefinitely.
#
# The wait uses ConditionVariable#wait with a deadline instead of
# Timeout.timeout: an asynchronous timeout could fire right after the key
# was added and leak it forever.
#
# @param [Object] sync_key the key to hold (must not be nil/false)
# @param [Float] timeout maximum time in seconds to wait for the key
# @yield [*args] the code block
# @return [Object] the result of the code block
# @raise [ArgumentError] if no sync key is given
# @raise [Timeout::Error] if the key could not be acquired in time
def sync_timeout(sync_key,timeout,*args,&block)
    raise ArgumentError,"no sync key" unless sync_key

    deadline =
        if timeout && !timeout.zero?
            Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
        end
    @mutex.synchronize do
        until @sync_keys.add?(sync_key)
            if deadline
                remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
                raise Timeout::Error, "execution expired" if remaining <= 0
                @cond_sync_key.wait(@mutex, remaining)
            else
                @cond_sync_key.wait(@mutex)
            end
        end
    end
    begin
        block.call(*args)
    ensure
        @mutex.synchronize do
            @sync_keys.delete sync_key
            # Waiters for different keys share this condition variable,
            # so wake all of them rather than a single (possibly wrong) one.
            @cond_sync_key.broadcast
            # Idle workers may now be able to start a task with this key.
            @cond.signal
        end
    end
end
tasks() click to toggle source

Returns an array of the current waiting and running tasks

@return [Array<Task>] The tasks

# File lib/utilrb/thread_pool.rb, line 384
# @return [Array<Task>] a new array with the running tasks followed by the queued ones
def tasks
    @mutex.synchronize { [*@tasks_running, *@tasks_waiting] }
end
trim(force = false) click to toggle source

Trims the number of threads if threads are waiting for work and the number of spawned threads is higher than the minimum number.

@param [boolean] force Trim even if no thread is waiting.

# File lib/utilrb/thread_pool.rb, line 507
# Requests that one worker thread above the minimum pool size exits.
#
# The request is consumed by the next worker that finds no runnable task
# while more than @min workers are spawned.
#
# @param [Boolean] force record the request even if no worker is currently
#   waiting for work or the pool is not above its minimum; without it the
#   call is a no-op in those cases.
# @return [ThreadPool] self
def trim(force = false)
    @mutex.synchronize do
        if force || (@waiting > 0 && @spawned > @min)
            @trim_requests += 1
            @cond.signal
        end
    end
    self
end

Private Instance Methods

moving_average(current_val,new_val) click to toggle source

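Calculates an exponential moving average that weights the new value with 0.05; a current value of 0 is treated as "no sample yet" and the new value is returned as is.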

# File lib/utilrb/thread_pool.rb, line 555
# Exponential moving average giving +new_val+ a weight of 0.05.
# A +current_val+ of 0 means "no sample yet", so +new_val+ seeds the average.
def moving_average(current_val,new_val)
    if current_val == 0
        new_val
    else
        current_val * 0.95 + new_val * 0.05
    end
end
spawn_thread() click to toggle source

Spawns a worker thread. Must be called from within a block synchronized on the pool mutex.

# File lib/utilrb/thread_pool.rb, line 562
# Spawns one worker thread and registers it in the pool.
#
# Must be called while holding @mutex: @spawned and @workers are updated
# here without further locking.
#
# Each worker loops until shutdown:
# 1. under the lock, take the first queued task whose sync key (if any) is
#    free, claiming that key;
# 2. if there is none, exit when the worker is surplus (more than @min
#    workers and either auto_trim or a pending trim request), otherwise
#    sleep on @cond;
# 3. run the task outside the lock, then release its sync key, update the
#    statistics, finalize it and call the on_task_finished callback.
# The worker always deregisters itself on exit, even if it dies from an
# unexpected exception, so that #join and the counters stay consistent.
def spawn_thread
    thread = Thread.new do
        begin
            until shutdown?
                current_task = @mutex.synchronize do
                    task = nil
                    until shutdown?
                        index = @tasks_waiting.index do |t|
                            # add? claims the key as a side effect when it is free
                            !t.sync_key || @sync_keys.add?(t.sync_key)
                        end
                        if index
                            task = @tasks_waiting.delete_at(index)
                            task.pre_execute(self) # block tasks so that no one is using it at the same time
                            @tasks_running << task
                            @avg_wait_time = moving_average(@avg_wait_time, Time.now - task.queued_at)
                            break
                        end

                        # Nothing runnable: leave if this worker is surplus.
                        if @spawned > @min && (auto_trim || @trim_requests > 0)
                            @trim_requests -= 1 if @trim_requests > 0
                            break
                        end

                        @waiting += 1
                        begin
                            @cond.wait @mutex
                        ensure
                            # Keep the idle count right even if the wait is interrupted.
                            @waiting -= 1
                        end
                    end
                    task
                end
                break unless current_task

                begin
                    current_task.execute
                rescue Exception => e
                    # Task errors are reported but never kill the worker.
                    ThreadPool.report_exception(nil, e)
                ensure
                    @mutex.synchronize do
                        @tasks_running.delete current_task
                        if current_task.sync_key
                            @sync_keys.delete(current_task.sync_key)
                            # Waiters for different keys share @cond_sync_key; wake them all.
                            @cond_sync_key.broadcast
                            @cond.signal # maybe another worker is waiting for this sync key
                        end
                        @avg_run_time = moving_average(@avg_run_time, current_task.stopped_at - current_task.started_at)
                    end
                    current_task.finalize # propagate state after it was deleted from the internal lists
                    callback = @callback_on_task_finished
                    if callback
                        begin
                            callback.call(current_task)
                        rescue StandardError => e
                            # A failing callback must not take the worker down.
                            ThreadPool.report_exception("on_task_finished callback failed", e)
                        end
                    end
                end
            end
        ensure
            @mutex.synchronize do
                @spawned -= 1
                @workers.delete Thread.current
            end
        end
    end
    @spawned += 1
    @workers << thread
rescue Exception => e
    ThreadPool.report_exception(nil, e)
end