class OFlow::Task

The Task class provides the asynchronous functionality for the system. Each Task has its own thread and receives requests as an operation and Box of data by the receive() method. The request is put on a queue and popped off one at a time and handed to an Actor associated with the Task.

Constants

BLOCKED

value of @state that indicates the Task is not receiving new requests.

CLOSING

value of @state that indicates the Task is shutting down

RUNNING

value of @state that indicates the Task is currently ready to process requests

STARTING

value of @state that indicates the Task is being created.

STEP

value of @state that indicates the Task is processing one request and will stop after that processing is complete

STOPPED

value of @state that indicates the Task is not currently processing requests

Attributes

actor[R]

the Actor

bounds[RW]
color[RW]
name[R]

The name.

shape[RW]
state[R]

The current processing state of the Task

Public Class Methods

new(flow, name, actor_class, options={}) click to toggle source

A Task is initialized by specifying a class to create an instance of. @param flow [Flow] Flow containing the Task @param name [name] Task base name @param actor_class [Class] Class of the Actor to create @param options [Hash] additional options for the Task or Actor

# File lib/oflow/task.rb, line 44
# Builds the Task and its Actor and, when the Actor asks for its own thread,
# starts the loop that pops queued requests and hands them to the Actor.
#
# @param flow [Flow] Flow containing the Task
# @param name [#to_sym] Task base name; stored as a Symbol
# @param actor_class [Class] Class of the Actor to create
# @param options [Hash] additional options for the Task or Actor; a :state
#   entry replaces the default RUNNING state once the Actor is created
# @raise [Exception] if the new Actor does not respond to perform()
def initialize(flow, name, actor_class, options={})
  @flow = flow
  @name = name.to_sym
  @state = STARTING
  @bounds = nil # [x, y, w, h] as fixnums
  @color = nil  # [r, g, b] as floats
  @shape = nil  # Rectangle is the default
  @queue = []
  @req_mutex = Mutex.new()
  @req_thread = nil
  @step_thread = nil
  @waiting_thread = nil
  @req_timeout = 0.0
  @max_queue_count = nil
  @current_req = nil
  @proc_cnt = 0
  @loop = nil
  @links = {}
  @log = nil
  @error_handler = nil

  set_options(options)

  info("Creating actor #{actor_class} with options #{options}.")
  @actor = actor_class.new(self, options)
  raise Exception.new("#{actor} does not respond to the perform() method.") unless @actor.respond_to?(:perform)

  @state = options.fetch(:state, RUNNING)

  # No loop thread is created when the Actor does not want one; @loop stays
  # nil and receive() then calls the Actor directly.
  return unless @actor.with_own_thread()

  @loop = Thread.start(self) do |me|
    Thread.current[:name] = me.full_name()
    # Keep looping until the state is switched to CLOSING.
    # NOTE(review): a state that matches neither branch below (e.g. STARTING
    # passed in through options[:state]) makes this loop iterate without
    # sleeping - confirm that is intended.
    while CLOSING != @state
      begin
        # BLOCKED still drains the queue; receive() is what refuses new
        # requests in that state.
        if RUNNING == @state || STEP == @state || BLOCKED == @state
          req = nil
          if @queue.empty?
            # Nothing to do: wake a thread waiting in flush(), then sleep
            # until woken or one second has passed.
            @waiting_thread.wakeup() unless @waiting_thread.nil?
            sleep(1.0)
            next
          end
          # receive() inserts at index 0, so pop() takes the oldest request.
          @req_mutex.synchronize {
            req = @queue.pop()
          }
          # Wake a sender that receive() parked while the queue was full.
          @req_thread.wakeup() unless @req_thread.nil?

          @current_req = req
          begin
            if debug?
              debug("perform(#{req.op}, #{req.box.nil? ? '<nil>' : req.box})")
            else
              info("perform(#{req.op})")
            end
            # NOTE(review): req.op is already dereferenced by the logging
            # above, so this nil guard cannot protect against a nil req.
            @actor.perform(req.op, req.box) unless req.nil?
          rescue Exception => e
            # Failures while logging or performing go to handle_error.
            handle_error(e)
          end
          @proc_cnt += 1
          @current_req = nil
          if STEP == @state
            # A single step is done: wake the thread waiting in step() and
            # stop processing.
            @step_thread.wakeup() unless @step_thread.nil?
            @state = STOPPED
          end
        elsif STOPPED == @state
          sleep(1.0)
        end
      rescue Exception => e
        # Anything that escapes the inner handling is printed and the loop
        # continues.
        puts "*** #{full_name} #{e.class}: #{e.message}"
        @current_req = nil
        # TBD Env.rescue(e)
      end
    end
  end
