class Orchestrator::Device::Processor

Constants

CONFIG_DEFAULTS
DUMMY_RESOLVER
FAILURE
SEND_DEFAULTS

Any command that waits: send(‘power on?’).then( execute after command complete ) Named commands mean the promise may never resolve Totally replaces emit as we don’t care and makes cross module request super easy – non-wait commands resolve after they have been written to the socket!!

SUCCESS
TERMINATE_MSG
UNNAMED

Attributes

config[R]
last_receive_at[R]

For statistics only

last_sent_at[R]

For statistics only

queue[R]
thread[R]
timeout[R]

For statistics only

transport[RW]

Public Class Methods

new(man) click to toggle source

init -> mod.load -> post_init So config can be set in on_load if desired

# File lib/orchestrator/device/processor.rb, line 75
def initialize(man)
    @man = man

    @thread = @man.thread
    @logger = @man.logger
    @defaults = SEND_DEFAULTS.dup
    @config = CONFIG_DEFAULTS.dup

    @queue = CommandQueue.new(@thread, method(:send_next))
    @responses = []
    @wait = false
    @connected = false
    @checking = Mutex.new
    @bonus = 0

    @last_sent_at = 0
    @last_receive_at = 0


    # Used to indicate when we can start the next response processing
    @head = ::Libuv::Q::ResolvedPromise.new(@thread, true)
    @tail = ::Libuv::Q::ResolvedPromise.new(@thread, true)

    # Method variables
    @resolver = proc { |resp| @thread.schedule { resolve_callback(resp) } }

    @resp_success = proc { |result| @thread.next_tick { resp_success(result) } }
    @resp_failure = proc { |reason| @thread.next_tick { resp_failure(reason) } }
end

Public Instance Methods

buffer(data) click to toggle source
# File lib/orchestrator/device/processor.rb, line 173
def buffer(data)
    @last_receive_at = @thread.now

    if @buffer
        @responses.concat @buffer.extract(data)
    else
        # tokenizing buffer above will enforce encoding
        if @config[:encoding]
            data.force_encoding(@config[:encoding])
        end
        @responses << data
    end

    # if we are waiting we don't want to process this data just yet
    if !@wait
        check_next
    end
end
check_next() click to toggle source
# File lib/orchestrator/device/processor.rb, line 196
def check_next
    return if @checking.locked? || @responses.length <= 0
    @checking.synchronize {
        loop do
            check_data(@responses.shift)
            break if @wait || @responses.length == 0
        end
    }
end
config=(options) click to toggle source
# File lib/orchestrator/device/processor.rb, line 111
def config=(options)
    @config.merge!(options)
    # use tokenize to signal a buffer update
    new_buffer if options.include?(:tokenize)
end
connected() click to toggle source

Callbacks ————————-

# File lib/orchestrator/device/processor.rb, line 148
def connected
    @connected = true
    new_buffer
    @man.notify_connected
    if @config[:update_status]
        @man.trak(:connected, true)
    end
end
disconnected() click to toggle source
# File lib/orchestrator/device/processor.rb, line 157
def disconnected
    @connected = false
    @man.notify_disconnected
    if @config[:update_status]
        @man.trak(:connected, false)
    end
    if @buffer && @config[:flush_buffer_on_disconnect]
        check_data(@buffer.flush)
    end
    @buffer = nil

    if @queue.waiting
        resp_failure(:disconnected)
    end
end
queue_command(options) click to toggle source

Public interface

# File lib/orchestrator/device/processor.rb, line 119
def queue_command(options)
    # Make sure we are sending appropriately formatted data
    raw = options[:data]

    if raw.is_a?(Array)
        options[:data] = array_to_str(raw)
    elsif options[:hex_string] == true
        options[:data] = hex_to_byte(raw)
    end

    data = options[:data]
    options[:retries] = 0 if options[:wait] == false

    if options[:name].is_a? String
        options[:name] = options[:name].to_sym
    end

    # merge in the defaults
    options = @defaults.merge(options)

    @queue.push(options, options[:priority] + @bonus)

rescue => e
    options[:defer].reject(e)
    @logger.print_error(e, 'error queuing command')
end
send_options(options) click to toggle source

Helper functions ——————

# File lib/orchestrator/device/processor.rb, line 107
def send_options(options)
    @defaults.merge!(options)
end
terminate() click to toggle source
# File lib/orchestrator/device/processor.rb, line 192
def terminate
    @thread.schedule method(:do_terminate)
end

Protected Instance Methods

call_emit(cmd) click to toggle source

If a callback was in place for the current

# File lib/orchestrator/device/processor.rb, line 356
def call_emit(cmd)
    callback = cmd[:emit]
    if callback
        @thread.next_tick do
            begin
                callback.call
            rescue => e
                @logger.print_error(e, 'error in emit callback')
            end
        end
    end
end
check_data(data) click to toggle source

Check transport response data

# File lib/orchestrator/device/processor.rb, line 232
def check_data(data)
    resp = nil

    # Provide commands with a bonus in this section
    @bonus = @config[:priority_bonus]

    begin
        cmd = @queue.waiting
        if cmd
            @wait = true
            @defer = @thread.defer
            @defer.promise.then @resp_success, @resp_failure

            # Disconnect before processing the response
            transport.disconnect if cmd[:force_disconnect]

            # Send response, early resolver and command
            resp = @man.notify_received(data, @resolver, cmd)
        else
            resp = @man.notify_received(data, DUMMY_RESOLVER, nil)
            # Don't need to trigger Queue next here as we are not waiting on anything
        end
    rescue => e
        # NOTE:: This error should never be called
        @logger.print_error(e, 'error processing response data')
        @defer.reject :abort if @defer
    ensure
        @bonus = 0
    end

    # Check if response is a success or failure
    resolve_callback(resp) unless resp == :async
