class Orchestrator::Control

Attributes

logger[R]
loop[R]
ready[R]
ready_promise[R]
threads[R]
zones[R]

Public Class Methods

new() click to toggle source
  1. Load the modules allocated to this node

  2. Allocate modules to CPUs

    • Modules load dependencies as required

    • Logics are streamed in after devices and services

Logic modules will fetch their system when they interact with other modules.

Devices and services do not have a system associated with them.

This makes systems very loosely coupled to the modules, which should make distributing the system slightly simpler.
# File lib/orchestrator/control.rb, line 23
# Sets up the controller's internal state: the lock used by #mount, the
# caches for loaded modules and zones, the dependency loader, the default
# Libuv loop, the "ready" deferred/promise pair and the tagged logger.
def initialize
    # Lock guarding critical sections (see #mount)
    @critical = ::Mutex.new

    # Caches: module id => manager, zone id => zone
    @loaded = ::ThreadSafe::Cache.new
    @zones = ::ThreadSafe::Cache.new

    @loader = DependencyManager.instance
    @loop = ::Libuv::Loop.default

    # Shared error callback handed to promises and worker loops
    @exceptions = method(:log_unhandled_exception)

    # Readiness flag plus the deferred that #notify_ready resolves
    @ready = false
    @ready_defer = @loop.defer
    @ready_promise = @ready_defer.promise

    # Ids of modules that have been unloaded, remembered so that loading
    # them again can be optimised
    @unloaded = Set.new

    # Production writes to log/control.log (Logger is given 10 and 4194304
    # as its rotation arguments); every other environment logs to STDOUT.
    base_logger =
        if Rails.env.production?
            ::Logger.new(::Rails.root.join('log/control.log').to_s, 10, 4194304)
        else
            ::Logger.new(STDOUT)
        end

    base_logger.formatter = proc do |severity, datetime, progname, msg|
        "#{datetime.strftime("%d/%m/%Y @ %I:%M%p")} #{severity}: #{progname} - #{msg}\n"
    end

    @logger = ::ActiveSupport::TaggedLogging.new(base_logger)
end

Public Instance Methods

boot(*args) click to toggle source

Boot the control system, running all defined modules

# File lib/orchestrator/control.rb, line 88
# Boots the control system by running #load_all on a new Ruby thread.
#
# NOTE(review): the original comment read "Only boot if running as a
# server", but no such check is made here - presumably the caller decides.
#
# @param args ignored by this method
# @return [Thread] the thread executing #load_all
def boot(*args)
    Thread.new { load_all }
end
load(mod_settings) click to toggle source

Loads the module onto one of the loop references, chosen in round-robin order. This method is thread safe.

# File lib/orchestrator/control.rb, line 95
# Loads the module described by +mod_settings+, or returns the already
# loaded manager when one is cached under the module's id.
#
# When the module is not cached, its dependency is loaded through @loader,
# the next entry of @selector is picked as the module's thread, and
# #start_module is run from a block scheduled on that thread. The resulting
# manager is stored in @loaded.
#
# @param mod_settings the module's settings; must respond to #id and
#   #dependency
# @return a promise (from @loop.defer) resolved with the module manager
def load(mod_settings)
    mod_id = mod_settings.id.to_sym
    defer = @loop.defer
    mod = @loaded[mod_id]

    if mod
        defer.resolve(mod)
    else
        defer.resolve(
            @loader.load(mod_settings.dependency).then(proc { |klass|
                # We will always be on the default loop here
                thread = @selector.next

                # Separate deferred for the module start-up. It deliberately
                # has its own name: assigning to `defer` here would overwrite
                # the outer local captured by this closure and change which
                # promise the method hands back.
                started = @loop.defer
                thread.schedule do
                    started.resolve(start_module(thread, klass, mod_settings))
                end

                # update the module cache
                started.promise.then do |mod_manager|
                    @loaded[mod_id] = mod_manager

                    # Transfer any existing observers over to the new thread
                    if @ready && @unloaded.include?(mod_id)
                        @unloaded.delete(mod_id)

                        new_thread = thread.observer
                        @threads.each do |thr|
                            thr.observer.move(mod_id, new_thread)
                        end
                    end

                    # Return the manager
                    mod_manager
                end
                started.promise
            }, @exceptions)
        )
    end
    defer.promise
end
loaded?(mod_id) click to toggle source

Checks if a module with the ID specified is loaded

# File lib/orchestrator/control.rb, line 139
# Looks up a module in the @loaded cache.
#
# @param mod_id the module id; converted with #to_sym before the lookup
# @return the cached value for that id (callers such as #start and #stop
#   use it as the module itself), or whatever @loaded returns for a miss
def loaded?(mod_id)
    key = mod_id.to_sym
    @loaded[key]
