class Orchestrator::Status
Attributes
thread[R]
Public Class Methods
new(thread)
click to toggle source
# File lib/orchestrator/status.rb, line 34
# Builds the status/subscription tracker bound to one thread.
#
# @param thread the thread object this tracker schedules its work on
def initialize(thread)
  # Lookup tables start out empty:
  #   @systems       => {:system_id => Subscriptions}
  #   @subscriptions => {:mod_id => {status => Subscriptions}}
  @systems = {}
  @subscriptions = {}

  # Bound Method object so find_subscription can be handed to a promise
  # as a callback (see unsubscribe).
  @find_subscription = method(:find_subscription)

  @controller = ::Orchestrator::Control.instance
  @thread = thread
end
Public Instance Methods
exec_unsubscribe(sub)
click to toggle source
NOTE: Only to be called from the subscription thread.
# File lib/orchestrator/status.rb, line 177
# Removes +sub+ from both lookup tables, pruning any container that
# becomes empty as a result.
#
# @param sub subscription object responding to sys_id, mod_id and status
def exec_unsubscribe(sub)
  # System lookup: only relevant when the subscription names a system
  system_id = sub.sys_id
  if system_id
    system_subs = @systems[system_id]
    if system_subs
      system_subs.delete(sub)
      @systems.delete(system_id) if system_subs.empty?
    end
  end

  # Module lookup: {mod_id => {status => subscriptions}}
  by_status = @subscriptions[sub.mod_id]
  return unless by_status

  listeners = by_status[sub.status]
  return unless listeners

  listeners.delete(sub)
  return unless listeners.empty?

  # Last listener for this status is gone; drop the status entry and,
  # if that was the module's last status, the module entry as well.
  by_status.delete(sub.status)
  @subscriptions.delete(sub.mod_id) if by_status.empty?
end
move(mod_id, to_thread)
click to toggle source
Used to maintain subscriptions when a module is moved to another thread or even another server.
# File lib/orchestrator/status.rb, line 102
# Hands every subscription registered for +mod_id+ over to another
# tracker. The work is queued via @thread.schedule.
#
# @param mod_id    key of the module in the @subscriptions lookup
# @param to_thread receiver of the subscriptions; must respond to
#                  transfer(mod_id, subs). Nothing happens when it is
#                  this very object.
def move(mod_id, to_thread)
  return if to_thread == self

  @thread.schedule do
    subs = @subscriptions.delete(mod_id)

    if subs
      # subs is {status => set of subscriptions}, so walk the sets
      # rather than the hash pairs when removing the system references.
      subs.each_value do |listeners|
        listeners.each do |sub|
          next unless sub.sys_id

          system_subs = @systems[sub.sys_id]
          next unless system_subs

          system_subs.delete(sub)
          # Same pruning as exec_unsubscribe: no empty sets left behind
          @systems.delete(sub.sys_id) if system_subs.empty?
        end
      end

      # Transfer the subscriptions
      to_thread.transfer(mod_id, subs)
    end
  end
end
reloaded_system(sys_id, sys)
click to toggle source
The System class contacts each of the threads to let them know of an update.
# File lib/orchestrator/status.rb, line 133
# Re-indexes every subscription bound to +sys_id+ after that system
# has been reloaded. A subscription whose module changed (reordered,
# removed, replaced, or newly available) is detached from its previous
# module entry and, when a module now exists at its position, attached
# to the new one and sent that module's current status value.
#
# @param sys_id key of the system in the @systems lookup
# @param sys    reloaded system; must respond to get(mod_name, index)
def reloaded_system(sys_id, sys)
  subscriptions = @systems[sys_id]
  return unless subscriptions

  subscriptions.each do |sub|
    old_id = sub.mod_id

    # re-index the subscription (sub.index is 1-based here, get is not)
    mod = sys.get(sub.mod_name, sub.index - 1)
    new_id = mod ? mod.settings.id.to_sym : nil

    # Nothing to do unless something changed (order, removal, replacement)
    next if old_id == new_id

    # Detach from the previous module. There may be none at all: a
    # subscription made before its module existed has a nil mod_id and
    # was never entered into @subscriptions.
    old_statuses = @subscriptions[old_id]
    if old_statuses
      listeners = old_statuses[sub.status]
      if listeners
        listeners.delete(sub)
        old_statuses.delete(sub.status) if listeners.empty?
      end
      @subscriptions.delete(old_id) if old_statuses.empty?
    end

    sub.mod_id = new_id

    # Module was removed: the subscription stays in @systems so a later
    # reload can pick it up again, but there is nothing to attach to.
    next unless mod

    # Update to the new module
    @subscriptions[new_id] ||= {}
    @subscriptions[new_id][sub.status] ||= Set.new
    @subscriptions[new_id][sub.status].add(sub)

    # Check for existing status to send to subscriber
    value = mod.status[sub.status]
    sub.notify(value) unless value.nil?

    # Transfer the subscription if the module lives on a different thread.
    # NOTE(review): move compares its second argument with self and calls
    # transfer on it — confirm mod.thread is the object move expects.
    move(new_id, mod.thread) if mod.thread != @thread
  end
