class PahoMqtt::Subscriber
Attributes
subscribed_topics[R]
Public Class Methods
new(sender)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 20 def initialize(sender) @waiting_suback = [] @waiting_unsuback = [] @subscribed_mutex = Mutex.new @subscribed_topics = [] @suback_mutex = Mutex.new @unsuback_mutex = Mutex.new @sender = sender end
Public Instance Methods
add_subscription(max_qos, packet_id, adjust_qos)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 55 def add_subscription(max_qos, packet_id, adjust_qos) @suback_mutex.synchronize do adjust_qos, @waiting_suback = @waiting_suback.partition { |pck| pck[:id] == packet_id } end if adjust_qos.length == 1 adjust_qos = adjust_qos.first[:packet].topics adjust_qos.each do |t| if [0, 1, 2].include?(max_qos[0]) t[1] = max_qos.shift elsif max_qos[0] == 128 adjust_qos.delete(t) else PahoMqtt.logger.error("The QoS value is invalid in subscribe.") if PahoMqtt.logger? raise PacketException.new('Invalid suback QoS value') end end else PahoMqtt.logger.error("The packet id is invalid, already used.") if PahoMqtt.logger? raise PacketException.new("Invalid suback packet id: #{packet_id}") end @subscribed_mutex.synchronize do @subscribed_topics.concat(adjust_qos) end return adjust_qos end
check_waiting_subscriber()
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 142 def check_waiting_subscriber @sender.check_ack_alive(@waiting_suback, @suback_mutex) @sender.check_ack_alive(@waiting_unsuback, @unsuback_mutex) end
config_subscription(new_id)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 34 def config_subscription(new_id) unless @subscribed_topics == [] || @subscribed_topics.nil? packet = PahoMqtt::Packet::Subscribe.new( :id => new_id, :topics => @subscribed_topics ) @subscribed_mutex.synchronize do @subscribed_topics = [] end @suback_mutex.synchronize do if @waiting_suback.length >= MAX_SUBACK PahoMqtt.logger.error('SUBACK queue is full, could not send subscribe') if PahoMqtt.logger? return MQTT_ERR_FAILURE end @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now) end @sender.send_packet(packet) end MQTT_ERR_SUCCESS end
remove_subscription(packet_id, to_unsub)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 81 def remove_subscription(packet_id, to_unsub) @unsuback_mutex.synchronize do to_unsub, @waiting_unsuback = @waiting_unsuback.partition { |pck| pck[:id] == packet_id } end if to_unsub.length == 1 to_unsub = to_unsub.first[:packet].topics else PahoMqtt.logger.error("The packet id is invalid, already used.") if PahoMqtt.logger? raise PacketException.new("Invalid unsuback packet id: #{packet_id}") end @subscribed_mutex.synchronize do to_unsub.each do |filter| @subscribed_topics.delete_if { |topic| PahoMqtt.match_filter(topic.first, filter) } end end return to_unsub end
send_subscribe(topics, new_id)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 101 def send_subscribe(topics, new_id) unless valid_topics?(topics) == MQTT_ERR_FAIL packet = PahoMqtt::Packet::Subscribe.new( :id => new_id, :topics => topics ) @sender.append_to_writing(packet) @suback_mutex.synchronize do if @waiting_suback.length >= MAX_SUBACK PahoMqtt.logger.error('SUBACK queue is full, could not send subscribe') if PahoMqtt.logger? return MQTT_ERR_FAILURE end @waiting_suback.push(:id => new_id, :packet => packet, :timestamp => Time.now) end MQTT_ERR_SUCCESS else raise ProtocolViolation end end
send_unsubscribe(topics, new_id)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 121 def send_unsubscribe(topics, new_id) unless valid_topics?(topics) == MQTT_ERR_FAIL packet = PahoMqtt::Packet::Unsubscribe.new( :id => new_id, :topics => topics ) @sender.append_to_writing(packet) @unsuback_mutex.synchronize do if @waiting_suback.length >= MAX_UNSUBACK PahoMqtt.logger.error('UNSUBACK queue is full, could not send unbsubscribe') if PahoMqtt.logger? return MQTT_ERR_FAIL end @waiting_unsuback.push(:id => new_id, :packet => packet, :timestamp => Time.now) end MQTT_ERR_SUCCESS else raise ProtocolViolation end end
sender=(sender)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 30 def sender=(sender) @sender = sender end
valid_topics?(topics)
click to toggle source
# File lib/paho_mqtt/subscriber.rb, line 147 def valid_topics?(topics) unless topics.length == 0 topics.map do |topic| case topic when Array return MQTT_ERR_FAIL if topic.first == "" when String return MQTT_ERR_FAIL if topic == "" end end else MQTT_ERR_FAIL end MQTT_ERR_SUCCESS end