class Orchestrator::WebsocketManager

Constants

COMMANDS
DECODE_OPTIONS
ERRORS
PARAMS
REQUIRED

Public Class Methods

new(ws, user) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 7
# Sets up the per-connection state for one websocket.
#
# ws   - the websocket; this method reads its +loop+ and registers
#        callbacks through +progress+ and +finally+
# user - the connected user; passed to the logger and its +id+ is stored
#        on the access log record
def initialize(ws, user)
    @ws, @user = ws, user
    @loop = ws.loop

    # Status bindings held by this connection, keyed by lookup symbol
    @bindings = ::ThreadSafe::Cache.new
    @stattrak = @loop.observer
    # Bound method object handed to subscriptions as their callback
    @notify_update = method(:notify_update)

    @logger = ::Orchestrator::Logger.new(@loop, user)

    # on_message is registered as the progress callback and
    # on_shutdown as the finally callback of the socket
    @ws.progress method(:on_message)
    @ws.finally method(:on_shutdown)
    #@ws.on_open method(:on_open)

    # Systems touched during this session, plus the record they are written to
    @accessed = ::Set.new
    @access_log = ::Orchestrator::AccessLog.new.tap { |log| log.user_id = @user.id }
end

Protected Instance Methods

bind(params) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 199
# Handles a bind request: extracts the binding coordinates from +params+
# and runs check_binding through @loop.work.
#
# params - request parameters; reads :id, :sys, :mod, :name and the
#          optional :index (defaults to 1)
#
# Any error surfacing from the work promise is logged and reported to the
# client with the :unexpected_failure code.
def bind(params)
    request_id = params[:id]
    sys_id     = params[:sys]
    mod_name   = params[:mod].to_sym
    status     = params[:name].to_sym
    mod_index  = (params[:index] || 1).to_i

    # perform binding on the thread pool
    lookup_task = proc { check_binding(request_id, sys_id, mod_name, mod_index, status) }

    @loop.work(lookup_task).catch do |err|
        @logger.print_error(err, "websocket request failed: #{params}")
        error_response(request_id, ERRORS[:unexpected_failure], err.message)
    end
end
check_binding(id, sys, mod, index, name) click to toggle source

Called from a worker thread

# File lib/orchestrator/websocket_manager.rb, line 217
# Resolves the system for a bind request and either creates the binding or
# acknowledges one that already exists.
#
# id    - request id echoed back to the client
# sys   - system id passed to ::Orchestrator::System.get
# mod   - module name
# index - module index as supplied by the client
# name  - status variable name
#
# Responds with a :system_not_found error when the system lookup fails.
def check_binding(id, sys, mod, index, name)
    system = ::Orchestrator::System.get(sys)

    unless system
        @logger.debug("websocket binding could not find system: {sys: #{sys}, mod: #{mod}, index: #{index}, name: #{name}}")
        return error_response(id, ERRORS[:system_not_found], "could not find system: #{sys}")
    end

    lookup = :"#{sys}_#{mod}_#{index}_#{name}"

    # No entry yet for this lookup key - attempt the subscription
    return try_bind(id, sys, system, mod, index, name, lookup) if @bindings[lookup].nil?

    # binding already made - return success
    ack = {
        id: id,
        type: :success,
        meta: {sys: sys, mod: mod, index: index, name: name}
    }
    @ws.text(::JSON.generate(ack))
end
check_requirements(params) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 115
# Returns true when +params+ contains every key listed in REQUIRED,
# false as soon as one is missing.
def check_requirements(params)
    REQUIRED.all? { |key| params.has_key?(key) }
