class PahoMqtt::Client
Attributes
ack_timeout[RW]
blocking[RW]
clean_session[RW]
client_id[RW]
connection_state[R]
Read Only attribute
host[RW]
Connection related attributes:
keep_alive[RW]
Timeout attributes:
mqtt_version[RW]
password[RW]
persistent[RW]
port[RW]
reconnect_delay[RW]
reconnect_limit[RW]
ssl[RW]
ssl_context[R]
username[RW]
will_payload[RW]
will_qos[RW]
will_retain[RW]
will_topic[RW]
Last will attributes:
Public Class Methods
new(*args)
click to toggle source
# File lib/paho_mqtt/client.rb, line 52 def initialize(*args) @last_ping_resp = Time.now @last_packet_id = 0 @ssl_context = nil @sender = nil @handler = Handler.new @connection_helper = nil @connection_state = MQTT_CS_DISCONNECT @connection_state_mutex = Mutex.new @mqtt_thread = nil @reconnect_thread = nil @id_mutex = Mutex.new @reconnect_limit = 3 @reconnect_delay = 5 if args.last.is_a?(Hash) attr = args.pop else attr = {} end CLIENT_ATTR_DEFAULTS.merge(attr).each_pair do |k,v| self.send("#{k}=", v) end if @ssl @ssl_context = OpenSSL::SSL::SSLContext.new end if @port.nil? if @ssl @port = DEFAULT_SSL_PORT else @port = DEFAULT_PORT end end if @client_id.nil? || @client_id == "" @client_id = generate_client_id end end
Public Instance Methods
add_topic_callback(topic, callback=nil, &block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 264 def add_topic_callback(topic, callback=nil, &block) @handler.register_topic_callback(topic, callback, &block) end
config_ssl_context(cert_path, key_path, ca_path=nil)
click to toggle source
# File lib/paho_mqtt/client.rb, line 99 def config_ssl_context(cert_path, key_path, ca_path=nil) @ssl ||= true @ssl_context = SSLHelper.config_ssl_context(cert_path, key_path, ca_path) end
connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking)
click to toggle source
# File lib/paho_mqtt/client.rb, line 104 def connect(host=@host, port=@port, keep_alive=@keep_alive, persistent=@persistent, blocking=@blocking) @persistent = persistent @blocking = blocking @host = host @port = port.to_i @keep_alive = keep_alive @connection_state_mutex.synchronize do @connection_state = MQTT_CS_NEW end @mqtt_thread.kill unless @mqtt_thread.nil? init_connection @connection_helper.send_connect(session_params) begin @connection_state = @connection_helper.do_connect(reconnect?) if connected? build_pubsub daemon_mode unless @blocking end rescue LowVersionException downgrade_version end end
connected?()
click to toggle source
# File lib/paho_mqtt/client.rb, line 145 def connected? @connection_state == MQTT_CS_CONNECTED end
daemon_mode()
click to toggle source
# File lib/paho_mqtt/client.rb, line 128 def daemon_mode @mqtt_thread = Thread.new do @reconnect_thread.kill unless @reconnect_thread.nil? || !@reconnect_thread.alive? begin while connected? do mqtt_loop end rescue SystemCallError => e if @persistent reconnect else raise e end end end end
disconnect(explicit=true)
click to toggle source
# File lib/paho_mqtt/client.rb, line 214 def disconnect(explicit=true) @last_packet_id = 0 if explicit @connection_helper.do_disconnect(@publisher, explicit, @mqtt_thread) @connection_state_mutex.synchronize do @connection_state = MQTT_CS_DISCONNECT end MQTT_ERR_SUCCESS end
generate_client_id(prefix='paho_ruby', lenght=16)
click to toggle source
# File lib/paho_mqtt/client.rb, line 94 def generate_client_id(prefix='paho_ruby', lenght=16) charset = Array('A'..'Z') + Array('a'..'z') + Array('0'..'9') @client_id = prefix << Array.new(lenght) { charset.sample }.join end
loop_misc()
click to toggle source
# File lib/paho_mqtt/client.rb, line 186 def loop_misc if @connection_helper.check_keep_alive(@persistent, @handler.last_ping_resp, @keep_alive) == MQTT_CS_DISCONNECT reconnect if check_persistence end @publisher.check_waiting_publisher @subscriber.check_waiting_subscriber end
loop_read(max_packet=MAX_READ)
click to toggle source
# File lib/paho_mqtt/client.rb, line 165 def loop_read(max_packet=MAX_READ) max_packet.times do begin @handler.receive_packet rescue ReadingException if check_persistence reconnect else raise ReadingException end end end end
loop_write(max_packet=MAX_WRITING)
click to toggle source
# File lib/paho_mqtt/client.rb, line 153 def loop_write(max_packet=MAX_WRITING) begin @sender.writing_loop(max_packet) rescue WritingException if check_persistence reconnect else raise WritingException end end end
mqtt_loop()
click to toggle source
# File lib/paho_mqtt/client.rb, line 179 def mqtt_loop loop_read loop_write loop_misc sleep LOOP_TEMPO end
on_connack(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 272 def on_connack(&block) @handler.on_connack = block if block_given? @handler.on_connack end
on_connack=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 312 def on_connack=(callback) @handler.on_connack = callback if callback.is_a?(Proc) end
on_message(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 307 def on_message(&block) @handler.on_message = block if block_given? @handler.on_message end
on_message=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 340 def on_message=(callback) @handler.on_message = callback if callback.is_a?(Proc) end
on_puback(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 287 def on_puback(&block) @handler.on_puback = block if block_given? @handler.on_puback end
on_puback=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 324 def on_puback=(callback) @handler.on_puback = callback if callback.is_a?(Proc) end
on_pubcomp(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 302 def on_pubcomp(&block) @handler.on_pubcomp = block if block_given? @handler.on_pubcomp end
on_pubcomp=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 336 def on_pubcomp=(callback) @handler.on_pubcomp = callback if callback.is_a?(Proc) end
on_pubrec(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 292 def on_pubrec(&block) @handler.on_pubrec = block if block_given? @handler.on_pubrec end
on_pubrec=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 328 def on_pubrec=(callback) @handler.on_pubrec = callback if callback.is_a?(Proc) end
on_pubrel(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 297 def on_pubrel(&block) @handler.on_pubrel = block if block_given? @handler.on_pubrel end
on_pubrel=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 332 def on_pubrel=(callback) @handler.on_pubrel = callback if callback.is_a?(Proc) end
on_suback(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 277 def on_suback(&block) @handler.on_suback = block if block_given? @handler.on_suback end
on_suback=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 316 def on_suback=(callback) @handler.on_suback = callback if callback.is_a?(Proc) end
on_unsuback(&block)
click to toggle source
# File lib/paho_mqtt/client.rb, line 282 def on_unsuback(&block) @handler.on_unsuback = block if block_given? @handler.on_unsuback end
on_unsuback=(callback)
click to toggle source
# File lib/paho_mqtt/client.rb, line 320 def on_unsuback=(callback) @handler.on_unsuback = callback if callback.is_a?(Proc) end
ping_host()
click to toggle source
# File lib/paho_mqtt/client.rb, line 260 def ping_host @sender.send_pingreq end
publish(topic, payload="", retain=false, qos=0)
click to toggle source
# File lib/paho_mqtt/client.rb, line 223 def publish(topic, payload="", retain=false, qos=0) if topic == "" || !topic.is_a?(String) PahoMqtt.logger.error("Publish topics is invalid, not a string or empty.") if PahoMqtt.logger? raise ArgumentError end id = next_packet_id @publisher.send_publish(topic, payload, retain, qos, id) end
reconnect()
click to toggle source
# File lib/paho_mqtt/client.rb, line 194 def reconnect @reconnect_thread = Thread.new do counter = 0 while (@reconnect_limit >= counter || @reconnect_limit == -1) do counter += 1 PahoMqtt.logger.debug("New reconnect attempt...") if PahoMqtt.logger? connect if connected? break else sleep @reconnect_delay end end unless connected? PahoMqtt.logger.error("Reconnection attempt counter is over. (#{@reconnect_limit} times)") if PahoMqtt.logger? disconnect(false) end end end
reconnect?()
click to toggle source
# File lib/paho_mqtt/client.rb, line 149 def reconnect? Thread.current == @reconnect_thread end
registered_callback()
click to toggle source
# File lib/paho_mqtt/client.rb, line 344 def registered_callback @handler.registered_callback end
remove_topic_callback(topic)
click to toggle source
# File lib/paho_mqtt/client.rb, line 268 def remove_topic_callback(topic) @handler.clear_topic_callback(topic) end
subscribe(*topics)
click to toggle source
# File lib/paho_mqtt/client.rb, line 232 def subscribe(*topics) begin id = next_packet_id unless @subscriber.send_subscribe(topics, id) == PahoMqtt::MQTT_ERR_SUCCESS reconnect if check_persistence end MQTT_ERR_SUCCESS rescue ProtocolViolation PahoMqtt.logger.error("Subscribe topics need one topic or a list of topics.") if PahoMqtt.logger? disconnect(false) raise ProtocolViolation end end
subscribed_topics()
click to toggle source
# File lib/paho_mqtt/client.rb, line 348 def subscribed_topics @subscriber.subscribed_topics end
unsubscribe(*topics)
click to toggle source
# File lib/paho_mqtt/client.rb, line 246 def unsubscribe(*topics) begin id = next_packet_id unless @subscriber.send_unsubscribe(topics, id) == MQTT_ERR_SUCCESS reconnect if check_persistence end MQTT_ERR_SUCCESS rescue ProtocolViolation PahoMqtt.logger.error("Unsubscribe need at least one topic.") if PahoMqtt.logger? disconnect(false) raise ProtocolViolation end end
Private Instance Methods
build_pubsub()
click to toggle source
# File lib/paho_mqtt/client.rb, line 371 def build_pubsub if @subscriber.nil? @subscriber = Subscriber.new(@sender) else @subscriber.sender = @sender @subscriber.config_subscription(next_packet_id) end if @publisher.nil? @publisher = Publisher.new(@sender) else @publisher.sender = @sender @sender.flush_waiting_packet @publisher.config_all_message_queue end @handler.config_pubsub(@publisher, @subscriber) end
check_persistence()
click to toggle source
# File lib/paho_mqtt/client.rb, line 412 def check_persistence disconnect(false) @persistent end
downgrade_version()
click to toggle source
# File lib/paho_mqtt/client.rb, line 361 def downgrade_version PahoMqtt.logger.debug("Connection refused: unacceptable protocol version #{@mqtt_version}, trying 3.1") if PahoMqtt.logger? if @mqtt_version != "3.1" @mqtt_version = "3.1" connect(@host, @port, @keep_alive) else raise ProtocolVersionException.new("Unsupported MQTT version") end end
init_connection()
click to toggle source
# File lib/paho_mqtt/client.rb, line 388 def init_connection unless reconnect? @connection_helper = ConnectionHelper.new(@host, @port, @ssl, @ssl_context, @ack_timeout) @connection_helper.handler = @handler @sender = @connection_helper.sender end @connection_helper.setup_connection end
next_packet_id()
click to toggle source
# File lib/paho_mqtt/client.rb, line 355 def next_packet_id @id_mutex.synchronize do @last_packet_id = (@last_packet_id || 0).next end end
session_params()
click to toggle source
# File lib/paho_mqtt/client.rb, line 397 def session_params { :version => @mqtt_version, :clean_session => @clean_session, :keep_alive => @keep_alive, :client_id => @client_id, :username => @username, :password => @password, :will_topic => @will_topic, :will_payload => @will_payload, :will_qos => @will_qos, :will_retain => @will_retain } end