class Utilrb::ThreadPool::Task

A Task is executed by the thread pool as soon as a free thread is available.

@author Alexander Duda <Alexander.Duda@dfki.de>

Constants

Asked

Attributes

description[RW]

Custom description which can be used to store a human readable object

exception[R]

The exception thrown by the custom code block

@return [Exception] the exception

pool[R]

Thread pool the task belongs to

@return [ThreadPool] the thread pool

queued_at[RW]

The time the task was queued

return [Time] the time

result[R]

Result of the code block call

started_at[R]

The time the task was started

return [Time] the time

state[R]

State of the task

@return [:waiting,:running,:stopping,:finished,:terminated,:exception] the state

stopped_at[R]

The time the task was stopped or finished

return [Time] the time

sync_key[R]

The sync key is used to speifiy that a given task must not run in paralles with another task having the same sync key. If no key is set there are no such constrains for the taks.

@return the sync key

thread[R]

The thread the task was assigned to

return [Thread] the thread

Public Class Methods

new(options = Hash.new,*args, &block) click to toggle source

A new task which can be added to the work queue of a {ThreadPool}. If a sync key is given no task having the same key will be executed in parallel which is useful for instance member calls which are not thread safe.

@param [Hash] options The options of the task. @option options [Object] :sync_key The sync key @option options [Proc] :callback The callback @option options [Object] :default Default value returned when an error ocurred which was handled. @param [Array] args The arguments for the code block @param [#call] block The code block

# File lib/utilrb/thread_pool.rb, line 128
def initialize (options = Hash.new,*args, &block)
    unless block
        raise ArgumentError, 'you must pass a work block to initialize a new Task.'
    end
    options = Kernel.validate_options(options,{:sync_key => nil,:default => nil,:callback => nil})
    @sync_key = options[:sync_key]
    @arguments = args
    @default = options[:default]
    @callback = options[:callback]
    @block = block
    @mutex = Mutex.new
    @pool = nil
    @state_temp = nil
    @state = nil
    reset
end

Public Instance Methods

callback(&block) click to toggle source

Called from the worker thread when the work is done

@yield [Object,Exception] The callback

# File lib/utilrb/thread_pool.rb, line 231
def callback(&block)
    @mutex.synchronize do
        @callback = block
    end
end
default?() click to toggle source

returns true if the task has a default return vale @return [Boolean]

# File lib/utilrb/thread_pool.rb, line 164
def default?
     @mutex.synchronize do 
         @default != nil
     end
end
exception?() click to toggle source

Checks if an exception occurred.

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 115
def exception?; @state == :exception; end
execute() click to toggle source

Executes the task. Should be called from a worker thread after pre_execute was called. After execute returned and the task was deleted from any internal list finalize must be called to propagate the task state.

# File lib/utilrb/thread_pool.rb, line 188
def execute()
    raise RuntimeError, "call pre_execute ThreadPool::Task first. Current state is #{@state} but :running was expected" if @state != :running
    @state_temp = begin
                @result = @block.call(*@arguments)
                :finished
            rescue Exception => e
                @exception = e
                if e.is_a? Asked
                    :terminated
                else
                    :exception
                end
            end
    @stopped_at = Time.now
end
finalize() click to toggle source

propagates the tasks state should be called after execute

# File lib/utilrb/thread_pool.rb, line 206
def finalize
    @mutex.synchronize do
        @thread = nil
        @state = @state_temp
        @pool = nil
    end
    begin
        @callback.call @result,@exception if @callback
    rescue Exception => e
        ThreadPool.report_exception("thread_pool: in #{self}, callback #{@callback} failed", e)
    end
end
finished?() click to toggle source

Checks if the task was stopped or finished. This also includes cases where an exception was raised by the custom code block.

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 99
def finished?; started? && !running? && !stopping?; end
pre_execute(pool=nil) click to toggle source

sets all internal state to running call execute after that.

# File lib/utilrb/thread_pool.rb, line 172
def pre_execute(pool=nil)
    @mutex.synchronize do 
        #store current thread to be able to terminate
        #the thread
        @pool = pool
        @thread = Thread.current
        @started_at = Time.now
        @state = :running
    end
end
reset() click to toggle source

Resets the tasks. This can be used to requeue a task that is already finished

# File lib/utilrb/thread_pool.rb, line 147
def reset
    if finished? || !started?
        @mutex.synchronize do
            @result = @default
            @state = :waiting
            @exception = nil
            @started_at = nil
            @queued_at = nil
            @stopped_at = nil
        end
    else
        raise RuntimeError,"cannot reset a task which is not finished"
    end
end
running?() click to toggle source

Checks if the task is running

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 87
def running?; @state == :running; end
started?() click to toggle source

Checks if the task was started

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 82
def started?; @state != :waiting; end
stopping?() click to toggle source

Checks if the task is going to be stopped

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 92
def stopping?; @state == :stopping; end
successfull?() click to toggle source

Checks if the task was successfully finished. This means no exceptions, termination or timed out occurred

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 105
def successfull?; @state == :finished; end
terminate!(exception = Asked) click to toggle source

Terminates the task if it is running

# File lib/utilrb/thread_pool.rb, line 220
def terminate!(exception = Asked)
    @mutex.synchronize do
        return unless running?
        @state = :stopping
        @thread.raise exception
    end
end
terminated?() click to toggle source

Checks if the task was terminated.

@return [Boolean]

# File lib/utilrb/thread_pool.rb, line 110
def terminated?; @state == :terminated; end
time_elapsed(time = Time.now) click to toggle source

Returns the number of seconds the task is or was running at the given point in time

@param [Time] time The point in time. @return

# File lib/utilrb/thread_pool.rb, line 242
def time_elapsed(time = Time.now)
    #no need to synchronize here
    if @stopped_at
        (@stopped_at-@started_at).to_f
    elsif @started_at
        (time-@started_at).to_f
    else
        0
    end
end