end

Public Instance Methods

_validation_errors() click to toggle source
# File lib/oflow/task.rb, line 421
# Collects validation problems for the Task: every link is passed to
# _check_link and every output the Actor declares must resolve to a link.
#
# @return [Array] the accumulated problems, empty when none were found
def _validation_errors
  problems = []
  @links.each_value { |lnk| _check_link(lnk, problems) }

  specs = @actor.outputs()
  # An Actor that does not declare outputs has nothing more to check.
  return problems if specs.nil?

  specs.each do |spec|
    next unless find_link(spec.dest).nil?

    problems << ValidateError::Problem.new(full_name, ValidateError::Problem::MISSING_ERROR, "Missing link for '#{spec.dest}'.")
  end
  problems
end
backed_up() click to toggle source

Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector. @return [Fixnum] a measure of how backed up a Task is

# File lib/oflow/task.rb, line 230
# Scores how backed up the Task is, counting queued requests plus the one
# being processed.
#
# @return [Fixnum] 0 when idle; with a queue limit, the pending count as a
#   percentage of that limit; without one, the pending count capped at 80
def backed_up
  pending = @queue.size()
  pending += 1 unless @current_req.nil?
  return 0 if 0 == pending

  limited = !(@max_queue_count.nil? || 0 == @max_queue_count)
  return pending * 100 / @max_queue_count if limited

  [pending, 80].min
end
busy?() click to toggle source

Returns true if any requests are queued or a request is being processed. @return [true|false] true if busy, false otherwise

# File lib/oflow/task.rb, line 243
# Reports whether the Task has work: a request in progress, requests on the
# queue, or an Actor that says it is busy.
#
# @return [true|false] true if busy; otherwise whatever @actor.busy? returns
def busy?
  return true unless @current_req.nil?
  return true unless @queue.empty?

  @actor.busy?
end
describe(detail=0, indent=0) click to toggle source

Returns a String that describes the Task. @param detail [Fixnum] higher values result in more detail in the description @param indent [Fixnum] the number of spaces to indent the description

# File lib/oflow/task.rb, line 186
# Builds a multi-line description of the Task.
#
# @param detail [Fixnum] 0 lists only the links; 1 and above adds queue
#   size, state and Actor options; 2 and above adds the current and queued
#   requests
# @param indent [Fixnum] the number of spaces to indent the description
# @return [String] the description with lines joined by newlines
def describe(detail=0, indent=0)
  pad = ' ' * indent
  inner = "  #{pad}"
  out = ["#{pad}#{name} (#{actor.class}) {"]
  @links.each do |local, link|
    # A link without a flow name is shown with an empty flow part.
    origin = link.flow_name.nil? ? '' : link.flow_name
    out << "#{inner}#{local} => #{origin}:#{link.target_name}:#{link.op}"
  end
  if 1 <= detail
    out << "#{inner}queued: #{queue_count()} (#{busy? ? 'busy' : 'idle'})"
    out << "#{inner}state: #{state_string()}"
    @actor.options.each { |key, value| out << "#{inner}#{key} = #{value} (#{value.class})" }
  end
  if 2 <= detail
    out << "#{inner}processing: #{@current_req.describe(detail)}" unless @current_req.nil?
    # Work from a copy taken under the mutex so the queue is held only
    # briefly; a slightly stale listing is acceptable for display purposes.
    snapshot = @req_mutex.synchronize { Array.new(@queue) }
    out << "#{inner}queue:"
    snapshot.each { |req| out << "  #{inner}#{req.describe(detail)}" }
  end
  out << pad + '}'
  out.join("\n")
