class Orchestrator::Core::ModuleManager

Attributes

current_user[RW]
instance[R]
logger[R]
settings[R]
stattrak[R]
status[R]
thread[R]

Public Class Methods

new(thread, klass, settings) click to toggle source
# File lib/orchestrator/core/module_manager.rb, line 4
def initialize(thread, klass, settings)
    @thread = thread        # Libuv Loop
    @settings = settings    # Database model
    @klass = klass
    
    # Bit of a hack - should make testing pretty easy though
    @status = ::ThreadSafe::Cache.new
    @stattrak = @thread.observer
    @logger = ::Orchestrator::Logger.new(@thread, @settings)

    @updating = Mutex.new
end

Public Instance Methods

add_subscription(sub) click to toggle source

Called from subscribe and SystemProxy.subscribe always on the module thread

# File lib/orchestrator/core/module_manager.rb, line 131
def add_subscription(sub)
    if sub.is_a? ::Libuv::Q::Promise
        # Promise recursion?
        sub.then method(:add_subscription)
    else
        @subsciptions ||= Set.new
        @subsciptions.add sub
    end
end
define_setting(name, value) click to toggle source

Called from Core::Mixin on any thread

Settings updates are done on the thread pool We have to replace the structure as other threads may be reading from the old structure and the settings hash is not thread safe

# File lib/orchestrator/core/module_manager.rb, line 175
def define_setting(name, value)
    defer = thread.defer
    thread.schedule do
        defer.resolve(thread.work(proc {
            mod = Orchestrator::Module.find(@settings.id)
            mod.settings[name] = value
            mod.save!
            mod
        }))
    end
    defer.promise.then do |db_model|
        @settings = db_model
        value # Don't leak direct access to the database model
    end
end
get_scheduler() click to toggle source
# File lib/orchestrator/core/module_manager.rb, line 79
def get_scheduler
    @scheduler ||= ::Orchestrator::Core::ScheduleProxy.new(@thread)
end
get_system(name) click to toggle source

This is called from Core::Mixin on the thread pool as the DB query will be blocking

NOTE

Couchbase does support non-blocking gets although I think this is simpler

@return [::Orchestrator::Core::SystemProxy]

# File lib/orchestrator/core/module_manager.rb, line 87
def get_system(name)
    id = ::Orchestrator::ControlSystem.bucket.get("sysname-#{name.downcase}", {quiet: true}) || name
    ::Orchestrator::Core::SystemProxy.new(@thread, id.to_sym, self)
end
inspect() click to toggle source

override the default inspect method This provides relevant information and won’t blow the stack on an error

# File lib/orchestrator/core/module_manager.rb, line 194
def inspect
    "#<#{self.class}:0x#{self.__id__.to_s(16)} @thread=#{@thread.inspect} running=#{!@instance.nil?} managing=#{@klass.to_s} id=#{@settings.id}>"
end
reloaded(mod) click to toggle source
# File lib/orchestrator/core/module_manager.rb, line 64
def reloaded(mod)
    @thread.schedule do
        # pass in any updated settings
        @settings = mod

        if @instance.respond_to? :on_update, true
            begin
                @instance.__send__(:on_update)
            rescue => e
                @logger.print_error(e, 'error in module update callback')
            end
        end
    end
end
setting(name) click to toggle source

Called from Core::Mixin on any thread For Logics: instance -> system -> zones -> dependency For Device: instance -> dependency

# File lib/orchestrator/core/module_manager.rb, line 144
def setting(name)
    res = @settings.settings[name]
    if res.nil?
        if @settings.control_system_id
            sys = System.get(@settings.control_system_id)
            res = sys.settings[name]

            # Check if zones have the setting
            if res.nil?
                sys.zones.each do |zone|
                    res = zone.settings[name]
                    return res unless res.nil?
                end

                # Fallback to the dependency
                res = @settings.dependency.settings[name]
            end
        else
            # Fallback to the dependency
            res = @settings.dependency.settings[name]
        end
    end
    res
end
start() click to toggle source
# File lib/orchestrator/core/module_manager.rb, line 45
def start
    return true unless @instance.nil?
    config = self
    @instance = @klass.new
    @instance.instance_eval { @__config__ = config }
    if @instance.respond_to? :on_load, true
        begin
            @instance.__send__(:on_load)
        rescue => e
            @logger.print_error(e, 'error in module load callback')
        end
    end
    update_running_status(true)
    true # for REST API
rescue => e
    @logger.print_error(e, 'module failed to start')
    false
end
stop() click to toggle source

Should always be called on the module thread

# File lib/orchestrator/core/module_manager.rb, line 24
def stop
    return if @instance.nil?
    begin
        if @instance.respond_to? :on_unload, true
            @instance.__send__(:on_unload)
        end
    rescue => e
        @logger.print_error(e, 'error in module unload callback')
    ensure
        # Clean up
        @instance = nil
        @scheduler.clear if @scheduler
        if @subsciptions
            unsub = @stattrak.method(:unsubscribe)
            @subsciptions.each &unsub
            @subsciptions = nil
        end
        update_running_status(false)
    end
end
subscribe(status, callback) click to toggle source

Subscribe to status updates from status in the same module Called from Core::Mixin always on the module thread

# File lib/orchestrator/core/module_manager.rb, line 107
def subscribe(status, callback)
    sub = @stattrak.subscribe({
        on_thread: @thread,
        callback: callback,
        status: status.to_sym,
        mod_id: @settings.id.to_sym,
        mod: self
    })
    add_subscription sub
    sub
end
trak(name, value) click to toggle source

Called from Core::Mixin - thread safe

# File lib/orchestrator/core/module_manager.rb, line 93
def trak(name, value)
    if @status[name] != value
        @status[name] = value

        # Allows status to be updated in workers
        # For the most part this will run straight away
        @thread.schedule do
            @stattrak.update(@settings.id.to_sym, name, value)
        end
    end
end
unsubscribe(sub) click to toggle source

Called from Core::Mixin always on the module thread

# File lib/orchestrator/core/module_manager.rb, line 120
def unsubscribe(sub)
    if sub.is_a? ::Libuv::Q::Promise
        # Promise recursion?
        sub.then method(:unsubscribe)
    else
        @subsciptions.delete sub
        @stattrak.unsubscribe(sub)
    end
end

Protected Instance Methods

update_connected_status(connected) click to toggle source
# File lib/orchestrator/core/module_manager.rb, line 202
def update_connected_status(connected)
    id = settings.id

    # Access the database in a non-blocking fashion
    thread.work(proc {
        @updating.synchronize {
            model = ::Orchestrator::Module.find_by_id id

            if model && model.connected != connected
                model.connected = connected
                model.save!
                model
            else
                nil
            end
        }
    }).then(proc { |model|
        # Update the model if it was updated
        if model
            @settings = model
        end
    }, proc { |e|
        # report any errors updating the model
        @logger.print_error(e, 'error updating connected state in database model')
    })
end
update_running_status(running) click to toggle source
# File lib/orchestrator/core/module_manager.rb, line 229
def update_running_status(running)
    id = settings.id

    # Access the database in a non-blocking fashion
    thread.work(proc {
        @updating.synchronize {
            model = ::Orchestrator::Module.find_by_id id

            if model && model.running != running
                model.running = running
                model.connected = false if !running
                model.save!
                model
            else
                nil
            end
        }
    }).then(proc { |model|
        # Update the model if it was updated
        if model
            @settings = model
        end
    }, proc { |e|
        # report any errors updating the model
        @logger.print_error(e, 'error updating running state in database model')
    })
end