class OFlow::Env

The platform that Flows are created in. It is the outermost element of the OFlow system.

Public Class Methods

log_level()

Returns the default log level. @return [Fixnum] the default log level which is one of the Logger::Severity values.

# File lib/oflow/env.rb, line 15
def self.log_level()
  @@log_level
end
log_level=(level)

Sets the default log level. @param level [Fixnum] Logger::Severity to set the default log level to

# File lib/oflow/env.rb, line 21
def self.log_level=(level)
  @@log_level = level unless level < Logger::Severity::DEBUG || Logger::Severity::FATAL < level
  #@log.receive(:severity, Box.new(@log_level)) unless @log.nil?
end
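
The guard in log_level= silently ignores any value outside the range Logger::Severity::DEBUG to Logger::Severity::FATAL. The following is a minimal standalone sketch of that behavior. LevelHolder is a hypothetical stand-in for OFlow::Env so the example runs without the oflow gem, and the WARN default is an assumption made only for illustration.

```ruby
require 'logger'

# Hypothetical stand-in for OFlow::Env; only the log-level guard is reproduced.
module LevelHolder
  @log_level = Logger::Severity::WARN # assumed default, for illustration only

  def self.log_level
    @log_level
  end

  # Accept the new level only when it lies between DEBUG and FATAL inclusive.
  def self.log_level=(level)
    return unless (Logger::Severity::DEBUG..Logger::Severity::FATAL).cover?(level)
    @log_level = level
  end
end

LevelHolder.log_level = Logger::Severity::INFO # accepted
LevelHolder.log_level = 99                     # out of range, ignored
puts LevelHolder.log_level
```

Because the upper bound is FATAL, Logger::Severity::UNKNOWN is also rejected by this kind of guard.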
new(name='')
# File lib/oflow/env.rb, line 26
def initialize(name='')
  # The default logging level.
  @flows = {}
  @prepared = false
  @name = name
  @log = nil
  _clear()
end

Public Instance Methods

_clear()

Resets the error handler and log. Usually called on init and by the clear() method.

# File lib/oflow/env.rb, line 241
def _clear()
  @error_handler = Task.new(self, :error, Actors::ErrorHandler)
  @log = Task.new(self, :log, Actors::Log)
end
_locate(path)
# File lib/oflow/env.rb, line 137
def _locate(path)
  f = @flows[path[0].to_sym]
  return f if f.nil? || 1 == path.size
  f._locate(path[1..-1])
end
_validation_errors()

Returns an Array of validation errors.

# File lib/oflow/env.rb, line 82
def _validation_errors()
  errors = []
  @flows.each_value { |f| errors += f._validation_errors() }
  errors
end
busy?()

Returns true if one or more Tasks is either processing a request or has a request waiting to be processed on its input queue. @return [true|false] the busy state across all Tasks

# File lib/oflow/env.rb, line 159
def busy?
  @flows.each_value { |f| return true if f.busy? }
  return true if !@log.nil? && @log.busy?
  return true if !@error_handler.nil? && @error_handler.busy?
  false
end
clear()

Clears out all Tasks and Flows and resets the object back to an empty state.

# File lib/oflow/env.rb, line 233
def clear()
  shutdown()
  @flows = {}
  _clear()
end
describe(detail=0, indent=0)

Describes all the Flows and Tasks in the system.

# File lib/oflow/env.rb, line 247
def describe(detail=0, indent=0)
  i = ' ' * indent
  lines = ["#{i}#{@name} (#{self.class.name}) {"]
  @flows.each_value { |f|
    lines << f.describe(detail, indent + 2)
  }
  lines << i + "}"
  lines.join("\n")
end
each_flow(&blk)

Iterates over each Flow and yields to the provided block with each Flow. @param blk [Proc] Proc to call on each iteration

# File lib/oflow/env.rb, line 97
def each_flow(&blk)
  @flows.each { |name,flow| blk.yield(flow) }
end
error_handler()

Returns an error_handler Task if one is set on the instance. @return [Task] error_handler Task.

# File lib/oflow/env.rb, line 47
def error_handler()
  @error_handler
end
find_flow(name)

Locates and returns a Flow with the specified name. @param name [String] name of the Flow @return [Flow|nil] the Flow with the name specified or nil

# File lib/oflow/env.rb, line 122
def find_flow(name)
  name = name.to_sym unless name.nil?
  @flows[name]
end
flow(name) { |f| ... }

Creates a Flow and yields to a block with the newly created Flow. Used to construct Flows. @param name [Symbol|String] base name for the Flow @param block [Proc] block to yield to with the new Flow instance @return [Flow] new Flow

# File lib/oflow/env.rb, line 57
def flow(name, &block)
  f = Flow.new(self, name)
  @flows[f.name] = f
  yield(f) if block_given?
  f
end
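
The builder pattern used by flow() (create, register, yield, return) can be sketched without the oflow gem. SketchFlow and SketchEnv below are hypothetical stand-ins, and the assumption that a Flow stores its name as a Symbol is inferred only from find_flow() converting its argument with to_sym.

```ruby
# Hypothetical stand-in for OFlow::Flow; assumes the name is kept as a Symbol.
class SketchFlow
  attr_reader :name, :tasks

  def initialize(name)
    @name = name.to_sym
    @tasks = []
  end
end

# Hypothetical stand-in for OFlow::Env showing only flow registration and lookup.
class SketchEnv
  def initialize
    @flows = {}
  end

  # Create the flow, register it under its name, let the caller configure it,
  # then return it so the caller can keep a reference.
  def flow(name)
    f = SketchFlow.new(name)
    @flows[f.name] = f
    yield(f) if block_given?
    f
  end

  def find_flow(name)
    name = name.to_sym unless name.nil?
    @flows[name]
  end

  def flow_count
    @flows.size
  end
end

env = SketchEnv.new
built = env.flow('ingest') { |f| f.tasks << :parse }
puts env.find_flow(:ingest).equal?(built)
```

Registering the flow before yielding means the block can already look the flow up by name while it is being configured.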
flow_count()

Returns the number of Flows in the Env.

# File lib/oflow/env.rb, line 144
def flow_count()
  @flows.size
end
flush()

Wakes up all the Tasks in every Flow and waits for the system to become idle before returning.

# File lib/oflow/env.rb, line 202
def flush()
  wakeup()
  @flows.each_value { |f| f.flush() }
  while busy?
    sleep(0.2)
  end
end
full_name()
# File lib/oflow/env.rb, line 35
def full_name()
  @name
end
locate(name)

Locates and returns a Flow or Task with the specified full name. @param name [String] full name of the Flow or Task @return [Flow|Task|nil] the Flow or Task with the name specified or nil

# File lib/oflow/env.rb, line 130
def locate(name)
  name = name[1..-1] if name.start_with?(':')
  name = name[0..-2] if name.end_with?(':')
  path = name.split(':')
  _locate(path)
end
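
The way locate() normalizes a colon-separated name and then descends one path element at a time can be sketched on plain data. In this sketch split_path and locate_in are hypothetical helpers, and nested Hashes stand in for Flows and Tasks; neither is part of OFlow.

```ruby
# Hypothetical helper mirroring the name normalization in locate():
# strip one leading and one trailing ':' and split on the rest.
def split_path(name)
  name = name[1..-1] if name.start_with?(':')
  name = name[0..-2] if name.end_with?(':')
  name.split(':')
end

# Hypothetical helper mirroring _locate(): look up the first path element,
# stop on a miss or on the last element, otherwise recurse into the child.
def locate_in(tree, path)
  node = tree[path[0].to_sym]
  return node if node.nil? || 1 == path.size
  locate_in(node, path[1..-1])
end

# Nested Hashes stand in for an Env holding one Flow with one Task.
tree = { ingest: { parse: 'parse task' } }

found = locate_in(tree, split_path(':ingest:parse:'))
missing = locate_in(tree, split_path('nope:parse'))
puts found.inspect
puts missing.inspect
```

A lookup stops at the first missing element and returns nil, so callers only need a single nil check.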
log()

Returns a log Task if one is set on the instance. @return [Task] log Task.

# File lib/oflow/env.rb, line 41
def log()
  @log
end
prepare()
# File lib/oflow/env.rb, line 64
def prepare()
  @flows.each_value { |f|
    f.resolve_all_links()
  }
  validate()
  @prepared = true
end
queue_count()

Returns the sum of all the requests in the Task queues of all Flows. @return [Fixnum] total number of items waiting to be processed

# File lib/oflow/env.rb, line 150
def queue_count()
  cnt = 0
  @flows.each_value { |f| cnt += f.queue_count() }
  cnt
end
shutdown(flush_first=false)

Shuts down all Tasks. @param flush_first [true|false] flag indicating shutdown should occur after the system becomes idle

# File lib/oflow/env.rb, line 220
def shutdown(flush_first=false)
  # block all tasks first so threads can empty queues
  @flows.each_value do |f|
    f.state = Task::BLOCKED
  end
  # shutdown and wait for queues to empty if necessary
  @flows.each_value do |f|
    f.shutdown(flush_first)
  end
  @flows = {}
end
start()

Calls the start() method on all Tasks.

# File lib/oflow/env.rb, line 190
def start()
  prepare() unless @prepared
  @flows.each_value { |f| f.start() }
end
state=(s)

Sets the state of all Tasks recursively. This should not be called directly.

# File lib/oflow/env.rb, line 212
def state=(s)
  @flows.each_value do |f|
    f.state = s
  end
end
step()

Calls the step() method on one Task that is stopped and has an item in the queue. The Task with the highest backed_up() value is selected.

# File lib/oflow/env.rb, line 173
def step()
  max = 0.0
  best = nil
  walk_tasks() do |t|
    if Task::STOPPED == t.state
      bu = t.backed_up()
      if max < bu
        best = t
        max = bu
      end
    end
  end
  best.step() unless best.nil?
  best
end
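
The selection rule in step() can be illustrated on plain data. FakeTask, STOPPED, RUNNING and pick_for_step are hypothetical names used only for this sketch; the real code walks Task instances and compares against Task::STOPPED.

```ruby
# Hypothetical stand-in for a Task; only the fields step() inspects are kept.
FakeTask = Struct.new(:name, :state, :backed_up)

STOPPED = :stopped # placeholder for Task::STOPPED
RUNNING = :running # placeholder for any other state

# Return the stopped task with the highest backed_up value, or nil.
# The comparison is strict and starts from 0.0, so a stopped task whose
# backed_up value is 0.0 is never chosen.
def pick_for_step(tasks)
  max = 0.0
  best = nil
  tasks.each do |t|
    next unless STOPPED == t.state
    if max < t.backed_up
      best = t
      max = t.backed_up
    end
  end
  best
end

tasks = [
  FakeTask.new(:a, STOPPED, 0.2),
  FakeTask.new(:b, RUNNING, 0.9), # skipped: not stopped
  FakeTask.new(:c, STOPPED, 0.5),
  FakeTask.new(:d, STOPPED, 0.0)  # skipped: nothing backed up
]
chosen = pick_for_step(tasks)
puts chosen.name
```

When no stopped Task has anything queued, the result is nil, which matches step() returning nil without stepping anything.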
stop()

Calls the stop() method on all Tasks.

# File lib/oflow/env.rb, line 167
def stop()
  @flows.each_value { |f| f.stop() }
end
validate()

Validates the container by verifying all links on a task have been set to a valid destination and that destination has been resolved. @raise [ValidateError] if there is an error in validation

# File lib/oflow/env.rb, line 75
def validate()
  # collects errors and raises all errors at once if there are any
  errors = _validation_errors()
  raise ValidateError.new(errors) unless errors.empty?
end
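
The collect-then-raise approach in validate() reports every problem in a single exception instead of stopping at the first one. A minimal sketch follows. SketchValidateError and validate_all are hypothetical, and each "flow" is represented here by nothing more than its Array of error strings.

```ruby
# Hypothetical error class standing in for OFlow's ValidateError.
class SketchValidateError < StandardError
  attr_reader :problems

  def initialize(problems)
    @problems = problems
    super("#{problems.size} validation error(s)")
  end
end

# Gather the errors from every flow first, then raise once if any were found.
# flows is assumed to map a flow name to that flow's Array of error strings.
def validate_all(flows)
  errors = []
  flows.each_value { |flow_errors| errors += flow_errors }
  raise SketchValidateError.new(errors) unless errors.empty?
end

caught = nil
begin
  validate_all({ a: ['link :out not set'], b: [], c: ['target not found'] })
rescue SketchValidateError => e
  caught = e.problems
end
puts caught.inspect
```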
wakeup()

Wakes up all the Tasks in every Flow.

# File lib/oflow/env.rb, line 196
def wakeup()
  @flows.each_value { |f| f.wakeup() }
end
walk_flows(&blk)

Performs a recursive walk over all Flows and yields to the provided block for each. @param blk [Proc] Proc to call on each iteration

# File lib/oflow/env.rb, line 104
def walk_flows(&blk)
  @flows.each_value do |f|
    blk.yield(f)
  end
end
walk_tasks(&blk)

Performs a recursive walk over all Tasks in all Flows and yields to the provided block for each. @param blk [Proc] Proc to call on each iteration

# File lib/oflow/env.rb, line 113
def walk_tasks(&blk)
  @flows.each_value do |f|
    f.walk_tasks(&blk)
  end
end