end
error_handler() click to toggle source

Returns an error handler Task by looking for that Task as an attribute and then in the parent Flow. @return [Task] error handler Task.

# File lib/oflow/task.rb, line 157
# Looks up the error handler: the Task's own if one is set, otherwise the
# one provided by the parent Flow.
#
# @return [Task] error handler Task
def error_handler
  @error_handler.nil? ? @flow.error_handler() : @error_handler
end
flush() click to toggle source

Waits for all processing to complete before returning.

# File lib/oflow/task.rb, line 318
# Waits for all processing to complete before returning. Does nothing when
# the Task has no loop thread.
def flush
  return if @loop.nil?

  # Register the caller so the loop can wake it when the queue runs dry.
  @waiting_thread = Thread.current
  begin
    @loop.wakeup()
  rescue
    # ignore
  end
  sleep(2.0) while busy?
  @waiting_thread = nil
end
full_name() click to toggle source

Similar to a full file path. The full_name describes the containment of the named item. @return [String] full name of item

# File lib/oflow/task.rb, line 138
# Builds the path-like name of the Task: the Flow's full name, a colon, then
# the Task name. Without a Flow the name starts at the colon.
#
# @return [String] full name of item
def full_name
  prefix = @flow.nil? ? '' : @flow.full_name()
  prefix + ':' + @name.to_s
end
has_input(op) click to toggle source
# File lib/oflow/task.rb, line 435
# Checks whether the Actor accepts the given operation.
#
# @param op [#to_sym|nil] operation to look for
# @return [true|false] true when the Actor declares no inputs at all, or
#   when some input spec has a nil op or an op equal to the one given
def has_input(op)
  specs = @actor.inputs()
  return true if specs.nil?

  wanted = op.nil? ? nil : op.to_sym
  specs.any? { |spec| spec.op.nil? || spec.op == wanted }
end
inputs() click to toggle source

The expected inputs the Task supports or nil if not implemented.

# File lib/oflow/task.rb, line 338
# Delegates to the Actor for the inputs it declares.
#
# @return whatever @actor.inputs() returns
def inputs
  @actor.inputs()
end
log() click to toggle source

Returns a log Task by looking for that Task as an attribute and then in the parent Flow. @return [Task] log Task.

# File lib/oflow/task.rb, line 149
# Looks up the log Task: the Task's own if one is set, otherwise the one
# provided by the parent Flow.
#
# @return [Task] log Task
def log
  @log.nil? ? @flow.log() : @log
end
max_queue_count() click to toggle source

Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit. @return [NilClass|Fixnum] maximum number of requests that can be queued

# File lib/oflow/task.rb, line 257
# Reader for the configured queue limit.
#
# @return [NilClass|Fixnum] the stored @max_queue_count value
def max_queue_count
  @max_queue_count
end
outputs() click to toggle source

The expected outputs the Task supports or nil if not implemented.

# File lib/oflow/task.rb, line 343
# Delegates to the Actor for the outputs it declares.
#
# @return whatever @actor.outputs() returns
def outputs
  @actor.outputs()
end
proc_count() click to toggle source

Returns the total number of requests processed. @return [Fixnum] number of requests processed

# File lib/oflow/task.rb, line 263
# Reader for the processed-request counter.
#
# @return [Fixnum] the stored @proc_cnt value
def proc_count
  @proc_cnt
end
queue_count() click to toggle source

Returns the number of requests on the queue. @return [Fixnum] number of queued requests