end
debug(params) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 313
# Handles a debug request: attaches this connection to a module's logger.
#
# params - request parameters; reads :id, :sys, :mod and the optional :index
#
# Without :index, :mod is treated as a module id and passed straight to
# do_debug. With :index, :mod is a module name: the system is looked up via
# @loop.work, the module id is resolved from it, and do_debug is called with
# that id plus the sys/mod/index meta information.
#
# Lookup failures are reported to the client with the :module_not_found code.
def debug(params)
    id = params[:id]
    sys = params[:sys]
    mod = params[:mod].to_sym
    index_s = params[:index]
    index = index_s.to_i if index_s   # stays nil when no index was supplied

    # Create the debug channel the first time it is needed
    if @debug.nil?
        @debug = @loop.defer
        @inspecting = Set.new # modules
        @debug.promise.progress method(:debug_update)
    end

    # No index supplied: mod is used as the module id directly
    return do_debug(id, mod) unless index

    # Look up the module ID on the thread pool
    @loop.work(proc {
        system = ::Orchestrator::System.get(sys)
        if system
            mod_man = system.get(mod, index - 1)
            if mod_man
                # Symbolised so the id matches the symbol form used by try_bind
                # and by the lookup in ignore.
                # NOTE(review): assumes Control#loaded? accepts a symbol here, as
                # it already receives one on the no-index path - confirm.
                mod_man.settings.id.to_sym
            else
                # Double quotes are required for the values to be interpolated
                ::Libuv::Q.reject(@loop, "debug failed: module #{sys}->#{mod}_#{index} not found")
            end
        else
            ::Libuv::Q.reject(@loop, "debug failed: system #{sys} lookup failed")
        end
    }).then(proc { |mod_id|
        do_debug(id, mod_id, sys, mod, index)
    }).catch do |err|
        if err.is_a? String
            # One of the rejection reasons produced above
            @logger.info(err)
            error_response(id, ERRORS[:module_not_found], err)
        else
            @logger.print_error(err, "debug request failed: #{params}")
            error_response(id, ERRORS[:module_not_found], "debug request failed for: #{sys}->#{mod}_#{index}")
        end
    end
end
debug_update(klass, id, level, msg) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 406
# Sends one debug message to the client as a JSON frame of type "debug".
#
# klass - sent as "klass"
# id    - sent as "mod"
# level - sent as "level"
# msg   - sent as "msg"
def debug_update(klass, id, level, msg)
    payload = {type: :debug, mod: id, klass: klass, level: level, msg: msg}
    @ws.text(::JSON.generate(payload))
end
do_debug(id, mod, sys_id = nil, mod_name = nil, mod_index = nil) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 357
# Attaches the debug channel to a module's logger and acknowledges the request.
#
# id        - request id echoed back to the client
# mod       - module id; looked up through ::Orchestrator::Control.instance.loaded?
# sys_id    - optional, echoed in the response meta
# mod_name  - optional; when present a meta section is added to the response
# mod_index - optional, echoed in the response meta
#
# Any StandardError raised along the way is logged and reported to the client
# with the :request_failed code.
def do_debug(id, mod, sys_id = nil, mod_name = nil, mod_index = nil)
    resp = {id: id, type: :success, mod_id: mod}

    # Provide meta information for convenience
    # Actual debug messages do not contain this info
    # The library must use the mod_id returned in the response to route responses as desired
    resp[:meta] = {sys: sys_id, mod: mod_name, index: mod_index} if mod_name

    # Already watching this module - just acknowledge
    return @ws.text(::JSON.generate(resp)) if @inspecting.include?(mod)

    # Set mod to get module level errors
    mod_man = ::Orchestrator::Control.instance.loaded?(mod)
    unless mod_man
        @logger.info("websocket debug could not find module: #{mod}")
        return error_response(id, ERRORS[:module_not_found], "could not find module: #{mod}")
    end

    mod_log = mod_man.logger
    mod_log.add @debug
    mod_log.level = :debug
    @inspecting.add mod

    # Set sys to get errors occurring outside of the modules
    unless @inspecting.include?(:self)
        @logger.add @debug
        @logger.level = :debug
        @inspecting.add :self
    end

    @ws.text(::JSON.generate(resp))
rescue => e
    @logger.print_error(e, "websocket debug request failed")
    error_response(id, ERRORS[:request_failed], e.message)
