class SDN::CLI::MQTT
Constants
- BROADCAST_WAIT
- Group
- MessageAndRetries
- Motor
- WAIT_TIME
Attributes
groups[R]
motors[R]
Public Class Methods
new(port, mqtt_uri, device_id: "somfy", base_topic: "homie", auto_discover: true)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 25
# Connects to the MQTT broker, clears and republishes the bridge's base
# attributes, opens the SDN client, starts the reader and writer threads and
# then hands every incoming MQTT message to handle_message.
#
# @param port passed unchanged to Client.new
# @param mqtt_uri passed unchanged to ::MQTT::Client.new
# @param device_id [String] second segment of the base topic
# @param base_topic [String] first segment of the base topic
# @param auto_discover stored in @auto_discover for use elsewhere in the class
def initialize(port, mqtt_uri, device_id: "somfy", base_topic: "homie", auto_discover: true)
  @base_topic = "#{base_topic}/#{device_id}"

  @mqtt = ::MQTT::Client.new(mqtt_uri)
  # Register the will before connecting.
  # NOTE(review): the third argument is presumably the retain flag - confirm
  # against the MQTT client in use.
  @mqtt.set_will("#{@base_topic}/$state", "lost", true)
  @mqtt.connect

  @motors = {}
  @groups = {}

  # Shared state for the three outgoing message queues.
  @mutex = Mutex.new
  @cond = ConditionVariable.new
  @queues = [[], [], []]
  @response_pending = false
  @broadcast_pending = false

  @auto_discover = auto_discover
  @motors_found = true

  clear_tree(@base_topic)
  publish_basic_attributes

  @sdn = Client.new(port)

  # The thread handles were never used, so they are no longer kept in locals.
  Thread.new { read }
  Thread.new { write }

  # NOTE(review): with a block this call presumably loops for the lifetime of
  # the connection, so the constructor would not return - confirm.
  @mqtt.get { |topic, value| handle_message(topic, value) }
end
Public Instance Methods
add_group(addr)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 316
# Returns the Group registered under +addr+ (dots removed), creating it first
# when it is not yet known. Creation publishes the node's metadata, stores a
# new Group in @groups and republishes the "$nodes" list.
#
# @param addr [String] group address; any "." characters are stripped
# @return the existing or newly created Group
def add_group(addr)
  addr = addr.gsub('.', '')
  known = @groups[addr]
  return known if known

  created = nil

  # Publishes each attribute of one property as "<addr>/<property>/$<attribute>",
  # in the order the attributes are given.
  describe = lambda do |property, **attributes|
    attributes.each { |attribute, value| publish("#{addr}/#{property}/$#{attribute}", value) }
  end

  @mqtt.batch_publish do
    publish("#{addr}/$name", addr)
    publish("#{addr}/$type", "Shade Group")
    # NOTE(review): "reset" is listed here but no reset/* attributes are
    # published by this method - confirm whether that is intentional.
    publish("#{addr}/$properties", "discover,control,jog-ms,jog-pulses,position-pulses,position-percent,ip,reset,state,last-direction,motors")

    describe.call("discover",
                  name: "Trigger Motor Discovery",
                  datatype: "enum",
                  format: "discover",
                  settable: "true",
                  retained: "false")

    describe.call("control",
                  name: "Control motors",
                  datatype: "enum",
                  format: "up,down,stop,wink,next_ip,previous_ip,refresh",
                  settable: "true",
                  retained: "false")

    describe.call("jog-ms",
                  name: "Jog motors by ms",
                  datatype: "integer",
                  format: "-65535:65535",
                  unit: "ms",
                  settable: "true",
                  retained: "false")

    describe.call("jog-pulses",
                  name: "Jog motors by pulses",
                  datatype: "integer",
                  format: "-65535:65535",
                  unit: "pulses",
                  settable: "true",
                  retained: "false")

    describe.call("position-pulses",
                  name: "Position from up limit (in pulses)",
                  datatype: "integer",
                  format: "0:65535",
                  unit: "pulses",
                  settable: "true")

    describe.call("position-percent",
                  name: "Position (in %)",
                  datatype: "integer",
                  format: "0:100",
                  unit: "%",
                  settable: "true")

    describe.call("ip",
                  name: "Intermediate Position",
                  datatype: "integer",
                  format: "1:16",
                  settable: "true")

    describe.call("state",
                  name: "State of the motors",
                  datatype: "enum",
                  format: Message::PostMotorStatus::STATE.keys.join(',') + ",mixed")

    describe.call("last-direction",
                  name: "Direction of last motion",
                  datatype: "enum",
                  format: Message::PostMotorStatus::DIRECTION.keys.join(',') + ",mixed")

    describe.call("motors",
                  name: "Comma separated motor addresses that are members of this group",
                  datatype: "string")

    created = @groups[addr] = Group.new(self, addr)
    node_ids = ["FFFFFF"] + @motors.keys.sort + @groups.keys.sort
    publish("$nodes", node_ids.join(","))
  end

  created
