class Orchestrator::Status

Attributes

thread[R]

Public Class Methods

new(thread) click to toggle source
# File lib/orchestrator/status.rb, line 34
def initialize(thread)
    @thread = thread
    @controller = ::Orchestrator::Control.instance

    @find_subscription = method(:find_subscription)

    # {:mod_id => {status => Subscriptions}}
    @subscriptions = {}
    # {:system_id => Subscriptions}
    @systems = {}
end

Public Instance Methods

exec_unsubscribe(sub) click to toggle source
NOTE

Only to be called from subscription thread

# File lib/orchestrator/status.rb, line 177
def exec_unsubscribe(sub)
    # Update the system lookup if a system was specified
    if sub.sys_id
        subscriptions = @systems[sub.sys_id]
        if subscriptions
            subscriptions.delete(sub)

            if subscriptions.empty?
                @systems.delete(sub.sys_id)
            end
        end
    end

    # Update the module lookup
    statuses = @subscriptions[sub.mod_id]
    if statuses
        subscriptions = statuses[sub.status]
        if subscriptions
            subscriptions.delete(sub)

            if subscriptions.empty?
                statuses.delete(sub.status)

                if statuses.empty?
                    @subscriptions.delete(sub.mod_id)
                end
            end
        end
    end
end
move(mod_id, to_thread) click to toggle source

Used to maintain subscriptions where module is moved to another thread or even another server.

# File lib/orchestrator/status.rb, line 102
def move(mod_id, to_thread)
    return if to_thread == self

    @thread.schedule do
        subs = @subscriptions.delete(mod_id)
        if subs
            # Remove the system references
            subs.each do |sub|
                @systems[sub.sys_id].delete(sub) if sub.sys_id
            end

            # Transfer the subscriptions
            to_thread.transfer(mod_id, subs)
        end
    end
end
reloaded_system(sys_id, sys) click to toggle source

The System class contacts each of the threads to let them know of an update

# File lib/orchestrator/status.rb, line 133
def reloaded_system(sys_id, sys)
    subscriptions = @systems[sys_id]
    if subscriptions
        subscriptions.each do |sub|
            old_id = sub.mod_id

            # re-index the subscription
            mod = sys.get(sub.mod_name, sub.index - 1)
            sub.mod_id = mod ? mod.settings.id.to_sym : nil

            # Check for changes (order, removal, replacement)
            if old_id != sub.mod_id
                @subscriptions[old_id][sub.status].delete(sub)

                # Update to the new module
                if sub.mod_id
                    @subscriptions[sub.mod_id] ||= {}
                    @subscriptions[sub.mod_id][sub.status] ||= Set.new
                    @subscriptions[sub.mod_id][sub.status].add(sub)

                    # Check for existing status to send to subscriber
                    value = mod.status[sub.status]
                    sub.notify(value) unless value.nil?
                end

                # Transfer the subscription if on a different thread
                if mod.thread != @thread
                    move(sub.mod_id.to_sym, mod.thread)
                end

                # Perform any required cleanup
                if @subscriptions[old_id][sub.status].empty?
                    @subscriptions[old_id].delete(sub.status)
                    if @subscriptions[old_id].empty?
                        @subscriptions.delete(old_id)
                    end
                end
            end
        end
    end
end
subscribe(opt) click to toggle source

Subscribes to updates from a system module Modules do not have to exist and updates will be triggered as soon as they are

# File lib/orchestrator/status.rb, line 52
def subscribe(opt)     # sys_name, mod_name, index, status, callback, on_thread
    if opt[:sys_name] && !opt[:sys_id]
        @thread.work(proc {
            id = ::Orchestrator::ControlSystem.bucket.get("sysname-#{sys_name}")
            opt[:sys_id] = id

            # Grabbing system here as thread-safe and has the potential to block
            ::Orchestrator::System.get(id)
        }).then(proc { |sys|
            mod = sys.get(opt[:mod_name], opt[:index] - 1)
            if mod
                opt[:mod_id] = mod.settings.id.to_sym
                opt[:mod] = mod
            end

            do_subscribe(opt)
        })
    else
        do_subscribe(opt)
    end
end
transfer(mod_id, subs) click to toggle source
# File lib/orchestrator/status.rb, line 119
def transfer(mod_id, subs)
    @thread.schedule do
        @subscriptions[mod_id] = subs

        subs.each do |sub|
            if sub.sys_id
                @systems[sub.sys_id] ||= Set.new
                @systems[sub.sys_id] << sub
            end
        end
    end
end
unsubscribe(sub) click to toggle source

Removes subscription callback from the lookup

# File lib/orchestrator/status.rb, line 75
def unsubscribe(sub)
    if sub.is_a? ::Libuv::Q::Promise 
        sub.then @find_subscription
    else
        find_subscription(sub)
    end
end
update(mod_id, status, value) click to toggle source

Triggers an update to be sent to listening callbacks

# File lib/orchestrator/status.rb, line 84
def update(mod_id, status, value)
    mod = @subscriptions[mod_id]
    if mod
        subscribed = mod[status]
        if subscribed
            subscribed.each do |subscription|
                begin
                    subscription.notify(value)
                rescue => e
                    @controller.log_unhandled_exception(e)
                end
            end
        end
    end
end

Protected Instance Methods

do_subscribe(opt) click to toggle source
# File lib/orchestrator/status.rb, line 212
def do_subscribe(opt)
    # Build the subscription object (as loosely coupled as we can)
    sub = Subscription.new(opt[:sys_name], opt[:sys_id], opt[:mod_name], opt[:mod_id], opt[:index], opt[:status], opt[:callback], opt[:on_thread])

    if sub.sys_id
        @systems[sub.sys_id] ||= Set.new
        @systems[sub.sys_id].add(sub)
    end

    # Now if the module is added later we'll still receive updates
    # and also support direct module status bindings
    if sub.mod_id
        @subscriptions[sub.mod_id] ||= {}
        @subscriptions[sub.mod_id][sub.status] ||= Set.new
        @subscriptions[sub.mod_id][sub.status].add(sub)

        # Check for existing status to send to subscriber
        value = opt[:mod].status[sub.status]
        sub.notify(value) unless value.nil?
    end

    # return the subscription
    sub
end
find_subscription(sub) click to toggle source
# File lib/orchestrator/status.rb, line 237
def find_subscription(sub)
    # Find module thread
    if sub.mod_id
        manager = @controller.loaded?(sub.mod_id)
        if manager
            thread = manager.thread
            thread.schedule do
                thread.observer.exec_unsubscribe(sub)
            end
        else
            # Should be in our schedule
            exec_unsubscribe(sub)
        end
    else
        exec_unsubscribe(sub)
    end
end