class PahoMqtt::Client

Attributes

ack_timeout[RW]
blocking[RW]
clean_session[RW]
client_id[RW]
connection_state[R]

Read-only attribute.

host[RW]

Connection related attributes:

keep_alive[RW]

Timeout attributes:

mqtt_version[RW]
password[RW]
persistent[RW]
port[RW]
reconnect_delay[RW]
reconnect_limit[RW]
ssl[RW]
ssl_context[R]
username[RW]
will_payload[RW]
will_qos[RW]
will_retain[RW]
will_topic[RW]

Last will attributes:

Public Class Methods

new(*args) click to toggle source
# File lib/paho_mqtt/client.rb, line 52
# Builds a client in the disconnected state.
#
# When the last element of +args+ is a Hash it is removed from +args+ and
# merged over CLIENT_ATTR_DEFAULTS; every resulting pair is then assigned
# through the matching "<key>=" writer. Afterwards an SSL context is created
# when @ssl is set, a missing port falls back to the SSL or plain default,
# and a nil or empty client id is replaced by a generated one.
def initialize(*args)
  @last_ping_resp         = Time.now
  @last_packet_id         = 0
  @ssl_context            = nil
  @sender                 = nil
  @handler                = Handler.new
  @connection_helper      = nil
  @connection_state       = MQTT_CS_DISCONNECT
  @connection_state_mutex = Mutex.new
  @mqtt_thread            = nil
  @reconnect_thread       = nil
  @id_mutex               = Mutex.new
  # Assigned before the merge below, which may overwrite them through the
  # corresponding writers.
  @reconnect_limit        = 3
  @reconnect_delay        = 5

  options = args.last.is_a?(Hash) ? args.pop : {}
  CLIENT_ATTR_DEFAULTS.merge(options).each_pair do |name, value|
    send("#{name}=", value)
  end

  @ssl_context = OpenSSL::SSL::SSLContext.new if @ssl
  @port = (@ssl ? DEFAULT_SSL_PORT : DEFAULT_PORT) if @port.nil?
  @client_id = generate_client_id if @client_id.nil? || @client_id == ""
end

Public Instance Methods

add_topic_callback(topic, callback=nil, &block) click to toggle source
# File lib/paho_mqtt/client.rb, line 269
# Forwards +topic+, +callback+ and the optional block to the handler's
# register_topic_callback and returns its result.
def add_topic_callback(topic, callback = nil, &block)
  @handler.register_topic_callback(topic, callback, &block)
end
config_ssl_context(cert_path, key_path, ca_path=nil) click to toggle source
# File lib/paho_mqtt/client.rb, line 99
# Turns SSL on (unless @ssl already holds a truthy value) and stores the
# context built by SSLHelper.config_ssl_context from the given paths.
# Returns the new context.
def config_ssl_context(cert_path, key_path, ca_path = nil)
  @ssl = true unless @ssl
  context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path)
  @ssl_context = context
end
connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking) click to toggle source
# File lib/paho_mqtt/client.rb, line 104
# Starts a connection attempt to the broker.
#
# Every argument defaults to the matching instance variable and is written
# back to it (the port after +to_i+), so the values given here replace the
# stored ones.
#
# When the attempt ends in the connected state, the pub/sub helpers are
# configured and, unless +blocking+ is set, the background loop thread is
# started. A LowVersionException raised during the attempt is handled by
# downgrade_version.
def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
  @persistent = persistent
  @blocking   = blocking
  @host       = host
  @port       = port.to_i
  @keep_alive = keep_alive
  @connection_state_mutex.synchronize do
    @connection_state = MQTT_CS_NEW
  end
  # Stop any loop thread left over from a previous connection.
  @mqtt_thread.kill unless @mqtt_thread.nil?

  # The connection helper is rebuilt only when not running in the reconnect thread.
  init_connection unless reconnect?
  @connection_helper.send_connect(session_params)
  begin
    init_pubsub
    # NOTE(review): this assignment is not guarded by @connection_state_mutex,
    # unlike the other writes to @connection_state in this file.
    @connection_state = @connection_helper.do_connect(reconnect?)
    if connected?
      build_pubsub
      daemon_mode unless @blocking
    end
  rescue LowVersionException
    downgrade_version
  end
