class PahoMqtt::Sender
Attributes
last_ping_req[RW]
Public Class Methods
new(ack_timeout)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 20 def initialize(ack_timeout) @socket = nil @writing_queue = [] @publish_queue = [] @publish_mutex = Mutex.new @writing_mutex = Mutex.new @last_ping_req = -1 @ack_timeout = ack_timeout end
Public Instance Methods
append_to_writing(packet)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 62 def append_to_writing(packet) begin if packet.is_a?(PahoMqtt::Packet::Publish) prepare_sending(@publish_queue, @publish_mutex, MAX_PUBLISH, packet) else prepare_sending(@writing_queue, @writing_mutex, MAX_QUEUE, packet) end rescue FullWritingException sleep SELECT_TIMEOUT retry end MQTT_ERR_SUCCESS end
check_ack_alive(queue, mutex)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 111 def check_ack_alive(queue, mutex) mutex.synchronize do now = Time.now queue.each do |pck| if now >= pck[:timestamp] + @ack_timeout pck[:packet].dup ||= true unless pck[:packet].class == PahoMqtt::Packet::Subscribe || pck[:packet].class == PahoMqtt::Packet::Unsubscribe PahoMqtt.logger.info("Acknowledgement timeout is over, resending #{pck[:packet].inspect}") if PahoMqtt.logger? send_packet(pck[:packet]) pck[:timestamp] = now end end end end
flush_waiting_packet(sending=true)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 94 def flush_waiting_packet(sending=true) if sending @writing_mutex.synchronize do @writing_queue.each do |packet| send_packet(packet) end end @publish_mutex.synchronize do @publish_queue.each do |packet| send_packet(packet) end end end @writing_queue = [] @publish_queue = [] end
prepare_sending(queue, mutex, max_packet, packet)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 51 def prepare_sending(queue, mutex, max_packet, packet) if queue.length < max_packet mutex.synchronize do queue.push(packet) end else PahoMqtt.logger.error('Writing queue is full, slowing down') if PahoMqtt.logger? raise FullWritingException end end
send_packet(packet)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 34 def send_packet(packet) begin @socket.write(packet.to_s) unless @socket.nil? || @socket.closed? @last_ping_req = Time.now MQTT_ERR_SUCCESS end rescue StandardError raise WritingException rescue IO::WaitWritable IO.select(nil, [@socket], nil, SELECT_TIMEOUT) retry end
send_pingreq()
click to toggle source
# File lib/paho_mqtt/sender.rb, line 47 def send_pingreq send_packet(PahoMqtt::Packet::Pingreq.new) end
socket=(socket)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 30 def socket=(socket) @socket = socket end
writing_loop()
click to toggle source
# File lib/paho_mqtt/sender.rb, line 76 def writing_loop @writing_mutex.synchronize do MAX_QUEUE.times do break if @writing_queue.empty? packet = @writing_queue.shift send_packet(packet) end end @publish_mutex.synchronize do MAX_PUBLISH.times do break if @publish_queue.empty? packet = @publish_queue.shift send_packet(packet) end end MQTT_ERR_SUCCESS end