class MQTT::Client
Constants
- ATTR_DEFAULTS
Default attribute values
- PendingAck
Attributes
Number of seconds to wait for acknowledgement packets (default is 5 seconds)
Set the 'Clean Session' flag when connecting? (default is true)
Client
Identifier
Hostname of the remote server
Time (in seconds) between pings to remote server (default is 15 seconds)
Password to authenticate to the server with
Port number of the remote server
How long to wait between re-connection attempts (exponential - i.e. immediately after the first drop, then 5s, then 25s, then 125s, etc. when this value is left at its default of 5)
How many attempts to re-establish a connection after it drops before giving up (default 5)
How many times to attempt re-sending packets that weren't acknowledged (default is 5) before giving up
Set to true to enable SSL/TLS encrypted communication
Set to a symbol to use a specific variant of SSL/TLS. Allowed values include:
@example Using TLS 1.0
client = Client.new('mqtt.example.com', ssl: :TLSv1)
@see OpenSSL::SSL::SSLContext::METHODS
Username to authenticate to the server with
The version number of the MQTT
protocol to use (default 3.1.1)
Contents of the Will message that is sent by the server when the client disconnects
The QoS level of the will message sent by the server
Whether the Will message should be retained by the server after it is sent
The topic that the Will message is published to
Public Class Methods
Create and connect a new MQTT
Client
Accepts the same arguments as creating a new client. If a block is given, then it will be executed before disconnecting again.
Example:
MQTT::Client.connect('myserver.example.com') do |client| # do stuff here end
# File lib/mqtt/client.rb, line 103
# Create a new client and connect it.
#
# Every argument - positional and keyword - is forwarded unchanged to
# MQTT::Client.new. Keywords are captured explicitly because a bare
# +*args+ turns them into a positional Hash on Ruby 3, which #initialize
# would then treat as the host.
#
# If a block is given it is passed on to #connect.
#
# @return [MQTT::Client] the newly created client
def self.connect(*args, **kwargs, &block)
  client = MQTT::Client.new(*args, **kwargs)
  client.connect(&block)
  client
end
Generate a random client identifier (using the characters 0-9 and a-z)
# File lib/mqtt/client.rb, line 111
# Build a random client identifier: +prefix+ followed by +length+ random
# characters drawn from 0-9 and a-z.
#
# @param prefix [#to_s] text placed in front of the random part
# @param length [Integer] number of random characters to append
# @return [String]
def self.generate_client_id(prefix = 'ruby', length = 16)
  random_part = SecureRandom.alphanumeric(length).downcase
  format('%s%s', prefix, random_part)