end
clear_tree(topic)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 71
# Subscribes to everything below +topic+, unsubscribes again (waiting for the
# acknowledgement) and then, for every message sitting in the client's queue,
# publishes a nil payload with retain: true to that message's topic.
#
# @param topic [String] root of the subtree; "/#" is appended for the
#   subscription
def clear_tree(topic)
  wildcard = "#{topic}/#"
  @mqtt.subscribe(wildcard)
  @mqtt.unsubscribe(wildcard, wait_for_ack: true)

  until @mqtt.queue_empty?
    # Only the topic of the queued message is needed; the payload is ignored.
    stale_topic, _payload = @mqtt.get
    @mqtt.publish(stale_topic, nil, retain: true)
  end
end
enqueue(message, queue = 0)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 61
# Appends +message+ to one of the outgoing queues, unless an equal message is
# already waiting in that queue, and signals @cond so a waiter can pick it up.
# The whole operation runs while holding @mutex.
#
# @param message the object to queue; compared with == via Array#include?
# @param queue [Integer] index into @queues (defaults to 0)
# @raise [IndexError] if +queue+ is not a valid index into @queues
def enqueue(message, queue = 0)
  @mutex.synchronize do
    # fetch fails fast on an unknown queue index instead of surfacing later as
    # a NoMethodError on nil.
    pending = @queues.fetch(queue)
    next if pending.include?(message)

    pending.push(message)
    @cond.signal
  end
end
publish(topic, value)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 53
# Publishes +value+ to +topic+ relative to the base topic, with retain: true
# and qos: 1.
#
# @param topic [String] topic below @base_topic
# @param value payload handed to the MQTT client
def publish(topic, value)
  full_topic = "#{@base_topic}/#{topic}"
  @mqtt.publish(full_topic, value, retain: true, qos: 1)
end
publish_basic_attributes()
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 80
# Publishes the device-level attributes and the "FFFFFF" broadcast node,
# subscribes to the "set" topic of every settable property on any node, and
# finally publishes "$state" = "ready". Everything runs in one batch_publish
# block.
def publish_basic_attributes
  device_attributes = [
    ["$homie", "v4.0.0"],
    ["$name", "Somfy SDN Network"],
    ["$state", "init"],
    ["$nodes", "FFFFFF"],
    ["FFFFFF/$name", "Broadcast"],
    ["FFFFFF/$type", "sdn"],
    ["FFFFFF/$properties", "discover"],
    ["FFFFFF/discover/$name", "Trigger Motor Discovery"],
    ["FFFFFF/discover/$datatype", "enum"],
    ["FFFFFF/discover/$format", "discover"],
    ["FFFFFF/discover/$settable", "true"],
    ["FFFFFF/discover/$retained", "false"]
  ]

  settable_properties = %w[
    discover label control jog-ms jog-pulses position-pulses position-percent
    ip reset direction up-speed down-speed slow-speed up-limit down-limit groups
  ]

  @mqtt.batch_publish do
    device_attributes.each { |topic, value| publish(topic, value) }

    # "+" matches any single node id.
    settable_properties.each { |property| subscribe("+/#{property}/set") }

    1.upto(16) do |ip|
      subscribe("+/ip#{ip}-pulses/set")
      subscribe("+/ip#{ip}-percent/set")
    end

    publish("$state", "ready")
  end