end
do_exec(id, sys, mod, index, name, args) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 138
# Executes a function on a module on behalf of the connected user and sends
# the outcome back to the client.
#
# id    - request id echoed back to the client
# sys   - system id passed to ::Orchestrator::System.get
# mod   - module name
# index - module index as supplied by the client (the system is queried with index - 1)
# name  - function name, invoked on a Core::RequestProxy via +send+
# args  - arguments splatted into the call
#
# Responds with :system_not_found / :module_not_found when a lookup fails and
# with :request_failed when the request's promise is rejected.
def do_exec(id, sys, mod, index, name, args)
    system = ::Orchestrator::System.get(sys)
    unless system
        @logger.debug("websocket exec could not find system: {sys: #{sys}, mod: #{mod}, index: #{index}, name: #{name}}")
        return error_response(id, ERRORS[:system_not_found], "could not find system: #{sys}")
    end

    mod_man = system.get(mod, index - 1)
    unless mod_man
        @logger.debug("websocket exec could not find module: {sys: #{sys}, mod: #{mod}, index: #{index}, name: #{name}}")
        return error_response(id, ERRORS[:module_not_found], "could not find module: #{mod}")
    end

    on_success = proc { |res|
        # Trial-serialise the result so an unconvertible object becomes nil
        value = begin
            ::JSON.generate([res])
            res
        rescue
            # respond with nil if object cannot be converted
            # TODO:: need a better way of dealing with this
            # ALSO in systems controller
            nil
        end

        @ws.text(::JSON.generate({id: id, type: :success, value: value}))
    }

    on_failure = proc { |err|
        # Request proxy will log the error
        error_response(id, ERRORS[:request_failed], err.message)
    }

    request = Core::RequestProxy.new(@loop, mod_man, @user)
    request.send(name, *args).then(on_success, on_failure)
end
do_unbind(binding) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 194
# Hands +binding+ to @stattrak.unsubscribe.
def do_unbind(binding)
    @stattrak.unsubscribe binding
end
error_response(id, code, message) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 459
# Sends an error frame to the client.
#
# id      - request id the error relates to (may be nil)
# code    - error code, sent as "code"
# message - human readable reason, sent as "msg"
def error_response(id, code, message)
    failure = {id: id, type: :error, code: code, msg: message}
    @ws.text(::JSON.generate(failure))
end
exec(params) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 123
# Handles an exec request: extracts the call coordinates from +params+ and
# runs do_exec through @loop.work.
#
# params - request parameters; reads :id, :sys, :mod, :name, the optional
#          :index (defaults to 1) and the optional :args (defaults to [])
#
# An error surfacing from the work promise is logged and reported to the
# client with the :unexpected_failure code, in the same way bind does, so a
# failing request always receives a response.
def exec(params)
    id = params[:id]
    sys = params[:sys]
    mod = params[:mod].to_sym
    name = params[:name].to_sym
    index = (params[:index] || 1).to_i

    args = params[:args] || []

    @loop.work(proc {
        do_exec(id, sys, mod, index, name, args)
    }).catch do |err|
        @logger.print_error(err, "websocket request failed: #{params}")
        error_response(id, ERRORS[:unexpected_failure], err.message)
    end
end
ignore(params) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 417
# Handles an ignore request: detaches the debug channel from a module's logger.
#
# params - request parameters; reads :id and the optional :mod (a module id)
#
# Responds with success when the module was detached, or when it was not
# being watched in the first place; responds with :module_not_found when a
# watched module can no longer be found.
def ignore(params)
    id = params[:id]
    mod_s = params[:mod]
    mod = mod_s.to_sym if mod_s

    # Create the debug channel if this is the first debug related request
    if @debug.nil?
        @debug = @loop.defer
        @inspecting = Set.new # modules
        @debug.promise.progress method(:debug_update)
    end

    # Remove module level errors
    if mod && @inspecting.include?(mod)
        mod_man = ::Orchestrator::Control.instance.loaded?(mod)
        if mod_man
            mod_man.logger.delete @debug
            @inspecting.delete mod

            # Stop logging all together if no more modules being watched.
            # do_debug stores a :self marker in the same set, so the set is
            # never empty while the connection logger is attached; the check
            # has to ignore that marker.
            if @inspecting.all? { |entry| entry == :self }
                @logger.delete @debug
                @inspecting.delete :self
            end

            @ws.text(::JSON.generate({
                id: id,
                type: :success
            }))
        else
            @logger.info("websocket ignore could not find module: #{mod}")
            error_response(id, ERRORS[:module_not_found], "could not find module: #{mod}")
        end
    else
        # Nothing to detach
        @ws.text(::JSON.generate({
            id: id,
            type: :success
        }))
    end
