class PahoMqtt::Sender
Attributes
last_ping_req[RW]
Public Class Methods
new(ack_timeout)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 20 def initialize(ack_timeout) @socket = nil @writing_queue = [] @writing_mutex = Mutex.new @last_ping_req = -1 @ack_timeout = ack_timeout end
Public Instance Methods
append_to_writing(packet)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 42 def append_to_writing(packet) @writing_mutex.synchronize do @writing_queue.push(packet) unless @writing_queue.length >= MAX_WRITING end MQTT_ERR_SUCCESS end
check_ack_alive(queue, mutex)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 73 def check_ack_alive(queue, mutex) mutex.synchronize do now = Time.now queue.each do |pck| if now >= pck[:timestamp] + @ack_timeout pck[:packet].dup ||= true unless pck[:packet].class == PahoMqtt::Packet::Subscribe || pck[:packet].class == PahoMqtt::Packet::Unsubscribe append_to_writing(pck[:packet]) pck[:timestamp] = now end end end end
flush_waiting_packet(sending=true)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 61 def flush_waiting_packet(sending=true) if sending @writing_mutex.synchronize do @writing_queue.each do |m| send_packet(m) end end else @writing_queue = [] end end
send_packet(packet)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 32 def send_packet(packet) begin @socket.write(packet.to_s) unless @socket.nil? || @socket.closed? @last_ping_req = Time.now MQTT_ERR_SUCCESS rescue StandardError raise WritingException end end
socket=(socket)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 28 def socket=(socket) @socket = socket end
writing_loop(max_packet)
click to toggle source
# File lib/paho_mqtt/sender.rb, line 49 def writing_loop(max_packet) @writing_mutex.synchronize do cnt = 0 while !@writing_queue.empty? && cnt < max_packet do packet = @writing_queue.shift send_packet(packet) cnt += 1 end end MQTT_ERR_SUCCESS end