class OFlow::Flow

The class used to manage interactions between Tasks and sub-Flows. It can be thought of as a container for Tasks in which the Flow keeps track of the Links between the Tasks.

Attributes

env[R]
name[R]

The name.

Public Class Methods

new(env, name)

Creates a new Flow. @param env [Env] Env containing the Flow @param name [Symbol|String] Flow base name

# File lib/oflow/flow.rb, line 19
def initialize(env, name)
  @name = name.to_sym
  @tasks = {}
  @prepared = false
  @log = nil
  @error_handler = nil
  @env = env
end

Public Instance Methods

_clear()
# File lib/oflow/flow.rb, line 251
def _clear()
end
_locate(path)
# File lib/oflow/flow.rb, line 138
def _locate(path)
  t = @tasks[path[0].to_sym]
  return t if t.nil? || 1 == path.size
  t._locate(path[1..-1])
end
_validation_errors()

Returns an Array of validation errors.

# File lib/oflow/flow.rb, line 84
def _validation_errors()
  errors = []
  @tasks.each_value { |t| errors += t._validation_errors() }
  errors
end
busy?()

Returns true if one or more Tasks are either processing a request or have a request waiting to be processed on their input queue. @return [true|false] the busy state across all Tasks

# File lib/oflow/flow.rb, line 160
def busy?
  @tasks.each_value { |task| return true if task.busy? }
  false
end
clear()

Clears out all Tasks and Flows and resets the object back to an empty state.

# File lib/oflow/flow.rb, line 232
def clear()
  shutdown()
  @tasks = {}
  _clear()
end
describe(detail=0, indent=0)

Returns a String describing the Flow. @param detail [Fixnum] higher values result in more detail in the description @param indent [Fixnum] the number of spaces to indent the description

# File lib/oflow/flow.rb, line 241
def describe(detail=0, indent=0)
  i = ' ' * indent
  lines = ["#{i}#{name} (#{self.class}) {"]
  @tasks.each_value { |t|
    lines << t.describe(detail, indent + 2)
  }
  lines << i + "}"
  lines.join("\n")
end
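
The layout produced by describe() is easiest to see on a small example. The following is a stand-alone sketch, not OFlow code: StubTask and StubFlow are hypothetical stand-ins, and the one-line format returned by StubTask#describe is an assumption, since the real Task#describe is not shown here.

```ruby
# Hypothetical stand-in for a Task; only describe() is modelled.
class StubTask
  def initialize(name)
    @name = name
  end

  # Assumed one-line format; the real Task#describe may differ.
  def describe(detail = 0, indent = 0)
    "#{' ' * indent}#{@name} (StubTask)"
  end
end

# Hypothetical stand-in for a Flow; only describe() is modelled.
class StubFlow
  def initialize(name, tasks)
    @name = name
    @tasks = tasks
  end

  # Same structure as the listing above: a header line, each child
  # indented by two more spaces, then a closing brace.
  def describe(detail = 0, indent = 0)
    i = ' ' * indent
    lines = ["#{i}#{@name} (#{self.class}) {"]
    @tasks.each { |t| lines << t.describe(detail, indent + 2) }
    lines << i + '}'
    lines.join("\n")
  end
end

flow = StubFlow.new(:demo, [StubTask.new(:one), StubTask.new(:two)])
text = flow.describe(0, 2)
puts text
```

Because each child receives indent + 2, nested containers would indent their own children a further two spaces.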
each_task(&blk)

Iterates over each Task and yields to the provided block with each Task. @param blk [Proc] Proc to call on each iteration

# File lib/oflow/flow.rb, line 101
def each_task(&blk)
  @tasks.each { |name,task| blk.yield(task) }
end
error_handler()

Returns an error_handler Task by looking for that Task in an attribute and then in the contained Tasks or Tasks in outer Flows. @return [Task] error_handler Task.

# File lib/oflow/flow.rb, line 48
def error_handler()
  return @error_handler unless @error_handler.nil?
  eh = find_task(:error)
  return eh unless eh.nil?
  @env.error_handler
end
find_task(name)

Locates and returns a Task with the specified name. @param name [String] name of the Task @return [Task|nil] the Task with the name specified or nil

# File lib/oflow/flow.rb, line 123
def find_task(name)
  name = name.to_sym unless name.nil?
  @tasks[name]
end
flush()

Wakes up all the Tasks in the Flow and waits for the system to become idle before returning.

# File lib/oflow/flow.rb, line 201
def flush()
  wakeup()
  @tasks.each_value { |t| t.flush() }
  while busy?
    sleep(0.2)
  end
end
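
The wait at the end of flush() is a plain polling loop. The sketch below shows that pattern on its own, under stated assumptions: CountdownTask and wait_until_idle are hypothetical names, and the poll interval is shortened from the 0.2 seconds used above so the example finishes quickly.

```ruby
# Hypothetical task stub that reports busy for a fixed number of polls.
class CountdownTask
  attr_reader :polls

  def initialize(busy_polls)
    @remaining = busy_polls
    @polls = 0
  end

  def busy?
    @polls += 1
    return false if @remaining <= 0
    @remaining -= 1
    true
  end
end

# Sleep and re-check until no task reports busy. any? stops at the
# first busy task, much like the early return in busy?.
def wait_until_idle(tasks, interval = 0.01)
  sleep(interval) while tasks.any?(&:busy?)
end

task = CountdownTask.new(3)
wait_until_idle([task])
puts "polled #{task.polls} times"
```

A fixed sleep keeps the loop simple at the cost of up to one interval of extra latency after the last Task goes idle.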
full_name()

Similar to a full file path. The full_name describes the containment of the named item. @return [String] full name of item

# File lib/oflow/flow.rb, line 31
def full_name()
  @name.to_s
end
locate(name)

Locates and returns a Task with the specified full name. @param name [String] full name of the Task @return [Task|nil] the Task with the name specified or nil

# File lib/oflow/flow.rb, line 131
def locate(name)
  name = name[1..-1] if name.start_with?(':')
  name = name[0..-2] if name.end_with?(':')
  path = name.split(':')
  _locate(path)
end
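
The name handling in locate() and the descent in _locate() can be tried in isolation. This is a stand-alone approximation, not OFlow code: split_full_name and locate_in are hypothetical helpers that repeat the same steps, and a nested Hash stands in for the Flow and Task containment.

```ruby
# Hypothetical helper repeating the trimming and splitting in locate().
def split_full_name(name)
  name = name[1..-1] if name.start_with?(':')  # drop a leading ':'
  name = name[0..-2] if name.end_with?(':')    # drop a trailing ':'
  name.split(':')
end

# Hypothetical stand-in for _locate(): descend a nested Hash keyed by Symbol.
def locate_in(tree, path)
  node = tree[path[0].to_sym]
  return node if node.nil? || 1 == path.size
  locate_in(node, path[1..-1])
end

tree = { outer: { inner: { worker: 'worker task' } } }

path = split_full_name(':outer:inner:worker:')
found = locate_in(tree, path)
missing = locate_in(tree, split_full_name('outer:nope:worker'))

puts path.inspect
puts found.inspect
puts missing.inspect
```

A lookup stops and returns nil as soon as one path element is not found, so a missing intermediate container is reported the same way as a missing leaf.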
log()

Returns a log Task by looking for that Task in an attribute and then in the contained Tasks or Tasks in outer Flows. @return [Task] log Task.

# File lib/oflow/flow.rb, line 38
def log()
  return @log unless @log.nil?
  lg = find_task(:log)
  return lg unless lg.nil?
  @env.log
end
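
log() and error_handler() share the same lookup order: an instance attribute first, then a contained Task with a well-known name, then the Env. A minimal sketch of that order follows, using hypothetical stand-ins (StubEnv, LookupFlow) and plain Strings in place of Tasks.

```ruby
# Hypothetical stand-in for Env; only the log reader is modelled.
StubEnv = Struct.new(:log)

# Hypothetical stand-in for Flow, modelling only the lookup order.
class LookupFlow
  def initialize(env, tasks = {})
    @env = env
    @tasks = tasks
    @log = nil  # never assigned in this sketch, so step 1 is skipped
  end

  def log
    return @log unless @log.nil?  # 1. attribute, if one has been set
    lg = @tasks[:log]
    return lg unless lg.nil?      # 2. contained Task named :log
    @env.log                      # 3. fall back to the Env
  end
end

env = StubEnv.new('env log')
from_env = LookupFlow.new(env).log
from_task = LookupFlow.new(env, { log: 'flow log task' }).log

puts from_env
puts from_task
```

A Flow can therefore override the Env-wide logger simply by containing a Task named :log.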
queue_count()

Returns the sum of all the requests in all the Tasks' queues. @return [Fixnum] total number of items waiting to be processed

# File lib/oflow/flow.rb, line 151
def queue_count()
  cnt = 0
  @tasks.each_value { |task| cnt += task.queue_count() }
  cnt
end
shutdown(flush_first=false)

Shuts down all Tasks. @param flush_first [true|false] flag indicating shutdown should occur after the system becomes idle

# File lib/oflow/flow.rb, line 219
def shutdown(flush_first=false)
  # block all tasks first so threads can empty queues
  @tasks.each_value do |task|
    task.state = Task::BLOCKED
  end
  # shutdown and wait for queues to empty if necessary
  @tasks.each_value do |task|
    task.shutdown(flush_first)
  end
  @tasks = {}
end
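
The two passes in shutdown() matter because every Task is blocked before any Task is shut down. The sketch below records the call order to make that visible. RecordingTask and the BLOCKED constant are hypothetical stand-ins for Task and Task::BLOCKED, not OFlow code.

```ruby
BLOCKED = :blocked  # stands in for Task::BLOCKED

# Hypothetical task stub that records calls in a shared event list.
class RecordingTask
  attr_reader :state

  def initialize(name, events)
    @name = name
    @events = events
    @state = :running
  end

  def state=(s)
    @state = s
    @events << [@name, :state, s]
  end

  def shutdown(flush_first = false)
    @events << [@name, :shutdown, flush_first]
  end
end

events = []
tasks = { a: RecordingTask.new(:a, events), b: RecordingTask.new(:b, events) }

# Pass 1: block all tasks first.
tasks.each_value { |t| t.state = BLOCKED }
# Pass 2: shut each task down, passing the flush flag through.
tasks.each_value { |t| t.shutdown(true) }

events.each { |e| puts e.inspect }
```

Both state changes appear in the event list before either shutdown call, which is the ordering the source comment asks for.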
start()

Calls the start() method on all Tasks. @raise [ValidateError] if the Flow has not been marked as prepared

# File lib/oflow/flow.rb, line 189
def start()
  raise ValidateError.new("#{full_name} not validated.") unless @prepared
  @tasks.each_value { |task| task.start() }
end
state=(s)

Sets the state of all Tasks recursively. This should not be called directly.

# File lib/oflow/flow.rb, line 211
def state=(s)
  @tasks.each_value do |task|
    task.state = s
  end
end
step()

Calls the step() method on one Task that is stopped and has an item in its queue. The Task with the highest backed_up() value is selected. @return [Task|nil] the Task that was stepped, or nil if no Task qualified

# File lib/oflow/flow.rb, line 172
def step()
  max = 0.0
  best = nil
  walk_tasks() do |t|
    if Task::STOPPED == t.state
      bu = t.backed_up()
      if max < bu
        best = t
        max = bu
      end
    end
  end
  best.step() unless best.nil?
  best
end
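
The selection rule in step() is a single pass that keeps the largest value seen so far. Here is a stand-alone sketch of that rule, assuming hypothetical names throughout: FakeTask, STOPPED, RUNNING and pick_most_backed_up are not part of OFlow, and the backed_up values are made up for illustration.

```ruby
# Hypothetical stand-in for a Task: only the fields step() inspects.
FakeTask = Struct.new(:name, :state, :backed_up)

STOPPED = :stopped  # stands in for Task::STOPPED
RUNNING = :running  # any state other than stopped

# Return the stopped task with the highest backed_up value, or nil when
# no stopped task has a value above 0.0.
def pick_most_backed_up(tasks)
  max = 0.0
  best = nil
  tasks.each do |t|
    next unless STOPPED == t.state
    if max < t.backed_up
      best = t
      max = t.backed_up
    end
  end
  best
end

tasks = [
  FakeTask.new(:a, STOPPED, 0.25),
  FakeTask.new(:b, RUNNING, 0.90),  # skipped: not stopped
  FakeTask.new(:c, STOPPED, 0.60),
  FakeTask.new(:d, STOPPED, 0.0)    # skipped: value is not above 0.0
]

chosen = pick_most_backed_up(tasks)
puts chosen.name.inspect
```

Because the comparison is a strict `max < bu` starting from 0.0, a Task whose backed_up() value is zero is never chosen, and ties go to the Task visited first.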
stop()

Calls the stop() method on all Tasks.

# File lib/oflow/flow.rb, line 166
def stop()
  @tasks.each_value { |task| task.stop() }
end
task(name, actor_class, options={}) { |t| ... }

Creates a Task and yields to a block with the newly created Task. Used to configure Tasks. @param name [Symbol|String] base name for the Task @param actor_class [Class] Class to create an Actor instance of @param options [Hash] optional parameters @param block [Proc] block to yield to with the new Task instance @return [Task] new Task

# File lib/oflow/flow.rb, line 62
def task(name, actor_class, options={}, &block)
  has_state = options.has_key?(:state)
  unless has_state
    options = options.clone
    options[:state] = Task::STOPPED
  end
  t = Task.new(self, name, actor_class, options)
  @tasks[t.name] = t
  yield(t) if block_given?
  t
end
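
The option handling at the top of task() supplies a default :state without modifying the caller's Hash. The following sketch isolates that behaviour. with_default_state is a hypothetical helper and STOPPED stands in for Task::STOPPED.

```ruby
STOPPED = :stopped  # stands in for Task::STOPPED

# Hypothetical helper repeating the option handling at the top of task().
def with_default_state(options = {})
  unless options.has_key?(:state)
    options = options.clone    # copy so the caller's Hash is untouched
    options[:state] = STOPPED
  end
  options
end

caller_opts = { limit: 10 }
effective = with_default_state(caller_opts)
explicit = with_default_state({ state: :running })

puts caller_opts.inspect
puts effective.inspect
puts explicit.inspect
```

An explicitly supplied :state is passed through unchanged, so the default only applies when the key is absent.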
task_count()

Returns the number of Tasks held directly by the Flow.

# File lib/oflow/flow.rb, line 145
def task_count()
  @tasks.size
end
validate()

Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved. @raise [ValidateError] if there is an error in validation

# File lib/oflow/flow.rb, line 77
def validate()
  # collects errors and raises all errors at once if there are any
  errors = _validation_errors()
  raise ValidateError.new(errors) unless errors.empty?
end
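
validate() gathers every problem before raising, so a caller sees all of them in one pass rather than fixing them one at a time. A minimal sketch of that collect-then-raise pattern follows. DemoValidateError, DemoTask and validate_all are hypothetical, and the real ValidateError may expose its errors differently.

```ruby
# Hypothetical error class that carries the collected problems.
class DemoValidateError < StandardError
  attr_reader :problems

  def initialize(problems)
    @problems = problems
    super("#{problems.size} validation error(s)")
  end
end

# Hypothetical task stub that returns a fixed list of problems.
DemoTask = Struct.new(:problems) do
  def _validation_errors
    problems
  end
end

# Collect errors from every task, then raise once if any were found.
def validate_all(tasks)
  errors = []
  tasks.each { |t| errors += t._validation_errors }
  raise DemoValidateError.new(errors) unless errors.empty?
end

tasks = [
  DemoTask.new([]),
  DemoTask.new(['a: link not set']),
  DemoTask.new(['b: no target'])
]

caught = begin
  validate_all(tasks)
  nil
rescue DemoValidateError => e
  e
end

puts caught.message
puts caught.problems.inspect
```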
wakeup()

Wakes up all the Tasks in the Flow.

# File lib/oflow/flow.rb, line 195
def wakeup()
  @tasks.each_value { |t| t.wakeup() }
end
walk_tasks(tasks_only=true, &blk)

Performs a recursive walk over all Tasks and yields to the provided block for each. Flows are followed recursively. @param tasks_only [true|false] indicates that only Tasks, and not Flows, are yielded @param blk [Proc] Proc to call on each iteration

# File lib/oflow/flow.rb, line 109
def walk_tasks(tasks_only=true, &blk)
  @tasks.each_value do |t|
    if t.is_a?(Task)
      blk.yield(t)
    else
      blk.yield(t) unless tasks_only
      t.walk_tasks(tasks_only, &blk)
    end
  end
end
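
The effect of the tasks_only flag is clearest when the same tree is walked both ways. This is a stand-alone sketch using hypothetical Leaf and Container classes in place of Task and Flow. It follows the same branching as the listing above but is not OFlow code.

```ruby
# Hypothetical stand-in for Task.
class Leaf
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

# Hypothetical stand-in for Flow.
class Container
  attr_reader :name

  def initialize(name, children)
    @name = name
    @children = children
  end

  # Leaves are always yielded. Containers are yielded only when
  # leaves_only is false, and are always descended into.
  def walk(leaves_only = true, &blk)
    @children.each do |c|
      if c.is_a?(Leaf)
        blk.call(c)
      else
        blk.call(c) unless leaves_only
        c.walk(leaves_only, &blk)
      end
    end
  end
end

root = Container.new(:root, [
  Leaf.new(:a),
  Container.new(:sub, [Leaf.new(:b), Leaf.new(:c)])
])

leaves = []
root.walk { |n| leaves << n.name }

everything = []
root.walk(false) { |n| everything << n.name }

puts leaves.inspect
puts everything.inspect
```

In both modes a container is visited before its children, so the walk is pre-order.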