end
connected?() click to toggle source
# File lib/paho_mqtt/client.rb, line 146
# True when the stored connection state equals MQTT_CS_CONNECTED.
def connected?
  current_state = @connection_state
  current_state == MQTT_CS_CONNECTED
end
daemon_mode() click to toggle source
# File lib/paho_mqtt/client.rb, line 129
# Starts the background thread that runs mqtt_loop for as long as the client
# is connected, stores it in @mqtt_thread and returns it.
#
# The thread first kills a reconnect thread that is still alive. A
# SystemCallError raised by the loop triggers reconnect when @persistent is
# set and is re-raised inside the thread otherwise.
def daemon_mode
  @mqtt_thread = Thread.new do
    @reconnect_thread.kill if !@reconnect_thread.nil? && @reconnect_thread.alive?
    begin
      mqtt_loop while connected?
    rescue SystemCallError => error
      raise error unless @persistent
      reconnect
    end
  end
end
disconnect(explicit=true) click to toggle source
# File lib/paho_mqtt/client.rb, line 218
# Closes the connection through the connection helper and marks the client
# as disconnected.
#
# When +explicit+ is truthy and the session is a clean one, the packet id
# counter is reset and the subscriber queue is cleared.
# Always returns MQTT_ERR_SUCCESS.
def disconnect(explicit = true)
  @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread)
  @connection_state_mutex.synchronize { @connection_state = MQTT_CS_DISCONNECT }

  reset_session = explicit && @clean_session
  if reset_session
    @last_packet_id = 0
    @subscriber.clear_queue
  end

  MQTT_ERR_SUCCESS
end
generate_client_id(prefix='paho_ruby', lenght=16) click to toggle source
# File lib/paho_mqtt/client.rb, line 94
# Generates a client id made of +prefix+ followed by +lenght+ random
# alphanumeric characters, stores it in @client_id and returns it.
#
# The id is built as a new string, so the +prefix+ passed by the caller is
# never modified and may be frozen.
def generate_client_id(prefix='paho_ruby', lenght=16)
  charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9')
  suffix  = Array.new(lenght) { charset.sample }.join
  @client_id = "#{prefix}#{suffix}"
end
loop_misc() click to toggle source
# File lib/paho_mqtt/client.rb, line 189
# Housekeeping step of the main loop.
#
# Asks the connection helper to check the keep-alive; when that reports
# MQTT_CS_DISCONNECT, check_persistence is consulted and a reconnect is
# started if it allows one. Then the publisher and subscriber waiting
# checks run, followed by a sleep of SELECT_TIMEOUT.
def loop_misc
  keep_alive_state = @connection_helper.check_keep_alive(@persistent, @handler.last_ping_resp, @keep_alive)
  reconnect if keep_alive_state == MQTT_CS_DISCONNECT && check_persistence

  @publisher.check_waiting_publisher
  @subscriber.check_waiting_subscriber
  sleep SELECT_TIMEOUT
end
loop_read() click to toggle source
# File lib/paho_mqtt/client.rb, line 166
# Reads incoming packets through the handler, at most MAX_QUEUE per call,
# stopping early as soon as receive_packet returns nil.
#
# A FullQueueException ends the read early and is only logged (when a logger
# is configured). A ReadingException triggers reconnect when
# check_persistence allows it; otherwise the caught exception itself is
# re-raised, keeping its message and backtrace.
def loop_read
  MAX_QUEUE.times do
    break if @handler.receive_packet.nil?
  end
rescue FullQueueException => e
  # No packet object is available in this scope, so report what the
  # exception itself carries.
  PahoMqtt.logger.warn("Early exit in reading loop. The maximum number of packets has been reached: #{e.message}") if PahoMqtt.logger?
rescue ReadingException
  raise unless check_persistence
  reconnect
