class PahoMqtt::Sender

Attributes

last_ping_req[RW]

Public Class Methods

new(ack_timeout) click to toggle source
# File lib/paho_mqtt/sender.rb, line 20
def initialize(ack_timeout)
  @socket        = nil
  @writing_queue = []
  @writing_mutex = Mutex.new
  @last_ping_req = -1
  @ack_timeout   = ack_timeout
end

Public Instance Methods

append_to_writing(packet) click to toggle source
# File lib/paho_mqtt/sender.rb, line 42
def append_to_writing(packet)
  @writing_mutex.synchronize do
    @writing_queue.push(packet) unless @writing_queue.length >= MAX_WRITING
  end
  MQTT_ERR_SUCCESS
end
check_ack_alive(queue, mutex) click to toggle source
# File lib/paho_mqtt/sender.rb, line 73
def check_ack_alive(queue, mutex)
  mutex.synchronize do
    now = Time.now
    queue.each do |pck|
      if now >= pck[:timestamp] + @ack_timeout
        pck[:packet].dup ||= true unless pck[:packet].class == PahoMqtt::Packet::Subscribe || pck[:packet].class == PahoMqtt::Packet::Unsubscribe
        append_to_writing(pck[:packet])
        pck[:timestamp] = now
      end
    end
  end
end
flush_waiting_packet(sending=true) click to toggle source
# File lib/paho_mqtt/sender.rb, line 61
def flush_waiting_packet(sending=true)
  if sending
    @writing_mutex.synchronize do
      @writing_queue.each do |m|
        send_packet(m)
      end
    end
  else
    @writing_queue = []
  end
end
send_packet(packet) click to toggle source
# File lib/paho_mqtt/sender.rb, line 32
def send_packet(packet)
  begin
    @socket.write(packet.to_s) unless @socket.nil? || @socket.closed?
    @last_ping_req = Time.now
    MQTT_ERR_SUCCESS
  rescue StandardError
    raise WritingException
  end
end
socket=(socket) click to toggle source
# File lib/paho_mqtt/sender.rb, line 28
def socket=(socket)
  @socket = socket
end
writing_loop(max_packet) click to toggle source
# File lib/paho_mqtt/sender.rb, line 49
def writing_loop(max_packet)
  @writing_mutex.synchronize do
    cnt = 0
    while !@writing_queue.empty? && cnt < max_packet do
      packet = @writing_queue.shift
      send_packet(packet)
      cnt += 1
    end
  end
  MQTT_ERR_SUCCESS
end