class Orchestrator::Device::CommandQueue

Constants

OFFLINE_MSG

Attributes

pause[R]
state[R]
waiting[RW]

Public Class Methods

new(loop, callback) click to toggle source

init -> mod.load -> post_init So config can be set in on_load if desired

# File lib/orchestrator/device/command_queue.rb, line 26
def initialize(loop, callback)
    @loop = loop
    @callback = callback

    @named_commands = {
        # name: [[priority list], command]
        # where command may be nil
    }
    @comparison = method(:comparison)
    @pending_commands = Containers::Heap.new(&@comparison)

    @waiting = nil      # Last command sent that was marked as waiting
    @pause = 0
    @state = :online    # online / offline
    @pause_shift = method(:pause_shift)
    @move_forward = method(:move_forward)
end

Public Instance Methods

cancel_all(msg) click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 162
def cancel_all(msg)
    while length > 0
        cmd = @pending_commands.pop
        if cmd.is_a? Symbol
            res = @named_commands[cmd]
            if res
                res[1][:defer].reject(msg)
                @named_commands.delete(cmd)
            end
        else
            cmd[:defer].reject(msg)
        end
    end
end
length() click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 116
def length
    @pending_commands.size
end
offline(clear = false) click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 130
def offline(clear = false)
    @state = :offline

    if clear
        @waiting[:defer].reject(OFFLINE_MSG) if @waiting
        cancel_all(OFFLINE_MSG)
        @waiting = nil
    else
        # Keep named commands
        new_queue = Containers::Heap.new(&@comparison)

        while length > 0
            cmd = @pending_commands.pop
            if cmd.is_a? Symbol
                res = @named_commands[cmd][0]
                pri = res.shift
                res << pri
                queue_push(new_queue, cmd, pri)
            else
                cmd[:defer].reject(OFFLINE_MSG)
            end
        end
        @pending_commands = new_queue
        
        # clear waiting if it is not a named command.
        # The processor will re-queue it if retry on disconnect is set
        if @waiting && @waiting[:name].nil?
            @waiting = nil
        end
    end
end
online() click to toggle source

If offline we’ll only maintain named command state and queue

# File lib/orchestrator/device/command_queue.rb, line 122
def online
    @state = :online

    # next tick is important as it allows the module time to updated
    # any named commands that it desires in the connected callback
    shift_next_tick
end
push(command, priority) click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 84
def push(command, priority)
    if @state == :offline && command[:name].nil?
        return
    end

    if command[:name]
        name = command[:name].to_sym

        current = @named_commands[name] ||= [[], nil]

        # Chain the promises if the named command is already in the queue
        cmd = current[1]
        cmd[:defer].resolve(command[:defer].promise) if cmd

        
        current[1] = command   # replace the old command
        priors = current[0]

        # Only add commands of higher priority to the queue
        if priors.empty? || priors[-1] < priority
            priors << priority
            queue_push(@pending_commands, name, priority)
        end
    else
        queue_push(@pending_commands, command, priority)
    end

    if @waiting.nil? && @state == :online
        shift  # This will trigger the callback
    end
end
shift() click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 49
def shift
    return if @pause > 0 # we are waiting for the next_tick?

    @waiting = nil  # Discard the current command
    if length > 0
        next_cmd = @pending_commands.pop

        if next_cmd.is_a? Symbol # (named command)
            result = @named_commands[next_cmd]
            result[0].shift
            cmd = result[1]
            if cmd.nil?
                shift_next_tick if length > 0
                return  # command already executed, this is a no-op
            else
                result[1] = nil
            end
        else
            cmd = next_cmd
        end

        @waiting = cmd if cmd[:wait]
        shift_promise = @callback.call cmd

        if shift_promise.is_a? ::Libuv::Q::Promise
            @pause += 1
            shift_promise.finally do # NOTE:: This schedule may not be required...
                @loop.schedule @move_forward
            end
        else
            shift_next_tick if length > 0
        end
    end
end
shift_next_tick() click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 44
def shift_next_tick
    @pause += 1
    @loop.next_tick @pause_shift
end

Protected Instance Methods

comparison(x, y) click to toggle source

Queue related methods

# File lib/orchestrator/device/command_queue.rb, line 199
def comparison(x, y)
    if x[0] == y[0]
        x[1] < y[1]
    else
        (x[0] <=> y[0]) == 1
    end
end
move_forward() click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 190
def move_forward
    @pause -= 1
    if !@waiting && length > 0
        shift
    end
end
pause_shift() click to toggle source

If we next_tick a shift then a push may be able to sneak in before that command is shifted. If the new push is a waiting command then the next tick shift will discard it which is undesirable

# File lib/orchestrator/device/command_queue.rb, line 185
def pause_shift
    @pause -= 1
    shift
end
queue_push(queue, obj, pri) click to toggle source
# File lib/orchestrator/device/command_queue.rb, line 207
def queue_push(queue, obj, pri)
    pri = [pri, Time.now.to_f]
    queue.push(pri, obj)
end