class EventMachine::MQTT::ClientConnection

Attributes

ack_timeout[R]
clean_session[R]
client_id[R]
keep_alive[R]
packet_id[R]
password[R]
timer[R]
username[R]

Public Class Methods

connect(*args, &blk) click to toggle source

Connect to an MQTT server

Examples:

ClientConnection.connect('localhost', 1883)
ClientConnection.connect(:host => 'localhost', :username => 'user', :password => 'pass')
# File lib/em/mqtt/client_connection.rb, line 20
# Open a connection to an MQTT server through EventMachine.
#
# Arguments are processed left to right. A Hash argument is merged into the
# options; the first non-Hash argument is used as the host, the second as the
# port, and any further non-Hash arguments are ignored. The host defaults to
# 'localhost' and the port to MQTT::DEFAULT_PORT.
#
# The :host and :port entries are removed from the options and passed to
# EventMachine.connect together with this class, the remaining options and
# the optional block.
def self.connect(*args, &blk)
  options = { :host => 'localhost', :port => MQTT::DEFAULT_PORT }
  positional_keys = [:host, :port]

  args.each do |arg|
    if arg.is_a?(Hash)
      options.merge!(arg)
    else
      # Take the next free positional slot; nil once host and port are used.
      key = positional_keys.shift
      options[key] = arg if key
    end
  end

  host = options.delete(:host)
  port = options.delete(:port)
  ::EventMachine.connect(host, port, self, options, &blk)
end
new(args={}) click to toggle source

Initialize the connection.

@param args [Hash] Arguments for the connection
@option args [String] :client_id A unique identifier for this client
@option args [Integer] :keep_alive How often to send keep-alive pings (in seconds)
@option args [Boolean] :clean_session Start a clean session with the server or resume an old one (default true)
@option args [String] :username Username to authenticate with the server
@option args [String] :password Password to authenticate with the server

# File lib/em/mqtt/client_connection.rb, line 50
# Set the connection's settings to their defaults, then apply overrides.
#
# Defaults: a client id from MQTT::Client.generate_client_id, keep_alive 10,
# clean_session true, packet_id 0, ack_timeout 5, and nil for username,
# password and timer.
#
# When args is a Hash, every key/value pair is stored in the instance variable
# named after the key, so keys beyond the defaults are stored as well. Any
# other kind of argument is ignored.
def initialize(args={})
  defaults = {
    :client_id => MQTT::Client.generate_client_id,
    :keep_alive => 10,
    :clean_session => true,
    :packet_id => 0,
    :ack_timeout => 5,
    :username => nil,
    :password => nil,
    :timer => nil
  }
  defaults.each_pair { |name, value| instance_variable_set("@#{name}", value) }

  return unless args.is_a?(Hash)

  args.each_pair { |name, value| instance_variable_set("@#{name}", value) }
end

Public Instance Methods

connection_completed() click to toggle source
# File lib/em/mqtt/client_connection.rb, line 72
# TCP socket established: build a Connect packet from this client's id,
# clean-session flag, keep-alive interval and credentials, send it, and record
# that the connect request has been sent.
def connection_completed
  send_packet(
    MQTT::Packet::Connect.new(
      :client_id => @client_id,
      :clean_session => @clean_session,
      :keep_alive => @keep_alive,
      :username => @username,
      :password => @password
    )
  )

  @state = :connect_sent
end
disconnect(send_msg=true) click to toggle source

Disconnect from the MQTT broker. If you don’t want to say goodbye to the broker, set send_msg to false.

# File lib/em/mqtt/client_connection.rb, line 89
# Mark the connection as disconnecting.
#
# A Disconnect packet is sent first, but only when the connection reports
# connected? and send_msg is true. The state is set to :disconnecting in
# every case.
def disconnect(send_msg=true)
  # FIXME: only close if we aren't waiting for any acknowledgements
  send_packet(MQTT::Packet::Disconnect.new) if connected? && send_msg
  @state = :disconnecting
end
post_init() click to toggle source
# File lib/em/mqtt/client_connection.rb, line 67
# Run the inherited post_init, then record that the connection is in the
# :connecting state.
def post_init
  super
  @state = :connecting
end
publish(topic, payload, retain=false, qos=0) click to toggle source

Publish a message on a particular topic to the MQTT broker.

# File lib/em/mqtt/client_connection.rb, line 116
# Queue a Publish packet for the given topic and payload.
#
# The work is registered through callback, so the packet is only built (taking
# the next packet id at that moment) and sent when that callback runs.
#
# retain and qos are passed straight through to the packet.
def publish(topic, payload, retain=false, qos=0)
  # Defer publishing until we are connected
  callback do
    message = MQTT::Packet::Publish.new(
      :id => next_packet_id,
      :qos => qos,
      :retain => retain,
      :topic => topic,
      :payload => payload
    )
    send_packet(message)
  end