end
log_unhandled_exception(*args) click to toggle source
# File lib/orchestrator/control.rb, line 222
# Error callback: formats whatever it was given, logs it at error level and
# returns a rejection created with ::Libuv::Q.reject.
#
# The last argument is treated as the error when it responds to #backtrace;
# any preceding arguments are included in the message as context.
#
# @param args arbitrary values, with the error (if any) last
# @return the result of ::Libuv::Q.reject(@loop, message)
def log_unhandled_exception(*args)
    err = args.last

    msg =
        if err && err.respond_to?(:backtrace)
            text = "exception: #{err.message} (#{args[0..-2]})"
            trace = err.backtrace
            # backtrace is nil for errors that were never raised
            text << "\n#{trace.join("\n")}" if trace
            text
        else
            "unhandled exception: #{args}"
        end

    @logger.error msg
    ::Libuv::Q.reject(@loop, msg)
end
mount() click to toggle source

Start the control reactor

# File lib/orchestrator/control.rb, line 55
# Starts the control reactor (once) and returns the server's `loaded`
# promise.
#
# The first call caches every zone in @zones, grabs the SpiderGazelle
# instance and, once it has loaded, either shares its threads (when the
# server mode is :thread) or starts one loop per CPU via #start_thread.
# Subsequent calls just return the same `loaded` promise.
#
# @return the value of @server.loaded
def mount
    return @server.loaded if @server

    @critical.synchronize {
        # Protect against multiple mounts. A caller that lost the race must
        # still receive the promise (previously this returned nil).
        return @server.loaded if @server

        # Cache all the zones in the system
        ::Orchestrator::Zone.all.each do |zone|
            @zones[zone.id] = zone
        end

        @server = ::SpiderGazelle::Spider.instance
        @server.loaded.then do
            # Share threads with SpiderGazelle (one per core)
            if @server.mode == :thread
                @threads = @server.threads
            else    # We are either running no_ipc or process (unsupported for control)
                @threads = Set.new

                cpus = ::Libuv.cpu_count || 1
                cpus.times &method(:start_thread)

                # We own these loops, so stop them ourselves on SIGINT
                @loop.signal :INT, method(:kill_workers)
            end

            # Round-robin source of threads used by #load
            @selector = @threads.cycle
        end
    }

    @server.loaded
end
notify_ready() click to toggle source
# File lib/orchestrator/control.rb, line 215
# Marks the controller as ready: clears the System cache, sets @ready and
# resolves the deferred behind @ready_promise with true.
#
# @return the result of @ready_defer.resolve
def notify_ready
    # The system cache may have been populated before loading finished
    System.clear_cache

    @ready = true
    @ready_defer.resolve(@ready)
end
reload(dep_id) click to toggle source
# File lib/orchestrator/control.rb, line 209
# Runs reload_dep(dep_id) inside a block handed to @loop.work.
#
# @param dep_id identifier passed straight through to reload_dep
# @return whatever @loop.work returns (used as a promise elsewhere in this
#   class, see #update)
def reload(dep_id)
    @loop.work { reload_dep(dep_id) }
end
start(mod_id) click to toggle source

Starts a module running

# File lib/orchestrator/control.rb, line 144
# Starts a loaded module running.
#
# The module's #start is invoked from a block scheduled on the module's own
# thread. The returned promise is resolved with true on success, rejected
# with the raised error if #start fails, and rejected with
# Error::ModuleNotFound when no module with that id is loaded.
#
# @param mod_id id of the module to start
# @return a promise from @loop.defer
def start(mod_id)
    defer = @loop.defer

    mod = loaded? mod_id
    if mod
        mod.thread.schedule do
            begin
                mod.start
                defer.resolve(true)
            rescue => e
                # Settle the promise so callers are not left waiting forever,
                # then re-raise so the error still propagates as before.
                defer.reject(e)
                raise
            end
        end
    else
        err = Error::ModuleNotFound.new "unable to start module '#{mod_id}', not found"
        defer.reject(err)
        @logger.warn err.message
    end

    defer.promise
end
stop(mod_id) click to toggle source

Stops a module running

# File lib/orchestrator/control.rb, line 163
# Stops a loaded module.
#
# The module's #stop is invoked from a block scheduled on the module's own
# thread. The returned promise is resolved with true on success, rejected
# with the raised error if #stop fails, and rejected with
# Error::ModuleNotFound when no module with that id is loaded.
#
# @param mod_id id of the module to stop
# @return a promise from @loop.defer
def stop(mod_id)
    defer = @loop.defer

    mod = loaded? mod_id
    if mod
        mod.thread.schedule do
            begin
                mod.stop
                defer.resolve(true)
            rescue => e
                # Settle the promise so chained work (e.g. #unload) is not
                # left pending forever, then re-raise so the error still
                # propagates as before.
                defer.reject(e)
                raise
            end
        end
    else
        err = Error::ModuleNotFound.new "unable to stop module '#{mod_id}', not found"
        defer.reject(err)
        @logger.warn err.message
    end

    defer.promise
end
unload(mod_id) click to toggle source

Stop the module gracefully, then remove it from @loaded.

