module SDN::CLI::MQTT::Write

Public Instance Methods

write() click to toggle source
# File lib/sdn/cli/mqtt/write.rb, line 5
def write
  last_write_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  loop do
    message_and_retries = nil
    @mutex.synchronize do
      # got woken up early by another command getting queued; spin
      if @response_pending
        while @response_pending
          remaining_wait = @response_pending - Time.now.to_f
          if remaining_wait < 0
            SDN.logger.debug "timed out waiting on response"
            @response_pending = nil
            @broadcast_pending = nil
            if @prior_message && @prior_message&.remaining_retries != 0
              SDN.logger.debug "retrying #{@prior_message.remaining_retries} more times ..."
              if Message.is_group_address?(@prior_message.message.src) && !@pending_group_motors.empty?
                SDN.logger.debug "re-targetting group message to individual motors"
                @pending_group_motors.each do |addr|
                  new_message = @prior_message.message.dup
                  new_message.src = [0, 0, 1]
                  new_message.dest = Message.parse_address(addr)
                  @queues[@prior_message.priority].push(MessageAndRetries.new(new_message, @prior_message.remaining_retries, @prior_message.priority))
                end
                @pending_group_motors = []
              else
                @queues[@prior_message.priority].push(@prior_message)
              end
              @prior_message = nil
            end
          else
            @cond.wait(@mutex, remaining_wait)
          end
        end
      end

      @queues.find { |q| message_and_retries = q.shift }
      if message_and_retries
        if message_and_retries.message.ack_requested || message_and_retries.message.class.name =~ /^SDN::Message::Get/
          @response_pending = Time.now.to_f + WAIT_TIME
          @pending_group_motors = if Message.is_group_address?(message_and_retries.message.src)
            group_addr = Message.print_address(message_and_retries.message.src).gsub('.', '')
            @groups[group_addr]&.motor_objects&.map(&:addr) || []
          else
            []
          end
            
          if message_and_retries.message.dest == BROADCAST_ADDRESS || Message.is_group_address?(message_and_retries.message.src) && message_and_retries.message.is_a?(Message::GetNodeAddr)
            @broadcast_pending = Time.now.to_f + BROADCAST_WAIT
          end
        end
      end

      # wait until there is a message
      if @response_pending
        message_and_retries.remaining_retries -= 1
        @prior_message = message_and_retries  
      elsif message_and_retries
        @prior_message = nil  
      else
        if @auto_discover && @motors_found
          # nothing pending to write, and motors found on the last iteration;
          # look for more motors
          message_and_retries = MessageAndRetries.new(Message::GetNodeAddr.new, 1, 2)
          @motors_found = false
        else
          @cond.wait(@mutex)
        end
      end
    end
    next unless message_and_retries

    message = message_and_retries.message
    SDN.logger.info "writing #{message.inspect}"
    # minimum time between messages
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    sleep_time = 0.1 - (now - last_write_at)
    sleep(sleep_time) if sleep_time > 0
    @sdn.send(message)
    last_write_at = now
  end
rescue => e
  SDN.logger.fatal "failure writing: #{e}: #{e.backtrace}"
  exit 1
end