end
subscribe(opt)
click to toggle source
Subscribes to updates from a system module. Modules do not have to exist yet; updates will be triggered as soon as they do.
# File lib/orchestrator/status.rb, line 52
# Subscribes to status updates described by +opt+.
#
# When only a system name is given, the system id is first looked up via
# @thread.work and the subscription is created once that lookup resolves.
# Otherwise the subscription is created straight away.
#
# @param opt [Hash] keys read here: :sys_name, :sys_id, :mod_name, :index;
#   do_subscribe additionally reads :mod_id, :status, :callback, :on_thread.
#   :sys_id, :mod_id and :mod are written back into opt when resolved.
# @return the result of do_subscribe, or — when a name lookup was needed —
#   whatever @thread.work(...).then(...) returns
def subscribe(opt) # sys_name, mod_name, index, status, callback, on_thread
  if opt[:sys_name] && !opt[:sys_id]
    @thread.work(proc {
      # The name must come from opt: there is no sys_name local in scope
      id = ::Orchestrator::ControlSystem.bucket.get("sysname-#{opt[:sys_name]}")
      opt[:sys_id] = id

      # Grabbing system here as thread-safe and has the potential to block
      ::Orchestrator::System.get(id)
    }).then(proc { |sys|
      # opt[:index] is 1-based here, sys.get is given the 0-based position
      mod = sys.get(opt[:mod_name], opt[:index] - 1)
      if mod
        opt[:mod_id] = mod.settings.id.to_sym
        opt[:mod] = mod
      end

      do_subscribe(opt)
    })
  else
    do_subscribe(opt)
  end
end
transfer(mod_id, subs)
click to toggle source
# File lib/orchestrator/status.rb, line 119
# Accepts subscriptions handed over by another tracker (see move) and
# registers them in this tracker's lookups. The work is queued via
# @thread.schedule.
#
# @param mod_id key of the module the subscriptions belong to
# @param subs   [Hash] {status => set of subscriptions}
def transfer(mod_id, subs)
  @thread.schedule do
    existing = @subscriptions[mod_id]

    if existing
      # Keep listeners that were already registered here for this module
      subs.each do |status, listeners|
        existing[status] ||= Set.new
        existing[status].merge(listeners)
      end
    else
      @subscriptions[mod_id] = subs
    end

    # subs is a hash of sets, so walk the sets to reach each subscription
    subs.each_value do |listeners|
      listeners.each do |sub|
        if sub.sys_id
          @systems[sub.sys_id] ||= Set.new
          @systems[sub.sys_id] << sub
        end
      end
    end
  end
end
unsubscribe(sub)
click to toggle source
Removes subscription callback from the lookup
# File lib/orchestrator/status.rb, line 75
# Removes a subscription from the lookups.
#
# @param sub either a subscription, or a ::Libuv::Q::Promise — in which
#   case removal is chained onto the promise via the stored
#   find_subscription callback
def unsubscribe(sub)
  # Not a promise: the subscription can be located right away
  return find_subscription(sub) unless sub.is_a?(::Libuv::Q::Promise)

  sub.then(@find_subscription)
end
update(mod_id, status, value)
click to toggle source
Triggers an update to be sent to listening callbacks
# File lib/orchestrator/status.rb, line 84
# Sends +value+ to every subscription registered for the given module
# and status. An exception raised by one subscription is passed to
# @controller.log_unhandled_exception and does not stop the others.
#
# @param mod_id key of the module in the @subscriptions lookup
# @param status status name within that module
# @param value  value handed to each subscription's notify
# @return the set of subscriptions for that status, or nil when there
#   are none
def update(mod_id, status, value)
  mod = @subscriptions[mod_id]
  return unless mod

  subscribed = mod[status]
  return unless subscribed

  # Walk a snapshot: a notification that registers another subscription
  # for this same status would otherwise insert into the collection
  # being iterated, which Ruby rejects with an error.
  subscribed.to_a.each do |subscription|
    # Skip anything that was unsubscribed earlier in this same round
    next unless subscribed.include?(subscription)

    begin
      subscription.notify(value)
    rescue => e
      @controller.log_unhandled_exception(e)
    end
  end

  subscribed
end
Protected Instance Methods
do_subscribe(opt)
click to toggle source
# File lib/orchestrator/status.rb, line 212
# Creates the Subscription described by +opt+ and enters it into the
# system and module lookups.
#
# @param opt [Hash] reads :sys_name, :sys_id, :mod_name, :mod_id, :index,
#   :status, :callback, :on_thread, plus :mod when :mod_id is set
# @return the new subscription
def do_subscribe(opt)
  # Build the subscription object (as loosely coupled as we can)
  fields = opt.values_at(:sys_name, :sys_id, :mod_name, :mod_id,
                         :index, :status, :callback, :on_thread)
  sub = Subscription.new(*fields)

  # Indexing by system means the subscription is still reachable if the
  # module is only added later.
  system_id = sub.sys_id
  (@systems[system_id] ||= Set.new) << sub if system_id

  # Direct module status binding
  module_id = sub.mod_id
  if module_id
    by_status = (@subscriptions[module_id] ||= {})
    (by_status[sub.status] ||= Set.new) << sub

    # Send the module's current value, if it has one, straight away.
    # NOTE(review): relies on opt[:mod] being present whenever
    # opt[:mod_id] is — confirm against callers.
    current = opt[:mod].status[sub.status]
    sub.notify(current) unless current.nil?
  end

  sub
end
find_subscription(sub)
click to toggle source
# File lib/orchestrator/status.rb, line 237
# Routes the removal of +sub+ to the tracker that holds it: the observer
# of the module's thread when the controller reports the module as
# loaded, otherwise this tracker.
#
# @param sub subscription object responding to mod_id
def find_subscription(sub)
  # Only subscriptions bound to a module can live on another thread
  manager = sub.mod_id && @controller.loaded?(sub.mod_id)

  # No module, or module not loaded: should be in our own lookups
  return exec_unsubscribe(sub) unless manager

  owner = manager.thread
  owner.schedule { owner.observer.exec_unsubscribe(sub) }
end