end
publish_motor(addr, node_type)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 122
# Publishes the metadata of a motor node, registers a new Motor in @motors,
# republishes "$nodes" and then queues the requests that read the motor's
# current settings.
#
# @param addr [String] node id used in topics; also parsed with
#   Message.parse_address for the queued requests
# @param node_type [Symbol] :st50ilt2 gets a reduced property set and
#   ILT2-specific requests; :st30 gets the status/limits/direction/speed/IP
#   requests; any other type only gets the label and group requests
# @return the newly created Motor
def publish_motor(addr, node_type)
  ilt2 = node_type == :st50ilt2
  motor = nil

  # Publishes each attribute of one property as "<addr>/<property>/$<attribute>",
  # in the order the attributes are given.
  describe = lambda do |property, **attributes|
    attributes.each { |attribute, value| publish("#{addr}/#{property}/$#{attribute}", value) }
  end

  @mqtt.batch_publish do
    publish("#{addr}/$name", addr)
    publish("#{addr}/$type", node_type.to_s)

    properties = %w[
      discover label state control jog-ms jog-pulses position-pulses
      position-percent ip down-limit groups last-direction
    ]
    (1..16).each { |ip| properties.push("ip#{ip}-pulses", "ip#{ip}-percent") }
    unless ilt2
      properties.concat(%w[
        reset last-action-source last-action-cause up-limit direction
        up-speed down-speed slow-speed
      ])
    end
    publish("#{addr}/$properties", properties.join(","))

    describe.call("discover",
                  name: "Trigger Motor Discovery",
                  datatype: "enum",
                  format: "discover",
                  settable: "true",
                  retained: "false")

    describe.call("label",
                  name: "Node label",
                  datatype: "string",
                  settable: "true")

    describe.call("state",
                  name: "Current state of the motor",
                  datatype: "enum",
                  format: Message::PostMotorStatus::STATE.keys.join(','))

    describe.call("control",
                  name: "Control motor",
                  datatype: "enum",
                  format: "up,down,stop,wink,next_ip,previous_ip,refresh",
                  settable: "true",
                  retained: "false")

    describe.call("jog-ms",
                  name: "Jog motor by ms",
                  datatype: "integer",
                  format: "-65535:65535",
                  unit: "ms",
                  settable: "true",
                  retained: "false")

    describe.call("jog-pulses",
                  name: "Jog motor by pulses",
                  datatype: "integer",
                  format: "-65535:65535",
                  unit: "pulses",
                  settable: "true",
                  retained: "false")

    describe.call("position-percent",
                  name: "Position (in %)",
                  datatype: "integer",
                  format: "0:100",
                  unit: "%",
                  settable: "true")

    describe.call("position-pulses",
                  name: "Position from up limit (in pulses)",
                  datatype: "integer",
                  format: "0:65535",
                  unit: "pulses",
                  settable: "true")

    describe.call("ip",
                  name: "Intermediate Position",
                  datatype: "integer",
                  format: "1:16",
                  settable: "true")
    # Only ILT2 motors publish the $retained attribute for "ip".
    publish("#{addr}/ip/$retained", "false") if ilt2

    describe.call("down-limit",
                  name: "Down limit",
                  datatype: "integer",
                  format: "0:65535",
                  unit: "pulses",
                  settable: "true")

    describe.call("last-direction",
                  name: "Direction of last motion",
                  datatype: "enum",
                  format: Message::PostMotorStatus::DIRECTION.keys.join(','))

    unless ilt2
      describe.call("reset",
                    name: "Recall factory settings",
                    datatype: "enum",
                    format: Message::SetFactoryDefault::RESET.keys.join(','),
                    settable: "true",
                    retained: "false")

      describe.call("last-action-source",
                    name: "Source of last action",
                    datatype: "enum",
                    format: Message::PostMotorStatus::SOURCE.keys.join(','))

      describe.call("last-action-cause",
                    name: "Cause of last action",
                    datatype: "enum",
                    format: Message::PostMotorStatus::CAUSE.keys.join(','))

      describe.call("up-limit",
                    name: "Up limit (always = 0)",
                    datatype: "integer",
                    format: "0:65535",
                    unit: "pulses",
                    settable: "true")

      describe.call("direction",
                    name: "Motor rotation direction",
                    datatype: "enum",
                    format: "standard,reversed",
                    settable: "true")

      describe.call("up-speed",
                    name: "Up speed",
                    datatype: "integer",
                    format: "6:28",
                    unit: "RPM",
                    settable: "true")

      describe.call("down-speed",
                    name: "Down speed, always = Up speed",
                    datatype: "integer",
                    format: "6:28",
                    unit: "RPM",
                    settable: "true")

      describe.call("slow-speed",
                    name: "Slow speed",
                    datatype: "integer",
                    format: "6:28",
                    unit: "RPM",
                    settable: "true")
    end

    describe.call("groups",
                  name: "Group Memberships (comma separated, address must start 0101xx)",
                  datatype: "string",
                  settable: "true")

    1.upto(16) do |ip|
      describe.call("ip#{ip}-pulses",
                    name: "Intermediate Position #{ip}",
                    datatype: "integer",
                    format: "0:65535",
                    unit: "pulses",
                    settable: "true")

      describe.call("ip#{ip}-percent",
                    name: "Intermediate Position #{ip}",
                    datatype: "integer",
                    format: "0:100",
                    unit: "%",
                    settable: "true")
    end

    motor = Motor.new(self, addr, node_type)
    @motors[addr] = motor
    node_ids = ["FFFFFF"] + @motors.keys.sort + @groups.keys.sort
    publish("$nodes", node_ids.join(","))
  end

  sdn_addr = Message.parse_address(addr)
  @mutex.synchronize do
    # Every request goes onto the third queue, wrapped in MessageAndRetries.
    # NOTE(review): the trailing 5 and 2 are presumably a retry count and the
    # queue index - confirm against MessageAndRetries.
    request = ->(message) { @queues[2].push(MessageAndRetries.new(message, 5, 2)) }

    request.call(Message::GetNodeLabel.new(sdn_addr))

    case node_type
    when :st30
      request.call(Message::GetMotorStatus.new(sdn_addr))
      request.call(Message::GetMotorLimits.new(sdn_addr))
      request.call(Message::GetMotorDirection.new(sdn_addr))
      request.call(Message::GetMotorRollingSpeed.new(sdn_addr))
      1.upto(16) { |ip| request.call(Message::GetMotorIP.new(sdn_addr, ip)) }
    when :st50ilt2
      request.call(Message::ILT2::GetMotorSettings.new(sdn_addr))
      request.call(Message::ILT2::GetMotorPosition.new(sdn_addr))
      1.upto(16) { |ip| request.call(Message::ILT2::GetMotorIP.new(sdn_addr, ip)) }
    end

    1.upto(16) { |g| request.call(Message::GetGroupAddr.new(sdn_addr, g)) }

    @cond.signal
  end

  motor
end
subscribe(topic)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 57
# Subscribes the MQTT client to +topic+ relative to the base topic.
#
# @param topic [String] topic (or topic filter) below @base_topic
def subscribe(topic)
  full_topic = "#{@base_topic}/#{topic}"
  @mqtt.subscribe(full_topic)
end
touch_group(group_addr)
click to toggle source
# File lib/sdn/cli/mqtt.rb, line 311
# Republishes the :motors property of the group registered for +group_addr+.
# Does nothing (and returns nil) when no such group is registered.
#
# @param group_addr address in the form accepted by Message.print_address;
#   the printed form with "." removed is the key into @groups
def touch_group(group_addr)
  key = Message.print_address(group_addr).gsub('.', '')
  group = @groups[key]
  return unless group

  group.publish(:motors, group.motors_string)
end