# File lib/oflow/task.rb, line 223
# Counts the requests currently sitting on the queue.
#
# @return [Fixnum] number of queued requests
def queue_count
  @queue.size
end
receive(op, box) click to toggle source

Creates a Request and adds it to the queue. @param op [Symbol] operation to perform @param box [Box] contents or data for the request

# File lib/oflow/task.rb, line 350
# Creates a Request and adds it to the queue, or performs it directly when
# the Task has no loop thread.
#
# Requests are dropped silently while the Task is CLOSING. When a queue
# limit is set and reached, the caller waits for the request timeout (if one
# is configured) before the limit is checked again.
#
# @param op [Symbol] operation to perform
# @param box [Box] contents or data for the request
# @raise [BlockedError] if the Task is in the BLOCKED state
# @raise [BusyError] if the queue is still full after waiting
def receive(op, box)
  if debug?
    debug("receive(#{op}, #{box.nil? ? '<nil>' : box}) #{state_string}")
  else
    info("receive(#{op})")
  end
  return if CLOSING == @state

  raise BlockedError.new() if BLOCKED == @state

  box = box.receive(full_name, op) unless box.nil?
  # Special case for starting state so that an Actor can place an item on
  # the queue before the loop is started.
  if @loop.nil? && STARTING != @state # no thread task
    begin
      @actor.perform(op, box)
    rescue Exception => e
      handle_error(e)
    end
    return
  end
  unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count
    # The queue is full. Park the caller so the loop can wake it as soon as
    # a request is popped. The sleep must use @req_timeout, the value the
    # guard tests; the previous bare `timeout` was an undefined name.
    @req_thread = Thread.current
    sleep(@req_timeout) unless @req_timeout.nil? || 0 == @req_timeout
    @req_thread = nil
    raise BusyError.new() unless @queue.size() < @max_queue_count
  end
  # New requests go in at the front; the loop pops from the back.
  @req_mutex.synchronize {
    @queue.insert(0, Request.new(op, box))
  }
  @loop.wakeup() if RUNNING == @state
end
request_timeout() click to toggle source

Returns the default timeout for the time to wait for the Task to be ready to accept a request using the receive() method. @return [Float] current timeout for the receive() method

# File lib/oflow/task.rb, line 250
# Reader for the timeout receive() uses while waiting for queue space.
#
# @return [Float] the stored @req_timeout value
def request_timeout
  @req_timeout
end
set_option(key, value) click to toggle source
# File lib/oflow/task.rb, line 410
# Sets one option. The queue limit and request timeout are stored on the
# Task; every other key is passed through to the Actor.
#
# @param key [#to_sym] option name
# @param value [Object] option value; converted with to_i for
#   :max_queue_count and with to_f for :request_timeout
def set_option(key, value)
  option = key.to_sym()
  if :max_queue_count == option
    @max_queue_count = value.to_i
  elsif :request_timeout == option
    @req_timeout = value.to_f
  else
    @actor.set_option(key, value)
  end
end
set_options(options) click to toggle source

Processes the initialize() options. Subclasses should call super. @param options [Hash] options to be used for initialization @option options [Fixnum] :max_queue_count maximum number of requests that can be queued before backpressure is applied to the caller. @option options [Float] :request_timeout timeout in seconds to wait before raising a BusyError if the request queue is too long.
# File lib/oflow/task.rb, line 405
# Applies the Task-level entries of the options Hash, keeping the current
# value for any entry that is absent.
#
# @param options [Hash] options to be used for initialization
# @option options [Fixnum] :max_queue_count stored as given
# @option options [Float] :request_timeout stored after conversion with to_f
def set_options(options)
  @max_queue_count = options.fetch(:max_queue_count) { @max_queue_count }
  timeout = options.fetch(:request_timeout) { @req_timeout }
  @req_timeout = timeout.to_f
end
ship(dest, box) click to toggle source

Sends a message to another Task or Flow. @param dest [Symbol] identifies the link that points to the destination Task or Flow @param box [Box] contents or data for the request