end
Create a new MQTT
Client
instance
Accepts one of the following:
-
a URI that uses the
MQTT
scheme -
a hostname and port
-
a Hash containing attributes to be set on the new instance
If no arguments are given then the method will look for a URI in the MQTT_SERVER environment variable.
Examples:
client = MQTT::Client.new client = MQTT::Client.new('mqtt://myserver.example.com') client = MQTT::Client.new('mqtt://user:pass@myserver.example.com') client = MQTT::Client.new('myserver.example.com') client = MQTT::Client.new('myserver.example.com', 18830) client = MQTT::Client.new(host: 'myserver.example.com') client = MQTT::Client.new(host: 'myserver.example.com', keep_alive: 30)
# File lib/mqtt/client.rb, line 134
# Set up a new, not yet connected client.
#
# +host+ may be a URI object, an mqtt:// or mqtts:// URI string, or a
# plain hostname. Settings given as keywords are applied through the
# matching attribute writers on top of ATTR_DEFAULTS. When nothing at all
# is passed, the MQTT_SERVER environment variable (if set) is parsed as
# the server URI.
def initialize(host = nil, port = nil, **attributes)
  # Only consult the environment when the caller configured nothing
  nothing_given = host.nil? && port.nil? && attributes.empty?
  attributes.merge!(parse_uri(ENV['MQTT_SERVER'])) if nothing_given && ENV['MQTT_SERVER']

  case host
  when nil, false
    # no host argument supplied
  when URI, %r{^mqtts?://}
    attributes.merge!(parse_uri(host))
  else
    attributes[:host] = host
  end

  attributes[:port] = port unless port.nil?

  # Apply defaults first, then whatever the caller supplied
  ATTR_DEFAULTS.merge(attributes).each_pair { |name, value| send("#{name}=", value) }

  # Fall back to the standard port for the chosen transport
  @port = (@ssl ? MQTT::DEFAULT_SSL_PORT : MQTT::DEFAULT_PORT) if @port.nil?

  # Private connection state
  @socket = nil
  @read_queue = Queue.new
  @write_queue = Queue.new
  @read_thread = nil
  @write_thread = nil
  @acks = {}
  @connection_mutex = Mutex.new
  @acks_mutex = Mutex.new
  @wake_up_pipe = IO.pipe
  @connected = false
end
Public Instance Methods
yields a block, and after the block returns all messages are published at once, waiting for any necessary PubAcks for QoS 1 packets as a batch at the end
For example: client.batch_publish do client.publish("topic1", "value1", qos: 1) client.publish("topic2", "value2", qos: 1) end
# File lib/mqtt/client.rb, line 326
# Collect the QoS > 0 publishes made inside the block and send them
# together once the block has returned.
#
# A nested call simply yields: the outermost call is the one collecting.
# Messages are grouped by their publish options and each group is handed
# to #publish as a single topic => payload Hash.
def batch_publish
  return yield if @batch_publish

  @batch_publish = {}
  begin
    yield
    # Leave batch mode before publishing so #publish really sends
    queued = @batch_publish
    @batch_publish = nil
    queued.each_pair { |options, messages| publish(messages, **options) }
  ensure
    # Never remain in batch mode, even when the block raised
    @batch_publish = nil
  end
end
Set a path to a file containing a PEM-format CA certificate and enable peer verification
# File lib/mqtt/client.rb, line 206 def ca_file=(path) ssl_context.ca_file = path ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER unless path.nil? end
PEM-format client certificate
# File lib/mqtt/client.rb, line 189
# Install a client certificate on the SSL context.
#
# @param cert the certificate data handed to OpenSSL::X509::Certificate.new
def cert=(cert)
  context = ssl_context
  context.cert = OpenSSL::X509::Certificate.new(cert)
end
Set a path to a file containing a PEM-format client certificate
# File lib/mqtt/client.rb, line 184
# Read a client certificate from +path+ and install it via #cert=.
def cert_file=(path)
  pem = File.read(path)
  self.cert = pem
end
Clear the incoming message queue.
# File lib/mqtt/client.rb, line 474
# Discard everything currently waiting in the incoming message queue.
def clear_queue
  @read_queue.clear
end
Connect to the MQTT
server
If a block is given, then yield to that block and then disconnect again.
# File lib/mqtt/client.rb, line 225
# Open the connection to the server.
#
# When already connected, an optional block is simply yielded to and
# nothing else happens. Otherwise the connection is established; with a
# block, the client is yielded and then disconnected again, even if the
# block raises.
#
# Raises a RuntimeError when no client id is set and clean_session is off,
# and ArgumentError when no host has been configured.
def connect
  if connected?
    yield(self) if block_given?
    return
  end

  anonymous = @client_id.nil? || @client_id.empty?
  raise 'Must provide a client_id if clean_session is set to false' if anonymous && !@clean_session

  # Empty client id is not allowed for version 3.1.0
  @client_id = MQTT::Client.generate_client_id if anonymous && @version == '3.1.0'

  raise ArgumentError, 'No MQTT server host set when attempting to connect' if @host.nil?

  connect_internal
  return unless block_given?

  # Block form: hand over the client, then always hang up
  begin
    yield(self)
  ensure
    disconnect
  end
end
Checks whether the client is connected to the server.
Note that this returns true even if the connection is down and we're trying to reconnect
# File lib/mqtt/client.rb, line 305
# Report the client's connected flag. The flag is only cleared by
# #disconnect or once reconnection has been given up, so it remains true
# while a reconnect is still being attempted.
def connected?
  @connected
end
Disconnect from the MQTT
server.
If you don't want to say goodbye to the server, set send_msg to false.
# File lib/mqtt/client.rb, line 265 def disconnect(send_msg: true) return unless connected? @read_queue << [ConnectionClosedException.new, current_time] # Stop reading packets from the socket first @connection_mutex.synchronize do if @write_thread&.alive? @write_thread.kill @write_thread.join end @read_thread.kill if @read_thread&.alive? @read_thread = @write_thread = nil @connected = false end @acks_mutex.synchronize do @acks.each_value do |pending_ack| pending_ack.queue << :close end @acks.clear end return unless @socket if send_msg packet = MQTT::Packet::Disconnect.new begin @socket.write(packet.to_s) rescue nil end end @socket.close @socket = nil end
wait until all messages have been sent
# File lib/mqtt/client.rb, line 253
# Block until the write thread has worked through everything queued
# before this call.
#
# A Queue is pushed onto the write queue as a marker; the writer answers
# on it when it reaches it.
#
# Raises NotConnectedException when the client is not connected.
# Always returns nil.
def flush
  raise NotConnectedException unless connected?

  marker = Queue.new
  @write_queue.push(marker)
  marker.pop
  nil
end
Return the next message received from the MQTT
server.
The method either returns the Publish packet:
packet = client.get
Or can be used with a block to keep processing messages:
client.get do |packet| # Do stuff here end
# File lib/mqtt/client.rb, line 440
# Fetch received Publish packets from the incoming queue.
#
# Without a block, waits for the next packet and returns it. With a
# block, yields each packet in turn and keeps going. Packets with QoS
# above 0 are acknowledged with a Puback.
#
# Errors queued by the worker threads are re-raised here, with this
# caller's backtrace appended, unless they were queued before this call
# started.
#
# Raises NotConnectedException when the client is not connected.
def get
  raise NotConnectedException unless connected?

  started_at = current_time
  loop do
    item = @read_queue.pop

    if item.is_a?(Array)
      # [exception, queued_at] pair; ones older than this call are dropped
      next unless item.last >= started_at

      error = item.first
      error.set_backtrace((error.backtrace || []) + ['<from MQTT worker thread>'] + caller)
      raise error
    end

    next unless item.is_a?(Packet)

    yield item if block_given?
    puback_packet(item) if item.qos > 0
    return item unless block_given?
  end
end
Set to a PEM-format client private key
# File lib/mqtt/client.rb, line 200
# Install the client private key on the SSL context.
#
# Accepts either the key data alone or a [key_data, passphrase] pair. The
# key is loaded with OpenSSL::PKey.read, which detects the key type, so
# RSA, EC and DSA keys are all accepted rather than RSA only.
#
# Raises OpenSSL::PKey::PKeyError when the data cannot be parsed as a key.
def key=(*args)
  key_data, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey.read(key_data, passphrase)
end
Set a path to a file containing a PEM-format client private key
# File lib/mqtt/client.rb, line 194
# Load the client private key from a file and install it on the SSL
# context.
#
# Accepts either the path alone or a [path, passphrase] pair. The file is
# read with File.read, so its handle is closed again straight away
# (the previous File.open without a block was never closed). The key type
# is detected by OpenSSL::PKey.read, so RSA, EC and DSA keys all work.
#
# Raises OpenSSL::PKey::PKeyError when the file does not contain a key.
def key_file=(*args)
  path, passphrase = args.flatten
  ssl_context.key = OpenSSL::PKey.read(File.read(path), passphrase)
end
registers a callback to be called when a connection is re-established
can be used to re-subscribe (if you're not using persistent sessions) to topics, and/or re-publish aliveness (if you set a Will)
# File lib/mqtt/client.rb, line 313
# Remember +block+ as the callback to run once a dropped connection has
# been re-established. Calling this again replaces the previous callback.
def on_reconnect(&block)
  @on_reconnect = block
end
Publish a message on a particular topic to the MQTT
server.
# File lib/mqtt/client.rb, line 345
# Publish one message, or a whole Hash of topic => payload pairs.
#
# Inside #batch_publish, messages with QoS above 0 are only recorded for
# later sending. Otherwise each message is queued for sending and, for
# QoS above 0, this call waits until every message has been acknowledged.
#
# @param topics [String, Hash] a topic name, or a topic => payload Hash
# @param payload the message body; must be nil when +topics+ is a Hash
# @param retain [Boolean] retain flag set on each packet
# @param qos [Integer] QoS level set on each packet
# @return [nil]
# Raises ArgumentError for a nil or empty topic, or when a payload is
# given alongside a Hash, and NotConnectedException when not connected.
def publish(topics, payload = nil, retain: false, qos: 0)
  hash_given = topics.is_a?(Hash)
  if hash_given && !payload.nil?
    raise ArgumentError, 'Payload cannot be passed if passing a hash for topics and payloads'
  end
  raise NotConnectedException unless connected?

  messages = hash_given ? topics : { topics => payload }

  # Batch mode: just remember the messages, grouped by their options
  if @batch_publish && qos != 0
    bucket = (@batch_publish[{ retain: retain, qos: qos }] ||= {})
    bucket.merge!(messages)
    return
  end

  awaiting = messages.each_with_object([]) do |(topic, body), acks|
    raise ArgumentError, 'Topic name cannot be nil' if topic.nil?
    raise ArgumentError, 'Topic name cannot be empty' if topic.empty?

    packet = MQTT::Packet::Publish.new(
      id: next_packet_id,
      qos: qos,
      retain: retain,
      topic: topic,
      payload: body
    )
    # Register before sending so a fast reply cannot be missed
    acks << register_for_ack(packet) unless qos.zero?
    send_packet(packet)
  end

  awaiting.each { |ack| wait_for_ack(ack) } unless qos.zero?
  nil
end
Returns true if the incoming message queue is empty.
# File lib/mqtt/client.rb, line 464
# True when nothing is waiting in the incoming message queue.
def queue_empty?
  @read_queue.empty?
end
Returns the length of the incoming message queue.
# File lib/mqtt/client.rb, line 469
# Number of entries currently waiting in the incoming message queue.
def queue_length
  @read_queue.size
end
Set the Will for the client
The will is a message that will be delivered by the server when the client dies. The Will must be set before establishing a connection to the server
# File lib/mqtt/client.rb, line 215
# Configure all four Will attributes in one call.
#
# @param topic topic the Will is published to
# @param payload contents of the Will message
# @param retain [Boolean] value assigned to will_retain
# @param qos [Integer] value assigned to will_qos
def set_will(topic, payload, retain: false, qos: 0)
  {
    will_topic: topic,
    will_payload: payload,
    will_retain: retain,
    will_qos: qos
  }.each_pair { |name, value| send("#{name}=", value) }
  qos
end
Get the OpenSSL context, that is used if SSL/TLS is enabled
# File lib/mqtt/client.rb, line 179
# The OpenSSL context for this client, created on first use and reused
# on every later call.
def ssl_context
  return @ssl_context if @ssl_context

  @ssl_context = OpenSSL::SSL::SSLContext.new
end
Send a subscribe message for one or more topics on the MQTT
server. The topics parameter should be one of the following:
-
String: subscribe to one topic with QoS 0
-
Array: subscribe to multiple topics with QoS 0
-
Hash: subscribe to multiple topics where the key is the topic and the value is the QoS level
For example:
client.subscribe( 'a/b' ) client.subscribe( 'a/b', 'c/d' ) client.subscribe( ['a/b',0], ['c/d',1] ) client.subscribe( { 'a/b' => 0, 'c/d' => 1 } )
# File lib/mqtt/client.rb, line 403
# Queue a Subscribe packet for the given topics.
#
# @param topics the topic arguments, passed on as given to the packet
# @param wait_for_ack [Boolean] when true, block until the subscription
#   has been acknowledged
# Raises NotConnectedException when the client is not connected.
def subscribe(*topics, wait_for_ack: false)
  raise NotConnectedException unless connected?

  packet = MQTT::Packet::Subscribe.new(id: next_packet_id, topics: topics)

  unless wait_for_ack
    send_packet(packet)
    return
  end

  # Register before sending so a fast reply cannot be missed
  token = register_for_ack(packet)
  send_packet(packet)
  wait_for_ack(token)
end
Send an unsubscribe message for one or more topics on the MQTT
server
# File lib/mqtt/client.rb, line 416
# Queue an Unsubscribe packet for the given topics.
#
# A single argument is passed through unwrapped, so both
# unsubscribe('a', 'b') and unsubscribe(['a', 'b']) are accepted.
#
# @param wait_for_ack [Boolean] when true, block until the request has
#   been acknowledged
# Raises NotConnectedException when the client is not connected.
def unsubscribe(*topics, wait_for_ack: false)
  raise NotConnectedException unless connected?

  # The splat is always an Array, so only its size needs checking
  targets = topics.count == 1 ? topics.first : topics
  packet = MQTT::Packet::Unsubscribe.new(topics: targets, id: next_packet_id)

  unless wait_for_ack
    send_packet(packet)
    return
  end

  # Register before sending so a fast reply cannot be missed
  token = register_for_ack(packet)
  send_packet(packet)
  wait_for_ack(token)
end
Private Instance Methods
# File lib/mqtt/client.rb, line 482 def connect_internal # Create network socket tcp_socket = TCPSocket.new(@host, @port) if @ssl # Set the protocol version ssl_context.ssl_version = @ssl if @ssl.is_a?(Symbol) @socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context) @socket.sync_close = true # Set hostname on secure socket for Server Name Indication (SNI) @socket.hostname = @host if @socket.respond_to?(:hostname=) @socket.connect else @socket = tcp_socket end # Construct a connect packet packet = MQTT::Packet::Connect.new( version: @version, clean_session: @clean_session, keep_alive: @keep_alive, client_id: @client_id, username: @username, password: @password, will_topic: @will_topic, will_payload: @will_payload, will_qos: @will_qos, will_retain: @will_retain ) # Send packet @socket.write(packet.to_s) # Receive response receive_connack @connected = true @write_thread = Thread.new do while (packet = @write_queue.pop) # flush command if packet.is_a?(Queue) packet << :flushed next end @socket.write(packet.to_s) end rescue => e @write_queue << packet if packet reconnect(e) end @read_thread = Thread.new do receive_packet while connected? end end
# File lib/mqtt/client.rb, line 687
# Current reading of the monotonic clock, in seconds. Used for all
# timeout and keep-alive arithmetic so wall-clock changes do not matter.
def current_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
# File lib/mqtt/client.rb, line 703
# Keep-alive bookkeeping, run after every pass of the read loop.
#
# Once @keep_alive seconds have passed since the last packet arrived, a
# Pingreq is queued (once). If a further @ack_timeout seconds pass and
# still nothing has arrived, KeepAliveTimeout is raised. Does nothing
# when keep-alive is nil or not positive.
def handle_keep_alives
  return unless @keep_alive && @keep_alive > 0

  now = current_time
  ping_due_at = @last_packet_received_at + @keep_alive

  if now >= ping_due_at && !@keep_alive_sent
    send_packet(MQTT::Packet::Pingreq.new)
    @keep_alive_sent = true
  elsif now >= ping_due_at + @ack_timeout
    raise KeepAliveTimeout
  end
end
# File lib/mqtt/client.rb, line 638
# Process one packet read from the server.
#
# Any packet counts as proof of life, so the keep-alive state is reset
# first. Publish packets are queued for #get; acknowledgement packets are
# passed to whoever registered for that packet id. All other packet
# types are ignored.
def handle_packet(packet)
  @last_packet_received_at = current_time
  @keep_alive_sent = false

  case packet
  when MQTT::Packet::Publish
    # Queue for delivery through #get
    @read_queue.push(packet)
  when MQTT::Packet::Pingresp
    # nothing further to do; the timestamps above were already refreshed
  when MQTT::Packet::Puback, MQTT::Packet::Suback, MQTT::Packet::Unsuback
    @acks_mutex.synchronize do
      waiter = @acks[packet.id]
      next unless waiter

      @acks.delete(packet.id)
      waiter.queue << packet
    end
  end
  # Ignore all other packets
  # FIXME: implement responses for QoS 2
end
# File lib/mqtt/client.rb, line 662
# Re-send every pending packet whose acknowledgement deadline has passed.
#
# All pending acks are checked. Stopping at the first unexpired entry is
# not safe: #resend pushes an entry's deadline later without moving it in
# the Hash, so an entry near the front can have a later deadline than
# ones behind it.
#
# Iterates over a snapshot because #resend may delete entries from @acks.
def handle_timeouts
  @acks_mutex.synchronize do
    now = current_time
    @acks.values.each do |pending_ack|
      next unless pending_ack.timeout_at <= now

      resend(pending_ack)
    end
  end
end
# File lib/mqtt/client.rb, line 766
# Hand out the next packet identifier: 1, 2, ... up to 0xffff, then back
# to 1 again (0 is never returned).
def next_packet_id
  candidate = @last_packet_id.to_i + 1
  @last_packet_id = candidate > 0xffff ? 1 : candidate
end
# File lib/mqtt/client.rb, line 691
# How long the read loop may sleep in IO.select before something needs
# attention.
#
# The result is the time until the earliest of:
# - the soonest acknowledgement deadline among all pending acks (not just
#   the first entry, since re-sent entries keep their position in the
#   Hash while their deadline moves later), and
# - the next keep-alive event: the ping itself, or the ping-response
#   deadline once a ping has been sent.
#
# A nil or non-positive keep-alive counts as disabled, matching
# #handle_keep_alives. A keep-alive of 0 used to produce a zero timeout on
# every pass, making the read loop spin.
#
# @return [Numeric, nil] seconds to wait (never negative), or nil to wait
#   indefinitely
def next_timeout
  timeout_from_acks = @acks_mutex.synchronize do
    @acks.each_value.map(&:timeout_at).min
  end

  keep_alive_enabled = @keep_alive && @keep_alive > 0
  return nil if timeout_from_acks.nil? && !keep_alive_enabled

  next_ping = nil
  if keep_alive_enabled
    next_ping = @last_packet_received_at + @keep_alive
    # After the ping went out, the next event is the response deadline
    next_ping += @ack_timeout if @keep_alive_sent
  end

  now = current_time
  deadline = [timeout_from_acks, next_ping].compact.min
  [deadline - now, 0].max
end
# File lib/mqtt/client.rb, line 746
# Break a server URI down into client attributes.
#
# @param uri [URI, String] an mqtt:// or mqtts:// URI
# @return [Hash] with :host, :port, :username, :password (the last two
#   percent-decoded, or nil when absent) and :ssl (true for mqtts)
# Raises RuntimeError for any other scheme.
def parse_uri(uri)
  uri = URI.parse(uri) unless uri.is_a?(URI)

  ssl = { 'mqtt' => false, 'mqtts' => true }.fetch(uri.scheme) do
    raise 'Only the mqtt:// and mqtts:// schemes are supported'
  end

  decode = ->(text) { text ? URI::Parser.new.unescape(text) : nil }

  {
    host: uri.host,
    port: uri.port,
    username: decode.call(uri.user),
    password: decode.call(uri.password),
    ssl: ssl
  }
end
# File lib/mqtt/client.rb, line 716
# Queue a Puback acknowledging +packet+, carrying the same packet id.
def puback_packet(packet)
  acknowledgement = MQTT::Packet::Puback.new(id: packet.id)
  send_packet(acknowledgement)
end
Read and check a connection acknowledgement packet
# File lib/mqtt/client.rb, line 721
# Wait for the server's reply to the Connect packet and validate it.
#
# The read is bounded by @ack_timeout seconds (Timeout.timeout). On
# success the keep-alive state is reset and the packet is remembered in
# @connack.
#
# Raises MQTT::ProtocolException when the reply is not a Connack, or when
# it carries a non-zero return code (the socket is closed first in that
# case).
def receive_connack
  Timeout.timeout(@ack_timeout) do
    reply = MQTT::Packet.read(@socket)

    unless reply.class == MQTT::Packet::Connack
      raise MQTT::ProtocolException, "Response wasn't a connection acknowledgement: #{reply.class}"
    end

    # Check the return code
    unless reply.return_code == 0x00
      # 3.2.2.3 If a server sends a CONNACK packet containing a non-zero
      # return code it MUST then close the Network Connection
      @socket.close
      raise MQTT::ProtocolException, reply.return_msg
    end

    @last_packet_received_at = current_time
    @keep_alive_sent = false
    @connack = reply
  end
end
Try to read a packet from the server Also sends keep-alive ping packets.
# File lib/mqtt/client.rb, line 594
# One pass of the read loop.
#
# Sleeps in IO.select until the socket has data, the wake-up pipe is
# written to, or the next timeout is due. It then re-sends timed-out
# packets, reads and dispatches one packet if the socket is readable, and
# finally does the keep-alive bookkeeping. Any StandardError is passed to
# #reconnect.
def receive_packet
  # Poll socket - is there data waiting?
  timeout = next_timeout
  wake_reader = @wake_up_pipe[0]
  ready, = IO.select([@socket, wake_reader], [], [], timeout)
  ready ||= []

  # The pipe only exists to interrupt select so a new timeout can be
  # computed; whatever was written to it is thrown away
  wake_reader.readpartial(4096) if ready.include?(wake_reader)

  handle_timeouts

  handle_packet(MQTT::Packet.read(@socket)) if ready.include?(@socket)

  handle_keep_alives
rescue => e
  reconnect(e)
end
# File lib/mqtt/client.rb, line 542 def reconnect(exception) should_exit = nil @connection_mutex.synchronize do @socket&.close @socket = nil @read_thread&.kill if Thread.current != @read_thread @write_thread&.kill if Thread.current != @write_thread should_exit = Thread.current == @read_thread @read_thread = @write_thread = nil retries = 0 begin connect_internal unless @reconnect_limit == 0 rescue @socket&.close @socket = nil if (retries += 1) < @reconnect_limit sleep @reconnect_backoff ** retries retry end end unless @socket # couldn't reconnect @acks_mutex.synchronize do @acks.each_value do |pending_ack| pending_ack.queue << :close end @acks.clear end @connected = false @read_queue << [exception, current_time] return end end begin if @on_reconnect&.arity == 0 @on_reconnect.call else @on_reconnect&.call(@connack) end rescue => e @read_queue << [e, current_time] disconnect end Thread.exit if should_exit end
# File lib/mqtt/client.rb, line 615
# Record that an acknowledgement is expected for +packet+.
#
# @return [PendingAck] the entry stored under the packet's id; it holds
#   the packet, a fresh Queue, a deadline @ack_timeout seconds from now
#   and a send count of 1
def register_for_ack(packet)
  entry = PendingAck.new(packet, Queue.new, current_time + @ack_timeout, 1)

  @acks_mutex.synchronize do
    # The first pending ack introduces a new deadline: wake the read
    # thread so it recomputes its select timeout
    @wake_up_pipe[1].write('z') if @acks.empty?
    @acks[packet.id] = entry
  end
end
# File lib/mqtt/client.rb, line 673
# Send an unacknowledged packet again, or give up on it.
#
# The send count is incremented first. Once it exceeds @resend_limit the
# entry is removed and its waiter is told :resend_limit_exceeded.
# Otherwise the deadline is renewed and the packet is queued again with
# its duplicate flag set.
def resend(pending_ack)
  packet = pending_ack.packet
  pending_ack.send_count += 1

  if pending_ack.send_count > @resend_limit
    @acks.delete(packet.id)
    pending_ack.queue << :resend_limit_exceeded
    return
  end

  # timed out, or simple re-send
  # The first entry's deadline is about to change: wake the read thread
  # so it recomputes its timeout
  @wake_up_pipe[1].write('z') if @acks.first.first == packet.id
  pending_ack.timeout_at = current_time + @ack_timeout
  packet.duplicate = true
  send_packet(packet)
end
Send a packet to server
# File lib/mqtt/client.rb, line 742
# Append +packet+ to the outgoing queue. The actual socket write is done
# by the write thread.
def send_packet(packet)
  @write_queue.push(packet)
end
# File lib/mqtt/client.rb, line 628
# Block until the outcome for +pending_ack+ arrives on its queue.
#
# Returns nil once the acknowledgement packet has arrived.
# Raises ConnectionClosedException when the connection was closed first,
# and ResendLimitExceededException when re-sending was given up.
def wait_for_ack(pending_ack)
  outcome = pending_ack.queue.pop
  raise ConnectionClosedException if outcome == :close

  raise ResendLimitExceededException if outcome == :resend_limit_exceeded
end