class PahoMqtt::Handler

Attributes

clean_session[RW]
last_ping_resp[RW]
registered_callback[R]

Public Class Methods

new() click to toggle source
# File lib/paho_mqtt/handler.rb, line 22
# Sets up a handler with an empty topic-callback registry, -1 as the
# initial last_ping_resp value, and no publisher/subscriber attached.
def initialize
  @registered_callback, @last_ping_resp = [], -1
  @publisher = @subscriber = nil
end

Public Instance Methods

check_callback(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 262
# Runs every registered topic callback whose filter matches packet.topic
# (as decided by PahoMqtt.match_filter).
#
# The matching callbacks are gathered first and only then invoked, in
# registration order, each receiving the packet.
# Returns nil when no filter matched, otherwise the list of callbacks run.
def check_callback(packet)
  hits = @registered_callback.select { |entry| PahoMqtt.match_filter(packet.topic, entry.first) }
  handlers = hits.map(&:last)
  return if handlers.empty?
  handlers.each { |handler| handler.call(packet) }
end
clean_session?(session_flag) click to toggle source
# File lib/paho_mqtt/handler.rb, line 108
# Emits a debug log line when a clean session was requested
# (@clean_session truthy) and the given session flag is falsy.
# Does nothing otherwise, or when PahoMqtt.logger? is false.
def clean_session?(session_flag)
  return unless @clean_session && !session_flag
  PahoMqtt.logger.debug("No previous session found by server, starting a new one.") if PahoMqtt.logger?
end
clear_topic_callback(topic) click to toggle source
# File lib/paho_mqtt/handler.rb, line 75
# Removes every registered callback whose topic equals the given topic.
#
# Raises ArgumentError (after logging an error, when logging is enabled)
# if topic is nil. Returns MQTT_ERR_SUCCESS otherwise, whether or not
# anything was removed.
def clear_topic_callback(topic)
  unless topic.nil?
    @registered_callback.reject! { |entry| entry.first == topic }
    return MQTT_ERR_SUCCESS
  end
  PahoMqtt.logger.error("The topics where the callback is trying to be unregistered have been found nil.") if PahoMqtt.logger?
  raise ArgumentError
end
config_pubsub(publisher, subscriber) click to toggle source
# File lib/paho_mqtt/handler.rb, line 29
# Attaches the publisher and subscriber objects that the packet handlers
# delegate to. Returns the subscriber.
def config_pubsub(publisher, subscriber)
  @publisher, @subscriber = publisher, subscriber
  subscriber
end
handle_connack(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 84
# Processes a CONNACK packet.
#
# When packet.return_code is 0x00 the return message is logged at debug
# level and the session checks run via handle_connack_accepted; any other
# return code is logged at warn level.
#
# The on_connack callback, when set, is invoked with the packet in both
# cases so user code can inspect the return code.
#
# Returns MQTT_CS_CONNECTED when the connection was accepted and
# MQTT_CS_DISCONNECT when it was refused. (Previously the refusal state
# was computed and discarded, so MQTT_CS_CONNECTED was always returned.)
def handle_connack(packet)
  accepted = packet.return_code == 0x00
  if accepted
    PahoMqtt.logger.debug(packet.return_msg) if PahoMqtt.logger?
    handle_connack_accepted(packet.session_present)
  else
    PahoMqtt.logger.warn(packet.return_msg) if PahoMqtt.logger?
  end
  @on_connack.call(packet) unless @on_connack.nil?
  accepted ? MQTT_CS_CONNECTED : MQTT_CS_DISCONNECT
end
handle_connack_accepted(session_flag) click to toggle source
# File lib/paho_mqtt/handler.rb, line 96
# Runs the three session checks (clean_session?, new_session?,
# old_session?) in that order with the given session flag.
# Returns the result of the last check.
def handle_connack_accepted(session_flag)
  checks = [:clean_session?, :new_session?, :old_session?]
  checks.map { |check| __send__(check, session_flag) }.last
end
handle_packet(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 55
# Logs the arrival of a packet (info level, when logging is enabled) and
# dispatches it to the handle_<type> method named after the value returned
# by packet_type. Returns whatever that handler returns.
def handle_packet(packet)
  PahoMqtt.logger.info("New packet #{packet.class} received.") if PahoMqtt.logger?
  __send__("handle_#{packet_type(packet)}", packet)
end
handle_pingresp(_packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 120
# Records the current time in @last_ping_resp. The packet itself is not
# inspected (hence the underscore-prefixed parameter).
def handle_pingresp(_packet)
  @last_ping_resp = Time.now
end
handle_puback(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 152
# Forwards the packet id to the publisher's do_puback; when that reports
# MQTT_ERR_SUCCESS, the on_puback callback (if set) is called with the
# packet. Returns the callback's result, or nil when nothing was called.
def handle_puback(packet)
  return unless @publisher.do_puback(packet.id) == MQTT_ERR_SUCCESS
  @on_puback.call(packet) unless @on_puback.nil?
end
handle_pubcomp(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 173
# Forwards the packet id to the publisher's do_pubcomp; when that reports
# MQTT_ERR_SUCCESS, the on_pubcomp callback (if set) is called with the
# packet. Returns the callback's result, or nil when nothing was called.
def handle_pubcomp(packet)
  return unless @publisher.do_pubcomp(packet.id) == MQTT_ERR_SUCCESS
  @on_pubcomp.call(packet) unless @on_pubcomp.nil?
end
handle_publish(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 143
# Handles an incoming PUBLISH packet: passes its qos and id to the
# publisher's do_publish and, when that reports MQTT_ERR_SUCCESS, calls
# the on_message callback (if set) followed by the per-topic callbacks
# through check_callback.
# Returns check_callback's result, or nil when do_publish did not succeed.
def handle_publish(packet)
  id, qos = packet.id, packet.qos
  return unless @publisher.do_publish(qos, id) == MQTT_ERR_SUCCESS
  @on_message.call(packet) unless @on_message.nil?
  check_callback(packet)
end
handle_pubrec(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 159
# Forwards the packet id to the publisher's do_pubrec; when that reports
# MQTT_ERR_SUCCESS, the on_pubrec callback (if set) is called with the
# packet. Returns the callback's result, or nil when nothing was called.
def handle_pubrec(packet)
  return unless @publisher.do_pubrec(packet.id) == MQTT_ERR_SUCCESS
  @on_pubrec.call(packet) unless @on_pubrec.nil?
end
handle_pubrel(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 166
# Forwards the packet id to the publisher's do_pubrel; when that reports
# MQTT_ERR_SUCCESS, the on_pubrel callback (if set) is called with the
# packet. Returns the callback's result, or nil when nothing was called.
def handle_pubrel(packet)
  return unless @publisher.do_pubrel(packet.id) == MQTT_ERR_SUCCESS
  @on_pubrel.call(packet) unless @on_pubrel.nil?
end
handle_suback(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 124
# Handles a SUBACK packet: hands the packet's return codes and id (plus an
# empty array) to the subscriber's add_subscription and, when the returned
# topic list is not empty, calls the on_suback callback (if set) with it.
# Returns the callback's result, or nil when nothing was called.
def handle_suback(packet)
  granted = packet.return_codes
  acknowledged = @subscriber.add_subscription(granted, packet.id, [])
  return if acknowledged.empty?
  @on_suback.call(acknowledged) unless @on_suback.nil?
end
handle_unsuback(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 134
# Handles an UNSUBACK packet: hands the packet id (plus an empty array) to
# the subscriber's remove_subscription and, when the returned topic list
# is not empty, calls the on_unsuback callback (if set) with it.
# Returns the callback's result, or nil when nothing was called.
def handle_unsuback(packet)
  released = @subscriber.remove_subscription(packet.id, [])
  return if released.empty?
  @on_unsuback.call(released) unless @on_unsuback.nil?
end
new_session?(session_flag) click to toggle source
# File lib/paho_mqtt/handler.rb, line 102
# Emits a debug log line when no clean session was requested
# (@clean_session falsy) and the given session flag is falsy too.
# Does nothing otherwise, or when PahoMqtt.logger? is false.
def new_session?(session_flag)
  return if @clean_session || session_flag
  PahoMqtt.logger.debug("New session created for the client.") if PahoMqtt.logger?
end
old_session?(session_flag) click to toggle source
# File lib/paho_mqtt/handler.rb, line 114
# Emits a debug log line when no clean session was requested
# (@clean_session falsy) and the given session flag is truthy.
# Does nothing otherwise, or when PahoMqtt.logger? is false.
def old_session?(session_flag)
  return if @clean_session || !session_flag
  PahoMqtt.logger.debug("Previous session restored by the server.") if PahoMqtt.logger?
end
on_connack(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 180
# Reads or sets the CONNACK callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_connack(&block)
  return @on_connack unless block
  @on_connack = block
end
on_connack=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 220
# Stores callback as the CONNACK callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_connack=(callback)
  return unless callback.is_a?(Proc)
  @on_connack = callback
end
on_message(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 215
# Reads or sets the message callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_message(&block)
  return @on_message unless block
  @on_message = block
end
on_message=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 248
# Stores callback as the message callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_message=(callback)
  return unless callback.is_a?(Proc)
  @on_message = callback
end
on_puback(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 195
# Reads or sets the PUBACK callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_puback(&block)
  return @on_puback unless block
  @on_puback = block
end
on_puback=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 232
# Stores callback as the PUBACK callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_puback=(callback)
  return unless callback.is_a?(Proc)
  @on_puback = callback
end
on_pubcomp(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 210
# Reads or sets the PUBCOMP callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_pubcomp(&block)
  return @on_pubcomp unless block
  @on_pubcomp = block
end
on_pubcomp=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 244
# Stores callback as the PUBCOMP callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_pubcomp=(callback)
  return unless callback.is_a?(Proc)
  @on_pubcomp = callback
end
on_pubrec(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 200
# Reads or sets the PUBREC callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_pubrec(&block)
  return @on_pubrec unless block
  @on_pubrec = block
end
on_pubrec=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 236
# Stores callback as the PUBREC callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_pubrec=(callback)
  return unless callback.is_a?(Proc)
  @on_pubrec = callback
end
on_pubrel(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 205
# Reads or sets the PUBREL callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_pubrel(&block)
  return @on_pubrel unless block
  @on_pubrel = block
end
on_pubrel=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 240
# Stores callback as the PUBREL callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_pubrel=(callback)
  return unless callback.is_a?(Proc)
  @on_pubrel = callback
end
on_suback(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 185
# Reads or sets the SUBACK callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_suback(&block)
  return @on_suback unless block
  @on_suback = block
end
on_suback=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 224
# Stores callback as the SUBACK callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_suback=(callback)
  return unless callback.is_a?(Proc)
  @on_suback = callback
end
on_unsuback(&block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 190
# Reads or sets the UNSUBACK callback. With a block, stores it as the
# callback; in every case returns the callback currently stored.
def on_unsuback(&block)
  return @on_unsuback unless block
  @on_unsuback = block
end
on_unsuback=(callback) click to toggle source
# File lib/paho_mqtt/handler.rb, line 228
# Stores callback as the UNSUBACK callback, but only when it is a Proc;
# any other value is ignored and the current callback is kept.
def on_unsuback=(callback)
  return unless callback.is_a?(Proc)
  @on_unsuback = callback
end
packet_type(packet) click to toggle source
# File lib/paho_mqtt/handler.rb, line 252
# Derives the handler suffix for a packet from its class: the last segment
# of the class name, downcased.
#
# Only classes found in PahoMqtt::PACKET_TYPES[3..13] are accepted; for
# any other class an error is logged (when logging is enabled) and a
# PacketException is raised.
def packet_type(packet)
  klass = packet.class
  unless PahoMqtt::PACKET_TYPES[3..13].include?(klass)
    PahoMqtt.logger.error("Received an unexpeceted packet: #{packet}.") if PahoMqtt.logger?
    raise PacketException, 'Invalid packet type id'
  end
  klass.to_s.split('::').last.downcase
end
receive_packet() click to toggle source
# File lib/paho_mqtt/handler.rb, line 38
# Waits up to SELECT_TIMEOUT for the socket to become readable, then reads
# and dispatches one packet.
#
# Returns nil when there is no usable socket (nil or closed) or nothing
# became readable. A Connack packet is routed to handle_connack and that
# method's result is returned; any other packet goes through handle_packet
# and the IO.select result is returned. @last_ping_resp is refreshed
# whenever a packet was read: before handle_connack for a Connack, after
# handle_packet otherwise.
def receive_packet
  readable = IO.select([@socket], nil, nil, SELECT_TIMEOUT) unless @socket.nil? || @socket.closed?
  return if readable.nil?

  packet = PahoMqtt::Packet::Base.read(@socket)
  return readable if packet.nil?

  if packet.is_a?(PahoMqtt::Packet::Connack)
    @last_ping_resp = Time.now
    return handle_connack(packet)
  end

  handle_packet(packet)
  @last_ping_resp = Time.now
  readable
end
register_topic_callback(topic, callback, &block) click to toggle source
# File lib/paho_mqtt/handler.rb, line 61
# Registers a callback for a topic, replacing any callback previously
# registered for that exact topic (via clear_topic_callback).
#
# A given block takes precedence over the callback argument; without a
# block, callback is registered only when it is a Proc. When neither
# applies, the previous registration is still cleared and nothing new is
# added.
#
# Raises ArgumentError (after logging an error, when logging is enabled)
# if topic is nil. Returns MQTT_ERR_SUCCESS otherwise.
def register_topic_callback(topic, callback, &block)
  if topic.nil?
    PahoMqtt.logger.error("The topics where the callback is trying to be registered have been found nil.") if PahoMqtt.logger?
    raise ArgumentError
  end
  clear_topic_callback(topic)
  handler = block || callback
  @registered_callback << [topic, handler] if handler.is_a?(Proc)
  MQTT_ERR_SUCCESS
end
socket=(socket) click to toggle source
# File lib/paho_mqtt/handler.rb, line 34
# Sets the socket that receive_packet polls and reads packets from.
def socket=(socket)
  @socket = socket
end