# File lib/orchestrator/control.rb, line 183
# Stops the module and, once the stop promise succeeds, records its id in
# @unloaded and removes it from @loaded.
#
# @param mod_id id of the module; converted with #to_sym
# @return the promise produced by chaining onto #stop; the chained callback
#   returns true
def unload(mod_id)
    key = mod_id.to_sym

    forget = proc do
        @unloaded << key
        @loaded.delete(key)
        true # value of the chained promise
    end

    stop(key).then(forget)
end
update(mod_id) click to toggle source

Unload the module, then get a fresh version of its settings from the database and load the module again.

# File lib/orchestrator/control.rb, line 195
# Unloads the module, re-reads its record with ::Orchestrator::Module.find
# (inside a block handed to @loop.work) and then loads it again.
#
# @param mod_id id of the module to refresh
# @return the promise produced by chaining onto #unload; the chain ends
#   with the result of #load
def update(mod_id)
    reload_from_db = proc do
        # Grab the database model via @loop.work
        fetched = @loop.work { ::Orchestrator::Module.find(mod_id) }

        # Load the module once the model has been found; returning the
        # promise from #load chains it onto the outer promise
        fetched.then(proc { |config| load(config) })
    end

    unload(mod_id).then(reload_from_db)
end

Protected Instance Methods

continue_loading(modules) click to toggle source

Load all the logic modules after the device modules are complete

# File lib/orchestrator/control.rb, line 288
# Loads the given (logic) modules and, once every load promise has
# finished, calls #notify_ready.
#
# @param modules collection of module settings, each passed to #load
# @return the promise returned by chaining #notify_ready via `finally`
def continue_loading(modules)
    # One load promise per module
    pending = modules.map { |mod| load(mod) }

    # Once load is complete we'll accept websockets
    ::Libuv::Q.finally(@loop, *pending).finally method(:notify_ready)
end
kill_workers(*args) click to toggle source
# File lib/orchestrator/control.rb, line 317
# Stops every loop in @threads and then stops @loop itself. Registered in
# #mount as the handler for the :INT signal.
#
# @param args ignored by this method
# @return the result of @loop.stop
def kill_workers(*args)
    @threads.each(&:stop)
    @loop.stop
end
load_all() click to toggle source

Grab the modules from the database and load them

# File lib/orchestrator/control.rb, line 254
# Grabs every module record and loads them in two phases: modules whose
# role is below 3 are loaded straight away, the remaining modules are
# handed to #continue_loading once all of those first-phase loads have
# finished.
#
# First-phase promises and deferred module records are kept in separate
# lists, so the result does not depend on the order in which
# ::Orchestrator::Module.all yields records. (Previously a role < 3 module
# that followed a logic module had its promise pushed into the list of
# records later passed to #continue_loading, and was not waited on.)
#
# @return the result of chaining #continue_loading via `finally`
def load_all
    device_loads = []    # promises for modules with role < 3
    logic_modules = []   # module records loaded afterwards

    ::Orchestrator::Module.all.each do |mod|
        if mod.role < 3
            device_loads << load(mod)  # modules are streamed in
        else
            # Clear on the first logic module seen, in case rest api calls
            # have built the cache
            System.clear_cache if logic_modules.empty?

            logic_modules << mod
        end
    end

    # Also covers the case where there were no logic modules
    wait = ::Libuv::Q.finally(@loop, *device_loads)

    # Continue with the logic modules; #continue_loading marks the system
    # as ready once they are done
    wait.finally do
        continue_loading(logic_modules)
    end
end
start_module(thread, klass, settings) click to toggle source

This will always be called on the thread reactor here

# File lib/orchestrator/control.rb, line 240
# Builds the manager for a module, choosing the manager class from the
# role of the module's dependency: :device => Device::Manager,
# :service => Service::Manager, anything else => Logic::Manager.
#
# Called from a block scheduled on the module's thread (see #load).
#
# @param thread the loop the module was allocated to
# @param klass the class produced by the dependency loader
# @param settings the module settings; must respond to #dependency
# @return the newly created manager
def start_module(thread, klass, settings)
    # Pick the connection / logic / service handler for this role
    manager_class =
        case settings.dependency.role
        when :device  then Device::Manager
        when :service then Service::Manager
        else               Logic::Manager
        end

    manager_class.new(thread, klass, settings)
end
start_thread(num) click to toggle source

Methods called when we manage the threads:

# File lib/orchestrator/control.rb, line 303
# Creates a new Libuv loop, records it in @threads and runs it on a new
# Ruby thread. Used by #mount when this class manages its own threads.
#
# @param num index supplied by Integer#times in #mount; not used here
# @return [Thread] the thread running the new loop
def start_thread(num)
    reactor = Libuv::Loop.new
    @threads << reactor

    Thread.new do
        reactor.run do |promise|
            # Route notifications from the loop to the shared error handler
            promise.progress @exceptions

            # NOTE(review): presumably this async handle exists to keep the
            # loop alive while it has no other work - confirm against the
            # libuv gem
            reactor.async { p 'noop' }
        end
    end
end