end
notify_update(update) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 291
# Pushes a status change to the client as a JSON frame of type "notify".
#
# update - the change notification; this method reads its +value+,
#          +sys_id+, +mod_name+, +index+ and +status+
#
# A value that cannot be serialised to JSON is sent as null.
def notify_update(update)
    output = begin
        # Trial-serialise the value; keep it only if that succeeds
        candidate = update.value
        ::JSON.generate([candidate])
        candidate
    rescue
        # respond with nil if object cannot be converted
        # TODO:: need a better way of dealing with this
        nil
    end

    meta = {
        sys: update.sys_id,
        mod: update.mod_name,
        index: update.index,
        name: update.status
    }
    @ws.text(::JSON.generate({type: :notify, value: output, meta: meta}))
end
on_message(data, ws) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 52
# Entry point for every frame received on the socket.
#
# data - the raw text received
# ws   - passed in by the caller; not used, @ws is used instead
#
# Flow: answer 'ping' with 'pong'; parse and whitelist the JSON request;
# reject it when required parameters are missing; otherwise run the access
# check through @loop.work and, once it resolves, dispatch to the method
# named by :cmd if it is listed in COMMANDS.
def on_message(data, ws)
    if data == 'ping'
        @ws.text('pong')
        return
    end

    begin
        decoded = ::JSON.parse(data, DECODE_OPTIONS)
        request = ::ActionController::Parameters.new(decoded)
        params = request.permit(PARAMS).tap do |permitted|
            # :args is copied across without going through permit
            permitted[:args] = request[:args]
        end
    rescue => e
        @logger.print_error(e, 'error parsing websocket request')
        error_response(nil, ERRORS[:parse_error], e.message)
        return
    end

    unless check_requirements(params)
        # log user information here (possible probing attempt)
        @access_log.suspected = true
        reason = 'required parameters were missing from the request'
        @logger.warn(reason)
        return error_response(params[:id], ERRORS[:bad_request], reason)
    end

    # Perform the security check in a nonblocking fashion
    # (Database access is probably required)
    security_check = @loop.work do
        sys = params[:sys]
        # Replace :sys with the value stored under "sysname-<sys>" when one exists
        params[:sys] = ::Orchestrator::ControlSystem.bucket.get("sysname-#{sys.downcase}", {quiet: true}) || sys
        Rails.configuration.orchestrator.check_access.call(params[:sys], @user)
    end

    # The result should be an access level if these are implemented
    security_check.then do |_access|
        begin
            cmd = params[:cmd].to_sym

            unless COMMANDS.include?(cmd)
                @access_log.suspected = true
                @logger.warn("websocket requested unknown command '#{params[:cmd]}'")
                next error_response(params[:id], ERRORS[:unknown_command], "unknown command: #{params[:cmd]}")
            end

            @accessed << params[:sys]   # Log the access
            self.__send__(cmd, params)  # Execute the request

            # Start logging
            periodicly_update_logs if @accessTimer.nil?
        rescue => e
            @logger.print_error(e, "websocket request failed: #{data}")
            error_response(params[:id], ERRORS[:request_failed], e.message)
        end
    end

    # Raise an error if access is not granted
    security_check.catch do |e|
        @access_log.suspected = true
        @logger.print_error(e, 'security check failed for websocket request')
        error_response(params[:id], ERRORS[:access_denied], e.message)
    end
