module SDN::CLI::MQTT::Subscriptions

Public Instance Methods

handle_message(topic, value) click to toggle source
# File lib/sdn/cli/mqtt/subscriptions.rb, line 5
def handle_message(topic, value)
  SDN.logger.info "got #{value.inspect} at #{topic}"
  if (match = topic.match(%r{^#{Regexp.escape(@base_topic)}/(?<addr>\h{6})/(?<property>discover|label|control|jog-(?<jog_type>pulses|ms)|position-pulses|position-percent|ip|reset|(?<speed_type>up-speed|down-speed|slow-speed)|up-limit|down-limit|direction|ip(?<ip>\d+)-(?<ip_type>pulses|percent)|groups)/set$}))
    addr = Message.parse_address(match[:addr])
    property = match[:property]
    # not homie compliant; allows linking the position-percent property
    # directly to an OpenHAB rollershutter channel
    if property == 'position-percent' && value =~ /^(?:UP|DOWN|STOP)$/i
      property = "control"
      value = value.downcase
    end
    mqtt_addr = Message.print_address(addr).gsub('.', '')
    motor = @motors[mqtt_addr]
    is_group = Message.is_group_address?(addr)
    group = @groups[mqtt_addr]
    follow_up = motor&.node_type == :st50ilt2 ? Message::ILT2::GetMotorPosition.new(addr) :
      Message::GetMotorStatus.new(addr)
    ns = motor&.node_type == :st50ilt2 ? Message::ILT2 : Message

    message = case property
      when 'discover'
        follow_up = nil
        if value == "discover"
          # discovery is low priority, and longer timeout
          enqueue(MessageAndRetries.new(Message::GetNodeAddr.new(addr), 1, 2), 2)
        end
        nil
      when 'label'
        follow_up = Message::GetNodeLabel.new(addr)
        ns::SetNodeLabel.new(addr, value) unless is_group
      when 'control'
        case value
        when 'up', 'down'
          (motor&.node_type == :st50ilt2 ? ns::SetMotorPosition : Message::MoveTo).
            new(addr, "#{value}_limit".to_sym)
        when 'stop'
          motor&.node_type == :st50ilt2 ? ns::SetMotorPosition.new(addr, :stop) : Message::Stop.new(addr)
        when 'next_ip'
          motor&.node_type == :st50ilt2 ? ns::SetMotorPosition.new(addr, :next_ip_down) :
            Message::MoveOf.new(addr, :next_ip)
        when 'previous_ip'
          motor&.node_type == :st50ilt2 ? ns::SetMotorPosition.new(addr, :next_ip_up) :
            Message::MoveOf.new(addr, :previous_ip)
        when 'wink'
          Message::Wink.new(addr)
        when 'refresh'
          follow_up = nil
          (motor&.node_type == :st50ilt2 ? ns::GetMotorPosition : Message::GetMotorStatus).
            new(addr)
        end
      when /jog-(?:pulses|ms)/
        value = value.to_i
        (motor&.node_type == :st50ilt2 ? ns::SetMotorPosition : Message::MoveOf).
          new(addr, "jog_#{value < 0 ? :up : :down }_#{match[:jog_type]}".to_sym, value.abs)
      when 'reset'
        return unless Message::SetFactoryDefault::RESET.keys.include?(value.to_sym)
        Message::SetFactoryDefault.new(addr, value.to_sym)
      when 'position-pulses', 'position-percent', 'ip'
        if value == 'REFRESH'
          follow_up = nil
          (motor&.node_type == :st50ilt2 ? ns::GetMotorPosition : Message::GetMotorStatus).
            new(addr)
        else
          (motor&.node_type == :st50ilt2 ? ns::SetMotorPosition : Message::MoveTo).
            new(addr, property.sub('position-', 'position_').to_sym, value.to_i)
        end
      when 'direction'
        return if is_group
        follow_up = Message::GetMotorDirection.new(addr)
        return unless %w{standard reversed}.include?(value)
        Message::SetMotorDirection.new(addr, value.to_sym)
      when 'up-limit', 'down-limit'
        return if is_group
        if %w{delete current_position jog_ms jog_pulses}.include?(value)
          type = value.to_sym
          value = 10
        else
          type = :specified_position
        end
        target = property == 'up-limit' ? :up : :down
        follow_up = Message::GetMotorLimits.new(addr)
        Message::SetMotorLimits.new(addr, type, target, value.to_i)
      when /^ip\d-(?:pulses|percent)$/
        return if is_group
        ip = match[:ip].to_i
        return unless (1..16).include?(ip)
        follow_up = ns::GetMotorIP.new(addr, ip)

        if motor&.node_type == :st50ilt2
          value = if value == 'delete'
            nil
          elsif value == 'current_position'
            motor.position_pulses
          elsif match[:ip_type] == 'pulses'
            value.to_i
          else
            value.to_f / motor.down_limit * 100
          end
          ns::SetMotorIP.new(addr, ip, value)
        else
          type = if value == 'delete'
            :delete
          elsif value == 'current_position'
            :current_position
          elsif match[:ip_type] == 'pulses'
            :position_pulses
          else
            :position_percent
          end
          Message::SetMotorIP.new(addr, type, ip, value.to_i)
        end
      when 'up-speed', 'down-speed', 'slow-speed'
        return if is_group
        return unless motor
        follow_up = Message::GetMotorRollingSpeed.new(addr)
        message = Message::SetMotorRollingSpeed.new(addr,
          up_speed: motor.up_speed,
          down_speed: motor.down_speed,
          slow_speed: motor.slow_speed)
        message.send(:"#{property.sub('-', '_')}=", value.to_i)
        message
      when 'groups'
        return if is_group
        return unless motor
        messages = motor.set_groups(value)
        @mutex.synchronize do
          messages.each { |m| @queues[0].push(MessageAndRetries.new(m, 5, 0)) }
          @cond.signal
        end
        nil
    end

    if motor
      motor.last_action = message.class if [Message::MoveTo, Message::Move, Message::Wink, Message::Stop].include?(message.class)
    end

    if message
      message.ack_requested = true if message.class.name !~ /^SDN::Message::Get/
      @mutex.synchronize do
        @queues[0].push(MessageAndRetries.new(message, 5, 0))
        if follow_up
          @queues[1].push(MessageAndRetries.new(follow_up, 5, 1)) unless @queues[1].any? { |mr| mr.message == follow_up }
        end
        @cond.signal
      end
    end
  end
end