class Orchestrator::Core::ModuleManager
Attributes
Public Class Methods
# File lib/orchestrator/core/module_manager.rb, line 4 def initialize(thread, klass, settings) @thread = thread # Libuv Loop @settings = settings # Database model @klass = klass # Bit of a hack - should make testing pretty easy though @status = ::ThreadSafe::Cache.new @stattrak = @thread.observer @logger = ::Orchestrator::Logger.new(@thread, @settings) @updating = Mutex.new end
Public Instance Methods
Called from subscribe and SystemProxy.subscribe
always on the module thread
# File lib/orchestrator/core/module_manager.rb, line 131 def add_subscription(sub) if sub.is_a? ::Libuv::Q::Promise # Promise recursion? sub.then method(:add_subscription) else @subsciptions ||= Set.new @subsciptions.add sub end end
Called from Core::Mixin
on any thread
Settings updates are done on the thread pool We have to replace the structure as other threads may be reading from the old structure and the settings hash is not thread safe
# File lib/orchestrator/core/module_manager.rb, line 175 def define_setting(name, value) defer = thread.defer thread.schedule do defer.resolve(thread.work(proc { mod = Orchestrator::Module.find(@settings.id) mod.settings[name] = value mod.save! mod })) end defer.promise.then do |db_model| @settings = db_model value # Don't leak direct access to the database model end end
# File lib/orchestrator/core/module_manager.rb, line 79 def get_scheduler @scheduler ||= ::Orchestrator::Core::ScheduleProxy.new(@thread) end
This is called from Core::Mixin
on the thread pool as the DB query will be blocking
- NOTE
-
Couchbase does support non-blocking gets although I think this is simpler
@return [::Orchestrator::Core::SystemProxy]
# File lib/orchestrator/core/module_manager.rb, line 87 def get_system(name) id = ::Orchestrator::ControlSystem.bucket.get("sysname-#{name.downcase}", {quiet: true}) || name ::Orchestrator::Core::SystemProxy.new(@thread, id.to_sym, self) end
override the default inspect method This provides relevant information and won’t blow the stack on an error
# File lib/orchestrator/core/module_manager.rb, line 194 def inspect "#<#{self.class}:0x#{self.__id__.to_s(16)} @thread=#{@thread.inspect} running=#{!@instance.nil?} managing=#{@klass.to_s} id=#{@settings.id}>" end
# File lib/orchestrator/core/module_manager.rb, line 64 def reloaded(mod) @thread.schedule do # pass in any updated settings @settings = mod if @instance.respond_to? :on_update, true begin @instance.__send__(:on_update) rescue => e @logger.print_error(e, 'error in module update callback') end end end end
Called from Core::Mixin
on any thread For Logics: instance -> system -> zones -> dependency For Device: instance -> dependency
# File lib/orchestrator/core/module_manager.rb, line 144 def setting(name) res = @settings.settings[name] if res.nil? if @settings.control_system_id sys = System.get(@settings.control_system_id) res = sys.settings[name] # Check if zones have the setting if res.nil? sys.zones.each do |zone| res = zone.settings[name] return res unless res.nil? end # Fallback to the dependency res = @settings.dependency.settings[name] end else # Fallback to the dependency res = @settings.dependency.settings[name] end end res end
# File lib/orchestrator/core/module_manager.rb, line 45 def start return true unless @instance.nil? config = self @instance = @klass.new @instance.instance_eval { @__config__ = config } if @instance.respond_to? :on_load, true begin @instance.__send__(:on_load) rescue => e @logger.print_error(e, 'error in module load callback') end end update_running_status(true) true # for REST API rescue => e @logger.print_error(e, 'module failed to start') false end
Should always be called on the module thread
# File lib/orchestrator/core/module_manager.rb, line 24 def stop return if @instance.nil? begin if @instance.respond_to? :on_unload, true @instance.__send__(:on_unload) end rescue => e @logger.print_error(e, 'error in module unload callback') ensure # Clean up @instance = nil @scheduler.clear if @scheduler if @subsciptions unsub = @stattrak.method(:unsubscribe) @subsciptions.each &unsub @subsciptions = nil end update_running_status(false) end end
Subscribe to status updates from status in the same module Called from Core::Mixin
always on the module thread
# File lib/orchestrator/core/module_manager.rb, line 107 def subscribe(status, callback) sub = @stattrak.subscribe({ on_thread: @thread, callback: callback, status: status.to_sym, mod_id: @settings.id.to_sym, mod: self }) add_subscription sub sub end
Called from Core::Mixin
- thread safe
# File lib/orchestrator/core/module_manager.rb, line 93 def trak(name, value) if @status[name] != value @status[name] = value # Allows status to be updated in workers # For the most part this will run straight away @thread.schedule do @stattrak.update(@settings.id.to_sym, name, value) end end end
Called from Core::Mixin
always on the module thread
# File lib/orchestrator/core/module_manager.rb, line 120 def unsubscribe(sub) if sub.is_a? ::Libuv::Q::Promise # Promise recursion? sub.then method(:unsubscribe) else @subsciptions.delete sub @stattrak.unsubscribe(sub) end end
Protected Instance Methods
# File lib/orchestrator/core/module_manager.rb, line 202 def update_connected_status(connected) id = settings.id # Access the database in a non-blocking fashion thread.work(proc { @updating.synchronize { model = ::Orchestrator::Module.find_by_id id if model && model.connected != connected model.connected = connected model.save! model else nil end } }).then(proc { |model| # Update the model if it was updated if model @settings = model end }, proc { |e| # report any errors updating the model @logger.print_error(e, 'error updating connected state in database model') }) end
# File lib/orchestrator/core/module_manager.rb, line 229 def update_running_status(running) id = settings.id # Access the database in a non-blocking fashion thread.work(proc { @updating.synchronize { model = ::Orchestrator::Module.find_by_id id if model && model.running != running model.running = running model.connected = false if !running model.save! model else nil end } }).then(proc { |model| # Update the model if it was updated if model @settings = model end }, proc { |e| # report any errors updating the model @logger.print_error(e, 'error updating running state in database model') }) end