end
on_shutdown() click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 468
# Cleans up when the socket closes: drops every status binding, resolves the
# debug channel, stops the periodic access-log timer and writes the final
# access log entry through @loop.work.
#
# The timer and the lock are only created by periodicly_update_logs, which
# runs after the first accepted command. A connection that closes before
# that point has neither, so both are handled as optional here; the access
# log is still written so that a session flagged as suspected is recorded.
def on_shutdown
    @bindings.each_value { |subscription| do_unbind(subscription) }
    @bindings = nil
    @debug.resolve(true) if @debug # detach debug listeners

    @accessTimer.cancel if @accessTimer
    # No periodic writer was ever started when the lock is missing, so a
    # fresh mutex is sufficient to keep the save below uniform
    @accesslock ||= Mutex.new

    @loop.work(proc {
        @accesslock.synchronize {
            @access_log.systems = @accessed.to_a
            @access_log.ended_at = Time.now.to_i
            @access_log.save
        }
    })
end
periodicly_update_logs() click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 504
# Starts the recurring access-log update and writes the first entry.
#
# Schedules update_accessed on the loop's scheduler with an interval of
# 60000 plus a random 0..999 (presumably milliseconds, with the random part
# spreading writes from different connections - confirm), creates the mutex
# guarding log writes, and saves the current list of accessed systems
# through @loop.work.
def periodicly_update_logs
    interval = 60000 + Random.rand(1000)
    @accessTimer = @loop.scheduler.every(interval, method(:update_accessed))
    @accesslock = Mutex.new
    @access_log.systems = @accessed.to_a

    initial_save = proc { @accesslock.synchronize { @access_log.save } }
    @loop.work(initial_save)
end
try_bind(id, sys, system, mod_name, index, name, lookup) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 245
# Creates a status subscription for a bind request.
#
# id       - request id echoed back to the client
# sys      - system id
# system   - the system object; this method reads +config.name+ and calls +get+
# mod_name - module name
# index    - module index as supplied by the client (the system is queried with index - 1)
# name     - status variable name
# lookup   - key under which the pending subscription is stored in @bindings
#
# The success response is sent before the subscription is scheduled. When
# the module exists the subscription is made via the module thread's
# observer, otherwise via @stattrak on @loop.
def try_bind(id, sys, system, mod_name, index, name, lookup)
    subscription = {
        sys_id: sys,
        sys_name: system.config.name,
        mod_name: mod_name,
        index: index,
        status: name,
        callback: @notify_update,
        on_thread: @loop
    }

    # if the module exists, subscribe on the correct thread
    # use a bit of promise magic as required
    mod_man = system.get(mod_name, index - 1)
    pending = @loop.defer

    # Ensure browser sees this before the first status update
    # At this point subscription will be successful
    @bindings[lookup] = pending.promise
    ack = {
        id: id,
        type: :success,
        meta: {sys: sys, mod: mod_name, index: index, name: name}
    }
    @ws.text(::JSON.generate(ack))

    if mod_man
        subscription[:mod_id] = mod_man.settings.id.to_sym
        subscription[:mod] = mod_man
        thread = mod_man.thread
        thread.schedule { pending.resolve(thread.observer.subscribe(subscription)) }
    else
        @loop.schedule { pending.resolve(@stattrak.subscribe(subscription)) }
    end
end
unbind(params) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 176
# Handles an unbind request: removes the matching entry from @bindings,
# unsubscribes it when one existed, and always acknowledges with success.
#
# params - request parameters; reads :id, :sys, :mod, :name and the
#          optional :index (defaults to 1)
def unbind(params)
    index = (params[:index] || 1).to_i
    lookup = :"#{params[:sys]}_#{params[:mod]}_#{index}_#{params[:name]}"

    subscription = @bindings.delete(lookup)
    do_unbind(subscription) if subscription

    @ws.text(::JSON.generate({id: params[:id], type: :success}))
end
update_accessed(*args) click to toggle source
# File lib/orchestrator/websocket_manager.rb, line 487
# Timer callback: writes the current list of accessed systems to the access log.
#
# args - whatever the scheduler passes; ignored
#
# Skips the update when the lock is already held. The lock is released
# once the save started through @loop.work has finished.
def update_accessed(*args)
    # No blocking!
    return unless @accesslock.try_lock

    begin
        @access_log.systems = @accessed.to_a

        save_task = proc { @access_log.save }
        @loop.work(save_task).finally { @accesslock.unlock }
    rescue => e
        @accesslock.unlock if @accesslock.locked?
        @logger.print_error(e, "unknown error writing access log")
    end
end