end
clear_timers() click to toggle source
# File lib/orchestrator/device/processor.rb, line 438
def clear_timers
    @timeout.cancel if @timeout
    @timeout = nil
end
do_terminate() click to toggle source
# File lib/orchestrator/device/processor.rb, line 224
def do_terminate
    if @queue.waiting
        @queue.waiting[:defer].reject(TERMINATE_MSG)
    end
    @queue.cancel_all(TERMINATE_MSG)
end
new_buffer() click to toggle source
# File lib/orchestrator/device/processor.rb, line 210
def new_buffer
    tokenize = @config[:tokenize]
    if tokenize
        if tokenize.respond_to? :call
            @buffer = tokenize.call
        else
            @buffer = ::UV::BufferedTokenizer.new(@config)
        end
    elsif @buffer
        # remove the buffer if none
        @buffer = nil
    end
end
process_send(command) click to toggle source
# File lib/orchestrator/device/processor.rb, line 393
def process_send(command)
    # delay on receive
    if command[:delay_on_receive] > 0
        gap = @last_receive_at + command[:delay_on_receive] - @thread.now

        if gap > 0
            defer = @thread.defer
            
            sched = schedule.in(gap) do
                defer.resolve(process_send(command))
            end
            # in case of shutdown we need to resolve this promise
            sched.catch do
                defer.reject(:shutdown)
            end
            defer.promise
        else
            transport_send(command)
        end
    else
        transport_send(command)
    end
end
resolve_callback(resp) click to toggle source
# File lib/orchestrator/device/processor.rb, line 266
def resolve_callback(resp)
    if @defer
        if FAILURE.include? resp
            @defer.reject resp
        else
            @defer.resolve resp
        end
        @defer = nil
    end
end
resp_failure(result_raw) click to toggle source
# File lib/orchestrator/device/processor.rb, line 277
def resp_failure(result_raw)
    if @queue.waiting
        begin
            result = result_raw.is_a?(Fixnum) ? :timeout : result_raw
            cmd = @queue.waiting
            debug = "with #{result}: <#{cmd[:name] || UNNAMED}> "
            if cmd[:data]
                debug << "#{cmd[:data].inspect}" 
            else
                debug << cmd[:path]
            end
            @logger.debug "command failed #{debug}"

            if cmd[:retries] == 0
                err = Error::CommandFailure.new "command aborted #{debug}"
                cmd[:defer].reject(err)
                @logger.warn err.message
            else
                cmd[:retries] -= 1
                cmd[:wait_count] = 0      # reset our ignore count
                @queue.push(cmd, cmd[:priority] + @config[:priority_bonus])
            end
        rescue => e
            # Prevent the queue from ever pausing - this should never be called
            @logger.print_error(e, 'error handling request failure')
        end
    end

    clear_timers

    @wait = false
    @queue.waiting = nil
    check_next                    # Process already received
    @queue.shift if @connected    # Then send a new command
end
resp_success(result) click to toggle source

We only care about queued commands here Promises resolve on the next tick so processing

is guaranteed to have completed

Check for queue wait as we may have gone offline

# File lib/orchestrator/device/processor.rb, line 317
def resp_success(result)
    if @queue.waiting && result && result != :ignore
        if result == :abort
            cmd = @queue.waiting
            err = Error::CommandFailure.new "module aborted command with #{result}: <#{cmd[:name] || UNNAMED}> #{(cmd[:data] || cmd[:path]).inspect}"
            @queue.waiting[:defer].reject(err)
        else
            @queue.waiting[:defer].resolve(result)
            call_emit @queue.waiting
        end

        clear_timers

        @wait = false
        @queue.waiting = nil
        check_next      # Process pending
        @queue.shift    # Send the next command

        # Else it must have been a nil or :ignore
    elsif @queue.waiting
        cmd = @queue.waiting
        cmd[:wait_count] ||= 0
        cmd[:wait_count] += 1
        if cmd[:wait_count] > cmd[:max_waits]
            resp_failure(:max_waits_exceeded)
        else
            check_next
        end

    else  # ensure consistent state (offline event may have occurred)

        clear_timers

        @wait = false
        check_next
    end
end
schedule() click to toggle source
# File lib/orchestrator/device/processor.rb, line 443
def schedule
    @schedule ||= @man.get_scheduler
end
send_next(command) click to toggle source

Callback for queued commands

# File lib/orchestrator/device/processor.rb, line 371
def send_next(command)
    # Check for any required delays between sends
    if command[:delay] > 0
        gap = @last_sent_at + command[:delay] - @thread.now
        if gap > 0
            defer = @thread.defer
            sched = schedule.in(gap) do
                defer.resolve(process_send(command))
            end
            # in case of shutdown we need to resolve this promise
            sched.catch do
                defer.reject(:shutdown)
            end
            defer.promise
        else
            process_send(command)
        end
    else
        process_send(command)
    end
end
transport_send(command) click to toggle source
# File lib/orchestrator/device/processor.rb, line 417
def transport_send(command)
    @transport.transmit(command)
    @last_sent_at = @thread.now

    if @queue.waiting
        # Set up timers for command timeout
        @timeout = schedule.in(command[:timeout], @resp_failure)
    else
        # resole the send promise early as we are not waiting for the response
        command[:defer].resolve(:no_wait)
        call_emit command   # the command has been sent
    end

    # Useful for emergency stops etc
    if command[:clear_queue]
        @queue.cancel_all("Command #{command[:name]} cleared the queue")
    end

    nil # ensure promise chain is not propagated
end