end
loop_write() click to toggle source
# File lib/paho_mqtt/client.rb, line 154
# Runs the sender's writing loop once.
#
# On a WritingException, reconnect is started when check_persistence allows
# it; otherwise the caught exception itself is re-raised so its message and
# backtrace are kept.
def loop_write
  @sender.writing_loop
rescue WritingException
  raise unless check_persistence
  reconnect
end
mqtt_loop() click to toggle source
# File lib/paho_mqtt/client.rb, line 183
# One iteration of the client loop: read, then write, then housekeeping.
# Returns the result of loop_misc.
def mqtt_loop
  loop_read    # incoming packets
  loop_write   # outgoing packets
  loop_misc    # keep-alive and waiting checks
end
on_connack(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 277
# With a block: installs it as the handler's on_connack callback.
# Always returns the handler's current on_connack callback.
def on_connack(&block)
  @handler.on_connack = block unless block.nil?
  @handler.on_connack
end
on_connack=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 317
# Installs +callback+ as the handler's on_connack callback.
# Anything that is not a Proc is ignored.
def on_connack=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_connack = callback
end
on_message(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 312
# With a block: installs it as the handler's on_message callback.
# Always returns the handler's current on_message callback.
def on_message(&block)
  @handler.on_message = block unless block.nil?
  @handler.on_message
end
on_message=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 345
# Installs +callback+ as the handler's on_message callback.
# Anything that is not a Proc is ignored.
def on_message=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_message = callback
end
on_puback(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 292
# With a block: installs it as the handler's on_puback callback.
# Always returns the handler's current on_puback callback.
def on_puback(&block)
  @handler.on_puback = block unless block.nil?
  @handler.on_puback
end
on_puback=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 329
# Installs +callback+ as the handler's on_puback callback.
# Anything that is not a Proc is ignored.
def on_puback=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_puback = callback
end
on_pubcomp(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 307
# With a block: installs it as the handler's on_pubcomp callback.
# Always returns the handler's current on_pubcomp callback.
def on_pubcomp(&block)
  @handler.on_pubcomp = block unless block.nil?
  @handler.on_pubcomp
end
on_pubcomp=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 341
# Installs +callback+ as the handler's on_pubcomp callback.
# Anything that is not a Proc is ignored.
def on_pubcomp=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_pubcomp = callback
end
on_pubrec(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 297
# With a block: installs it as the handler's on_pubrec callback.
# Always returns the handler's current on_pubrec callback.
def on_pubrec(&block)
  @handler.on_pubrec = block unless block.nil?
  @handler.on_pubrec
end
on_pubrec=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 333
# Installs +callback+ as the handler's on_pubrec callback.
# Anything that is not a Proc is ignored.
def on_pubrec=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_pubrec = callback
end
on_pubrel(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 302
# With a block: installs it as the handler's on_pubrel callback.
# Always returns the handler's current on_pubrel callback.
def on_pubrel(&block)
  @handler.on_pubrel = block unless block.nil?
  @handler.on_pubrel
end
on_pubrel=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 337
# Installs +callback+ as the handler's on_pubrel callback.
# Anything that is not a Proc is ignored.
def on_pubrel=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_pubrel = callback
end
on_suback(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 282
# With a block: installs it as the handler's on_suback callback.
# Always returns the handler's current on_suback callback.
def on_suback(&block)
  @handler.on_suback = block unless block.nil?
  @handler.on_suback
end
on_suback=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 321
# Installs +callback+ as the handler's on_suback callback.
# Anything that is not a Proc is ignored.
def on_suback=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_suback = callback
end
on_unsuback(&block) click to toggle source
# File lib/paho_mqtt/client.rb, line 287
# With a block: installs it as the handler's on_unsuback callback.
# Always returns the handler's current on_unsuback callback.
def on_unsuback(&block)
  @handler.on_unsuback = block unless block.nil?
  @handler.on_unsuback
end
on_unsuback=(callback) click to toggle source
# File lib/paho_mqtt/client.rb, line 325
# Installs +callback+ as the handler's on_unsuback callback.
# Anything that is not a Proc is ignored.
def on_unsuback=(callback)
  return unless callback.is_a?(Proc)
  @handler.on_unsuback = callback
end
ping_host() click to toggle source
# File lib/paho_mqtt/client.rb, line 265
# Asks the sender to emit a ping request and returns the sender's result.
def ping_host
  @sender.send_pingreq
end
publish(topic, payload="", retain=false, qos=0) click to toggle source
# File lib/paho_mqtt/client.rb, line 230
# Publishes +payload+ on +topic+ through the publisher, using a freshly
# allocated packet id, and returns the publisher's result.
#
# Raises ArgumentError when +topic+ is not a String or is empty. The
# exception carries the same text that is logged, so the cause is visible
# even when no logger is configured.
def publish(topic, payload="", retain=false, qos=0)
  if topic == "" || !topic.is_a?(String)
    message = "Publish topics is invalid, not a string or empty."
    PahoMqtt.logger.error(message) if PahoMqtt.logger?
    raise ArgumentError, message
  end
  id = next_packet_id
  @publisher.send_publish(topic, payload, retain, qos, id)
end
reconnect() click to toggle source
# File lib/paho_mqtt/client.rb, line 198
# Starts a background thread that tries to connect again, stores it in
# @reconnect_thread and returns it.
#
# At most @reconnect_limit attempts are made (-1 means no limit), waiting
# @reconnect_delay seconds after each failed one. When the client is still
# not connected afterwards, the failure is logged and disconnect(false) is
# called.
def reconnect
  @reconnect_thread = Thread.new do
    attempts = 0
    # Strict comparison: the counter starts at 0, so ">=" would allow one
    # attempt more than the configured limit.
    while @reconnect_limit == -1 || attempts < @reconnect_limit
      attempts += 1
      PahoMqtt.logger.debug("New reconnect attempt...") if PahoMqtt.logger?
      connect
      break if connected?
      sleep @reconnect_delay
    end
    unless connected?
      PahoMqtt.logger.error("Reconnection attempt counter is over. (#{@reconnect_limit} times)") if PahoMqtt.logger?
      disconnect(false)
    end
  end
end
reconnect?() click to toggle source
# File lib/paho_mqtt/client.rb, line 150
# True when the calling thread is the one stored in @reconnect_thread.
def reconnect?
  caller_thread = Thread.current
  caller_thread == @reconnect_thread
end
registered_callback() click to toggle source
# File lib/paho_mqtt/client.rb, line 349
# Returns whatever the handler reports as its registered callbacks.
def registered_callback
  @handler.registered_callback
end
remove_topic_callback(topic) click to toggle source
# File lib/paho_mqtt/client.rb, line 273
# Asks the handler to clear the callback registered for +topic+ and returns
# the handler's result.
def remove_topic_callback(topic)
  @handler.clear_topic_callback(topic)
end
subscribe(*topics) click to toggle source
# File lib/paho_mqtt/client.rb, line 239
# Sends a subscribe request for +topics+ with a freshly allocated packet id.
#
# When the subscriber does not report success, a reconnect is started if
# check_persistence allows it. Returns MQTT_ERR_SUCCESS in both cases.
#
# A ProtocolViolation raised on the way is logged and then re-raised as the
# same exception object, so its original message and backtrace are kept.
def subscribe(*topics)
  id = next_packet_id
  unless @subscriber.send_subscribe(topics, id) == PahoMqtt::MQTT_ERR_SUCCESS
    reconnect if check_persistence
  end
  MQTT_ERR_SUCCESS
rescue ProtocolViolation
  PahoMqtt.logger.error("Subscribe topics need one topic or a list of topics.") if PahoMqtt.logger?
  raise
end
subscribed_topics() click to toggle source
# File lib/paho_mqtt/client.rb, line 353
# Returns the subscriber's list of subscribed topics.
def subscribed_topics
  @subscriber.subscribed_topics
end
unsubscribe(*topics) click to toggle source
# File lib/paho_mqtt/client.rb, line 252
# Sends an unsubscribe request for +topics+ with a freshly allocated packet
# id.
#
# When the subscriber does not report success, a reconnect is started if
# check_persistence allows it. Returns MQTT_ERR_SUCCESS in both cases.
#
# A ProtocolViolation raised on the way is logged and then re-raised as the
# same exception object, so its original message and backtrace are kept.
def unsubscribe(*topics)
  id = next_packet_id
  unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS
    reconnect if check_persistence
  end
  MQTT_ERR_SUCCESS
rescue ProtocolViolation
  PahoMqtt.logger.error("Unsubscribe need at least one topic.") if PahoMqtt.logger?
  raise
end

Private Instance Methods

build_pubsub() click to toggle source
# File lib/paho_mqtt/client.rb, line 383
# Post-connect setup: configures the subscriber with a new packet id, lets
# the sender flush its waiting packets, then configures the publisher's
# message queues. Returns the result of that last call.
def build_pubsub
  packet_id = next_packet_id
  @subscriber.config_subscription(packet_id)
  @sender.flush_waiting_packet
  @publisher.config_all_message_queue
end
check_persistence() click to toggle source
# File lib/paho_mqtt/client.rb, line 410
# Performs a non-explicit disconnect, then returns the @persistent flag so
# the caller can decide whether to reconnect.
def check_persistence
  disconnect(false)
  @persistent
end
downgrade_version() click to toggle source
# File lib/paho_mqtt/client.rb, line 367
# Falls back to MQTT version "3.1" and connects again with the stored host,
# port and keep-alive.
#
# Raises ProtocolVersionException when the client is already using "3.1".
# The debug line is written before that check, as in the original flow.
def downgrade_version
  PahoMqtt.logger.debug("Connection refused: unacceptable protocol version #{@mqtt_version}, trying 3.1") if PahoMqtt.logger?
  raise ProtocolVersionException, "Unsupported MQTT version" if @mqtt_version == "3.1"

  @mqtt_version = "3.1"
  connect(@host, @port, @keep_alive)
end
init_connection() click to toggle source
# File lib/paho_mqtt/client.rb, line 389
# Creates a new ConnectionHelper from the stored connection settings, gives
# it the client's handler, and keeps the helper's sender in @sender.
def init_connection
  @connection_helper = ConnectionHelper.new(@host, @port, @ssl, @ssl_context, @ack_timeout)
  helper = @connection_helper
  helper.handler = @handler
  @sender = helper.sender
end
init_pubsub() click to toggle source
# File lib/paho_mqtt/client.rb, line 377
# Makes sure a Subscriber and a Publisher exist and use the current sender:
# each is created on first use and merely re-pointed at @sender afterwards.
# Both are then handed to the handler.
def init_pubsub
  if @subscriber.nil?
    @subscriber = Subscriber.new(@sender)
  else
    @subscriber.sender = @sender
  end

  if @publisher.nil?
    @publisher = Publisher.new(@sender)
  else
    @publisher.sender = @sender
  end

  @handler.config_pubsub(@publisher, @subscriber)
end
next_packet_id() click to toggle source
# File lib/paho_mqtt/client.rb, line 360
# Returns the next packet id under @id_mutex: the previous id plus one,
# starting over at 1 once the previous id has reached MAX_PACKET_ID.
def next_packet_id
  @id_mutex.synchronize do
    @last_packet_id = @last_packet_id >= MAX_PACKET_ID ? 1 : @last_packet_id + 1
  end
end
session_params() click to toggle source
# File lib/paho_mqtt/client.rb, line 395
# Collects the client's session settings (protocol version, session flags,
# credentials and last-will fields) into a Hash keyed by symbol.
def session_params
  {
    version: @mqtt_version,
    clean_session: @clean_session,
    keep_alive: @keep_alive,
    client_id: @client_id,
    username: @username,
    password: @password,
    will_topic: @will_topic,
    will_payload: @will_payload,
    will_qos: @will_qos,
    will_retain: @will_retain
  }
end