class EventMachine::MQTT::ClientConnection
Attributes
Public Class Methods
Connect to an MQTT server
Examples:
ClientConnection.connect('localhost', 1883) ClientConnection.connect(:host => 'localhost', :username => 'user', :password => 'pass')
# File lib/em/mqtt/client_connection.rb, line 20 def self.connect(*args, &blk) hash = { :host => 'localhost', :port => MQTT::DEFAULT_PORT } i = 0 args.each do |arg| if arg.is_a?(Hash) hash.merge!(arg) else if i == 0 hash[:host] = arg elsif i == 1 hash[:port] = arg end i += 1 end end ::EventMachine.connect( hash.delete(:host), hash.delete(:port), self, hash, &blk ) end
Initialize connection @param args [Hash] Arguments for connection @option args [String] :client_id A unique identifier for this client @option args [Integer] :keep_alive How often to send keep-alive pings (in seconds) @option args [Boolean] :clean_session Start a clean session with server or resume old one (default true) @option args [String] :username Username to authenticate with the server @option args [String] :password Password to authenticate with the server
# File lib/em/mqtt/client_connection.rb, line 50 def initialize(args={}) @client_id = MQTT::Client.generate_client_id @keep_alive = 10 @clean_session = true @packet_id = 0 @ack_timeout = 5 @username = nil @password = nil @timer = nil if args.is_a?(Hash) args.each_pair do |k,v| instance_variable_set("@#{k}", v) end end end
Public Instance Methods
# File lib/em/mqtt/client_connection.rb, line 72 def connection_completed # TCP socket established: send Connect packet packet = MQTT::Packet::Connect.new( :client_id => @client_id, :clean_session => @clean_session, :keep_alive => @keep_alive, :username => @username, :password => @password ) send_packet(packet) @state = :connect_sent end
Disconnect from the MQTT broker. If you don’t want to say goodbye to the broker, set send_msg to false.
# File lib/em/mqtt/client_connection.rb, line 89 def disconnect(send_msg=true) # FIXME: only close if we aren't waiting for any acknowledgements if connected? send_packet(MQTT::Packet::Disconnect.new) if send_msg end @state = :disconnecting end
EventMachine::MQTT::Connection#post_init
# File lib/em/mqtt/client_connection.rb, line 67 def post_init super @state = :connecting end
Publish a message on a particular topic to the MQTT broker.
# File lib/em/mqtt/client_connection.rb, line 116 def publish(topic, payload, retain=false, qos=0) # Defer publishing until we are connected callback do send_packet( MQTT::Packet::Publish.new( :id => next_packet_id, :qos => qos, :retain => retain, :topic => topic, :payload => payload ) ) end end
# File lib/em/mqtt/client_connection.rb, line 97 def receive_callback(&block) @receive_callback = block end
# File lib/em/mqtt/client_connection.rb, line 101 def receive_msg(packet) # Alternatively, subclass this method @receive_callback.call(packet) unless @receive_callback.nil? end
Send a subscribe message for one or more topics on the MQTT broker.
# File lib/em/mqtt/client_connection.rb, line 132 def subscribe(*topics) # Defer subscribing until we are connected callback do send_packet( MQTT::Packet::Subscribe.new( :id => next_packet_id, :topics => topics ) ) end end
# File lib/em/mqtt/client_connection.rb, line 106 def unbind timer.cancel if timer unless state == :disconnecting # Re-throw any exceptions (if present) to avoid swallowed errors. raise $! || MQTT::NotConnectedException.new("Connection to server lost") end @state = :disconnected end
Send a unsubscribe message for one or more topics on the MQTT broker
# File lib/em/mqtt/client_connection.rb, line 145 def unsubscribe(*topics) # Defer unsubscribing until we are connected callback do send_packet( MQTT::Packet::Unsubscribe.new( :id => next_packet_id, :topics => topics ) ) end end
Private Instance Methods
# File lib/em/mqtt/client_connection.rb, line 179 def connect_ack(packet) if packet.return_code != 0x00 raise MQTT::ProtocolException.new(packet.return_msg) else @state = :connected end # Send a ping packet every X seconds if keep_alive > 0 @timer = EventMachine::PeriodicTimer.new(keep_alive) do send_packet MQTT::Packet::Pingreq.new end end # We are now connected - can now execute deferred calls set_deferred_success end
# File lib/em/mqtt/client_connection.rb, line 197 def next_packet_id @packet_id += 1 end
# File lib/em/mqtt/client_connection.rb, line 161 def process_packet(packet) if state == :connect_sent and packet.class == MQTT::Packet::Connack connect_ack(packet) elsif state == :connected and packet.class == MQTT::Packet::Pingresp # Pong! elsif state == :connected and packet.class == MQTT::Packet::Publish receive_msg(packet) elsif state == :connected and packet.class == MQTT::Packet::Suback # Subscribed! else # FIXME: deal with other packet types raise MQTT::ProtocolException.new( "Wasn't expecting packet of type #{packet.class} when in state #{state}" ) disconnect end end