module SDN::CLI::MQTT::Read

Public Instance Methods

read()
# File lib/sdn/cli/mqtt/read.rb, line 5
# Receive loop: reads messages from @sdn forever and mirrors what they report
# into MQTT.
#
# For every message yielded by @sdn.receive, inside a single
# @mqtt.batch_publish block, this method:
#
# 1. logs the message and resolves the motor for its source address,
#    registering it through publish_motor when it is not yet in @motors;
# 2. publishes the attributes carried by the message on that motor and
#    refreshes the aggregate values of every group the motor belongs to;
# 3. under @mutex, decides whether the message is the response currently
#    being waited for, queues follow-up requests on @queues[1], and signals
#    @cond when there is something new to act on.
#
# Error handling happens at two levels, both reporting via handle_read_error:
#
# * any StandardError raised while handling one message is contained inside
#   the batch_publish block, so the loop carries on with the next message;
# * EOFError and MalformedMessage raised by @sdn.receive itself are rescued
#   on the loop. A rescue inside the batch_publish block cannot see errors
#   raised before the message is yielded (presumably where these two errors
#   originate - receive is not visible here; confirm). Any other error raised
#   by receive is deliberately not rescued and propagates to the caller.
#
# The loop has no break: the only exit visible here is `exit 2` on EOFError.
def read
  loop do
    @sdn.receive do |message|
      @mqtt.batch_publish do
        SDN.logger.info "read #{message.inspect}"

        src = Message.print_address(message.src)
        # ignore the UAI Plus and ourselves
        # NOTE(review): when the source is 7F.7F.7F or a group address, `motor`
        # stays nil; a Post* message from such a source would raise
        # NoMethodError below and be logged as "got garbage".
        if src != '7F.7F.7F' && !Message.is_group_address?(message.src) && !(motor = @motors[src.gsub('.', '')])
          SDN.logger.info "found new motor #{src}"
          @motors_found = true
          motor = publish_motor(src.gsub('.', ''), message.node_type)
        end

        # requests to enqueue once this message has been processed
        follow_ups = []
        case message
        when Message::PostNodeLabel
          # only republish $name when motor.publish returns a truthy value
          # (presumably "the value changed" - confirm against Motor#publish)
          if (motor.publish(:label, message.label))
            publish("#{motor.addr}/$name", message.label)
          end
        when Message::PostMotorPosition,
          Message::ILT2::PostMotorPosition
          if message.is_a?(Message::ILT2::PostMotorPosition)
            # keep polling while it's still moving; check prior two positions
            if motor.position_pulses == message.position_pulses &&
              motor.last_position_pulses == message.position_pulses
              motor.publish(:state, :stopped)
            else
              motor.publish(:state, :running)
              if motor.position_pulses && motor.position_pulses != message.position_pulses
                motor.publish(:last_direction, motor.position_pulses < message.position_pulses ? :down : :up)
              end
              follow_ups << Message::ILT2::GetMotorPosition.new(message.src)
            end
            # remember the previously known position before it is overwritten
            # by the publish of :position_pulses below
            motor.last_position_pulses = motor.position_pulses
            # find the first of the 16 intermediate positions that matches
            ip = (1..16).find do |i|
              # divide by 5 for some leniency
              motor["ip#{i}_pulses"].to_i / 5 == message.position_pulses / 5
            end
            motor.publish(:ip, ip)
          end
          motor.publish(:position_percent, message.position_percent)
          motor.publish(:position_pulses, message.position_pulses)
          motor.publish(:ip, message.ip) if message.respond_to?(:ip)
          # refresh the aggregate position of every group this motor is in
          motor.group_objects.each do |group|
            positions_percent = group.motor_objects.map(&:position_percent)
            positions_pulses = group.motor_objects.map(&:position_pulses)
            ips = group.motor_objects.map(&:ip)

            position_percent = nil
            # calculate an average, but only if we know a position for
            # every shade (unknown values may be nil or the symbol :nil)
            if !positions_percent.include?(:nil) && !positions_percent.include?(nil)
              position_percent = positions_percent.inject(&:+) / positions_percent.length
            end

            position_pulses = nil
            if !positions_pulses.include?(:nil) && !positions_pulses.include?(nil)
              position_pulses = positions_pulses.inject(&:+) / positions_pulses.length
            end

            # the group only has an ip when all of its motors agree on it
            ip = nil
            ip = ips.first if ips.uniq.length == 1
            ip = nil if ip == :nil

            group.publish(:position_percent, position_percent)
            group.publish(:position_pulses, position_pulses)
            group.publish(:ip, ip)
          end
        when Message::PostMotorStatus
          if message.state == :running || motor.state == :running ||
            # if it's explicitly stopped, but we didn't ask it to, it's probably
            # changing directions so keep querying
            (message.state == :stopped &&
              message.last_action_cause == :explicit_command &&
              !(motor.last_action == Message::Stop || motor.last_action.nil?))
            follow_ups << Message::GetMotorStatus.new(message.src)
          end
          # this will do one more position request after it stopped
          follow_ups << Message::GetMotorPosition.new(message.src)
          motor.publish(:state, message.state)
          motor.publish(:last_direction, message.last_direction)
          motor.publish(:last_action_source, message.last_action_source)
          motor.publish(:last_action_cause, message.last_action_cause)
          # a group reports the shared value, or 'mixed' when its motors differ
          motor.group_objects.each do |group|
            states = group.motor_objects.map(&:state).uniq
            state = states.length == 1 ? states.first : 'mixed'
            group.publish(:state, state)

            directions = group.motor_objects.map(&:last_direction).uniq
            direction = directions.length == 1 ? directions.first : 'mixed'
            group.publish(:last_direction, direction)
          end
        when Message::PostMotorLimits
          motor.publish(:up_limit, message.up_limit)
          motor.publish(:down_limit, message.down_limit)
        when Message::ILT2::PostMotorSettings
          motor.publish(:down_limit, message.limit)
        when Message::PostMotorDirection
          motor.publish(:direction, message.direction)
        when Message::PostMotorRollingSpeed
          motor.publish(:up_speed, message.up_speed)
          motor.publish(:down_speed, message.down_speed)
          motor.publish(:slow_speed, message.slow_speed)
        when Message::PostMotorIP,
          Message::ILT2::PostMotorIP
          motor.publish(:"ip#{message.ip}_pulses", message.position_pulses)
          if message.respond_to?(:position_percent)
            motor.publish(:"ip#{message.ip}_percent", message.position_percent)
          elsif motor.down_limit
            # no percentage in the message: derive it from the down limit
            motor.publish(:"ip#{message.ip}_percent", message.position_pulses.to_f / motor.down_limit * 100)
          end
        when Message::PostGroupAddr
          motor.add_group(message.group_index, message.group_address)
        end

        @mutex.synchronize do
          # NOTE(review): "to group" is derived from the prior message's src -
          # presumably group requests carry the group address as their source;
          # confirm against the protocol.
          prior_message_to_group = Message.is_group_address?(@prior_message&.message&.src) if @prior_message

          # the message only counts as the awaited response when its class is
          # expected and its addresses line up with the prior request
          correct_response = @response_pending && @prior_message&.message&.class&.expected_response?(message)
          correct_response = false if !prior_message_to_group && message.src != @prior_message&.message&.dest
          correct_response = false if prior_message_to_group && message.dest != @prior_message&.message&.src

          # a group request is only answered once every pending motor replied
          if prior_message_to_group && correct_response
            @pending_group_motors.delete(Message.print_address(message.src).gsub('.', ''))
            correct_response = false unless @pending_group_motors.empty?
          end

          signal = correct_response || !follow_ups.empty?
          @response_pending = @broadcast_pending if correct_response
          # skip follow-ups that are already waiting in the queue
          # (5 and 1 are presumably retry count and priority - confirm
          # against MessageAndRetries)
          follow_ups.each do |follow_up|
            @queues[1].push(MessageAndRetries.new(follow_up, 5, 1)) unless @queues[1].any? { |mr| mr.message == follow_up }
          end
          @cond.signal if signal
        end
      rescue EOFError, MalformedMessage, StandardError => e
        # contain handler failures here so batch_publish and receive complete
        # normally for this message
        handle_read_error(e)
      end
    end
  rescue EOFError, MalformedMessage => e
    # raised by @sdn.receive itself, outside the per-message handler
    handle_read_error(e)
  end
end

# Reports an error raised while reading or handling a message.
#
# * EOFError: logged as fatal, then the process exits with status 2.
# * MalformedMessage: logged as a warning, unless its text matches
#   /issing data/, in which case it is ignored silently.
# * anything else: logged as an error together with its backtrace.
private def handle_read_error(error)
  case error
  when EOFError
    SDN.logger.fatal "EOF reading"
    exit 2
  when MalformedMessage
    SDN.logger.warn "ignoring malformed message: #{error}" unless error.to_s.match?(/issing data/)
  else
    SDN.logger.error "got garbage: #{error}; #{error.backtrace}"
  end
end