class EventMachine::MQTT::ServerConnection
Attributes
client_id[RW]
keep_alive[RW]
last_packet[RW]
logger[R]
packet_id[RW]
subscriptions[RW]
timer[R]
Public Class Methods
new(logger)
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 15
#
# Creates the connection handler and remembers the logger it should write to.
#
# @param logger [#debug, #info] object that receives this connection's log messages
def initialize(logger)
  @logger = logger
end
Public Instance Methods
connect(packet)
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 59
#
# Handles a CONNECT packet: records the client id, replies with a Connack,
# marks the connection as connected, registers it in the shared client list
# and, when the client asked for one, starts a keep-alive watchdog timer.
#
# @param packet [MQTT::Packet::Connect] the CONNECT packet sent by the client
# @return [EventMachine::PeriodicTimer, nil] the watchdog timer, or nil when
#   keep-alive is disabled
def connect(packet)
  # FIXME: check the protocol name and version
  # FIXME: check the client id is between 1 and 23 characters
  self.client_id = packet.client_id

  # FIXME: disconnect old client with the same ID
  send_packet MQTT::Packet::Connack.new
  @state = :connected
  @@clients << self
  logger.info("#{client_id} is now connected")

  # Setup a keep-alive timer.
  # A keep-alive of 0 means the client has switched the mechanism off. 0 is
  # truthy in Ruby, so it must be tested explicitly; otherwise a zero-interval
  # timer would disconnect the client almost immediately.
  requested = packet.keep_alive
  if requested && requested > 0
    @keep_alive = requested
    logger.debug("#{client_id}: Setting keep alive timer to #{@keep_alive} seconds")

    # Float division: integer division would yield a 0 second interval for a
    # keep-alive of 1 and make the timer spin.
    @timer = EventMachine::PeriodicTimer.new(@keep_alive / 2.0) do
      # NOTE(review): @last_received is not assigned in this section; it is
      # presumably maintained by the superclass on every received packet.
      last_seen = Time.now - @last_received
      if last_seen > @keep_alive * 1.5
        logger.info("Disconnecting '#{client_id}' because it hasn't been seen for #{last_seen} seconds")
        disconnect
      end
    end
  end
end
disconnect()
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 84
#
# Logs the closure, flags this connection as disconnected and closes the
# underlying connection.
#
# @return whatever +close_connection+ returns
def disconnect
  logger.debug("Closing connection to #{client_id}")

  @state = :disconnected
  close_connection
end
ping(packet)
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 90
#
# Answers a ping request by sending a fresh Pingresp packet back.
#
# @param packet the received ping request (its contents are not inspected)
def ping(packet)
  response = MQTT::Packet::Pingresp.new
  send_packet(response)
end
post_init()
click to toggle source
Calls superclass method
EventMachine::MQTT::Connection#post_init
# File lib/em/mqtt/server_connection.rb, line 19
#
# Runs the superclass hook, then resets every per-connection field to its
# starting value: the connection waits for a CONNECT packet, has no client
# id, no timer and no subscriptions, and its counters are zero.
def post_init
  super

  @state         = :wait_connect
  @client_id     = @timer = nil
  @keep_alive    = @packet_id = 0
  @subscriptions = []

  logger.debug("TCP connection opened")
end
process_packet(packet)
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 36
#
# Dispatches a received packet to the matching handler, depending on the
# packet class and the current connection state. A Disconnect packet is
# honoured in any state. Any other combination closes the connection and
# raises.
#
# @param packet [MQTT::Packet] the packet that was just received
# @raise [MQTT::ProtocolException] when the packet type is not expected in
#   the current state (the connection is closed before raising)
def process_packet(packet)
  logger.debug("#{client_id}: #{packet.inspect}")

  if state == :wait_connect && packet.instance_of?(MQTT::Packet::Connect)
    connect(packet)
  elsif state == :connected && packet.instance_of?(MQTT::Packet::Pingreq)
    ping(packet)
  elsif state == :connected && packet.instance_of?(MQTT::Packet::Subscribe)
    subscribe(packet)
  elsif state == :connected && packet.instance_of?(MQTT::Packet::Publish)
    publish(packet)
  elsif packet.instance_of?(MQTT::Packet::Disconnect)
    logger.info("#{client_id} has disconnected")
    disconnect
  else
    # FIXME: deal with other packet types
    # Build the message first: disconnect changes the state it reports.
    message = "Wasn't expecting packet of type #{packet.class} when in state #{state}"

    # Disconnect BEFORE raising; placed after the raise it would never run
    # and the misbehaving client would stay connected.
    disconnect
    raise MQTT::ProtocolException, message
  end
end
publish(packet)
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 103
#
# Forwards a published packet to every registered client whose subscription
# list contains the packet's exact topic or the catch-all '#' entry.
#
# @param packet [MQTT::Packet::Publish] the packet to fan out
def publish(packet)
  @@clients.each do |client|
    wanted = client.subscriptions.include?(packet.topic) ||
             client.subscriptions.include?('#')
    next unless wanted

    client.send_packet(packet)
  end
end
subscribe(packet)
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 94
#
# Appends every topic named in a Subscribe packet to this connection's
# subscription list and logs the resulting list. The requested QoS values
# are ignored.
#
# @param packet [MQTT::Packet::Subscribe] packet whose +topics+ yields
#   topic/QoS pairs
def subscribe(packet)
  packet.topics.each { |topic, _qos| subscriptions << topic }

  logger.info("#{client_id} has subscriptions: #{self.subscriptions}")
  # FIXME: send subscribe acknowledgement
end
unbind()
click to toggle source
# File lib/em/mqtt/server_connection.rb, line 30
#
# Cleans up after the connection has gone away: removes this connection
# from the shared client list and cancels the keep-alive timer if one was
# started.
def unbind
  @@clients.delete(self)
  @timer && @timer.cancel

  logger.debug("TCP connection closed")
end