# File lib/oflow/task.rb, line 386
# Sends a message to the target of the link identified by dest. The box is
# frozen before it is handed over.
#
# @param dest [Symbol] identifies the link that points to the destination
# @param box [Box] contents or data for the request
# @return the link that was used
# @raise [LinkError] if no link is found or the link has no target
def ship(dest, box)
  box.freeze() unless box.nil?
  link = resolve_link(dest)
  target = link.nil? ? nil : link.target
  raise LinkError.new(dest) if target.nil?

  destination = "#{link.target_name}:#{link.op}"
  if debug?
    debug("shipping #{box} to #{destination}")
  else
    info("shipping to #{destination}")
  end
  target.receive(link.op, box)
  link
end
shutdown(flush_first=false) click to toggle source

Closes the Task by exiting the processing thread. If flush_first is true then all requests in the queue are processed first.

# File lib/oflow/task.rb, line 301
# Closes the Task by moving it to CLOSING and joining the loop thread. Does
# nothing when the Task has no loop thread.
#
# @param flush_first [true|false] when true the Task is set to BLOCKED and
#   flushed before it is closed
def shutdown(flush_first=false)
  return if @loop.nil?

  if flush_first
    @state = BLOCKED
    flush()
  end
  @state = CLOSING
  begin
    # Waking a loop that has already exited raises; that is safe to ignore.
    @loop.wakeup()
  rescue
    # ignore
  end
  @loop.join()
end
start() click to toggle source

Restarts the Task's processing thread.

# File lib/oflow/task.rb, line 294
# Puts the Task into the RUNNING state and wakes the loop thread if there is
# one.
def start
  @state = RUNNING
  return if @loop.nil?

  @loop.wakeup()
end
state=(s) click to toggle source

The current state of the Task.

# File lib/oflow/task.rb, line 333
# Writer for the Task state; the value is stored without any checks.
#
# @param s the new state, normally one of the state constants
def state=(s)
  @state = s
end
state_string() click to toggle source

Returns the state of the Task as a String. @return [String] String representation of the state

# File lib/oflow/task.rb, line 164
# Names the current state.
#
# @return [String] the name of the state constant matching @state, or
#   'UNKNOWN' when none matches
def state_string
  case @state
  when STARTING then 'STARTING'
  when STOPPED then 'STOPPED'
  when RUNNING then 'RUNNING'
  when CLOSING then 'CLOSING'
  when BLOCKED then 'BLOCKED'
  when STEP then 'STEP'
  else 'UNKNOWN'
  end
end
step(max_wait=5) click to toggle source

Causes the Task to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long. @param max_wait [Float|Fixnum] maximum time to wait for the step to complete

# File lib/oflow/task.rb, line 276
# Asks the loop to process a single request. Only acts when the Task has a
# loop thread, is STOPPED, and has something queued.
#
# @param max_wait [Float|Fixnum] longest time to sleep while waiting for the
#   loop to wake the caller
# @return [Task|nil] self when a step was requested, nil otherwise
def step(max_wait=5)
  return nil if @loop.nil?

  steppable = STOPPED == @state && !@queue.empty?
  return nil unless steppable

  @state = STEP
  # Register the caller so the loop can cut the sleep short when done.
  @step_thread = Thread.current
  @loop.wakeup()
  sleep(max_wait)
  @step_thread = nil
  self
end
stop() click to toggle source

Causes the Actor to stop processing any more requests after the current request has finished.

# File lib/oflow/task.rb, line 269
# Puts the Task into the STOPPED state. The loop checks the state between
# requests, so a request already in progress is not interrupted here.
def stop
  @state = STOPPED
end
wakeup() click to toggle source

Wakes up the Task if it has been stopped or if Env.shutdown() has been called.

# File lib/oflow/task.rb, line 288
# Wakes the loop thread if the Task has one; otherwise does nothing.
def wakeup
  return if @loop.nil?

  @loop.wakeup()
end