end
receive_callback(&block) click to toggle source
# File lib/em/mqtt/client_connection.rb, line 97
# Store the given block as the receive callback.
# receive_msg invokes the stored block with each packet it is passed.
def receive_callback(&block)
  @receive_callback = block
end
receive_msg(packet) click to toggle source
# File lib/em/mqtt/client_connection.rb, line 101
# Hand a packet to the stored receive callback, if one has been set.
# Returns the callback's result, or nil when no callback is stored.
def receive_msg(packet)
  # Alternatively, subclass this method
  return if @receive_callback.nil?

  @receive_callback.call(packet)
end
subscribe(*topics) click to toggle source

Send a subscribe message for one or more topics on the MQTT broker.

# File lib/em/mqtt/client_connection.rb, line 132
# Queue a Subscribe packet covering all of the given topics.
#
# The work is registered through callback, so the packet is only built (taking
# the next packet id at that moment) and sent when that callback runs.
def subscribe(*topics)
  # Defer subscribing until we are connected
  callback do
    request = MQTT::Packet::Subscribe.new(
      :id => next_packet_id,
      :topics => topics
    )
    send_packet(request)
  end
end
unbind() click to toggle source
# File lib/em/mqtt/client_connection.rb, line 106
# Cancel the timer (if any) and settle the final connection state.
#
# When the state is :disconnecting the state becomes :disconnected. In any
# other state the current exception ($!) is re-raised if there is one,
# otherwise MQTT::NotConnectedException is raised, and the state is left
# unchanged.
def unbind
  active_timer = timer
  active_timer.cancel if active_timer

  if state == :disconnecting
    @state = :disconnected
  else
    # Re-throw any exceptions (if present) to avoid swallowed errors.
    raise $! || MQTT::NotConnectedException.new("Connection to server lost")
  end
end
unsubscribe(*topics) click to toggle source

Send an unsubscribe message for one or more topics on the MQTT broker.

# File lib/em/mqtt/client_connection.rb, line 145
# Queue an Unsubscribe packet covering all of the given topics.
#
# The work is registered through callback, so the packet is only built (taking
# the next packet id at that moment) and sent when that callback runs.
def unsubscribe(*topics)
  # Defer unsubscribing until we are connected
  callback do
    request = MQTT::Packet::Unsubscribe.new(
      :id => next_packet_id,
      :topics => topics
    )
    send_packet(request)
  end
end

Private Instance Methods

connect_ack(packet) click to toggle source
# File lib/em/mqtt/client_connection.rb, line 179
# Handle the server's reply to the Connect packet.
#
# Raises MQTT::ProtocolException carrying packet.return_msg when the return
# code is anything other than 0x00. Otherwise the state becomes :connected, a
# periodic timer that sends Pingreq packets is started when keep_alive is
# greater than zero, and the deferred calls are released through
# set_deferred_success.
def connect_ack(packet)
  raise MQTT::ProtocolException.new(packet.return_msg) unless packet.return_code == 0x00

  @state = :connected

  # Send a ping packet every X seconds
  ping_interval = keep_alive
  if ping_interval > 0
    @timer = EventMachine::PeriodicTimer.new(ping_interval) { send_packet(MQTT::Packet::Pingreq.new) }
  end

  # We are now connected - can now execute deferred calls
  set_deferred_success
end
next_packet_id() click to toggle source
# File lib/em/mqtt/client_connection.rb, line 197
# Advance the packet id counter and return the new id.
#
# MQTT packet identifiers are 16-bit and non-zero, so the counter wraps from
# 0xFFFF back to 1 instead of growing without bound.
def next_packet_id
  @packet_id += 1
  @packet_id = 1 if @packet_id > 0xFFFF
  @packet_id
end
process_packet(packet) click to toggle source
# File lib/em/mqtt/client_connection.rb, line 161
# Dispatch a received packet according to the connection state and the
# packet's exact class.
#
# * Connack while in :connect_sent is passed to connect_ack.
# * Publish while :connected is passed to receive_msg.
# * Pingresp, Suback, Unsuback and Puback while :connected are plain
#   acknowledgements and need no action. Unsuback and Puback are the broker's
#   replies to unsubscribe and to a QoS 1 publish, so they must not be treated
#   as protocol errors.
#
# Any other combination raises MQTT::ProtocolException. The exception
# propagates to the caller; no disconnect is attempted here.
def process_packet(packet)
  packet_type = packet.class

  if state == :connect_sent && packet_type == MQTT::Packet::Connack
    connect_ack(packet)
  elsif state == :connected && packet_type == MQTT::Packet::Publish
    receive_msg(packet)
  elsif state == :connected && [
    MQTT::Packet::Pingresp,
    MQTT::Packet::Suback,
    MQTT::Packet::Unsuback,
    MQTT::Packet::Puback
  ].include?(packet_type)
    # Acknowledgement received: nothing to do
    nil
  else
    # FIXME: deal with other packet types (e.g. the QoS 2 handshake)
    raise MQTT::ProtocolException.new(
      "Wasn't expecting packet of type #{packet.class} when in state #{state}"
    )
  end
end