class MQTTPipe::Pipe
The actual wrapper class for MQTT
Public Class Methods
new(&block)
click to toggle source
Create a new pipe and, if a block is given, evaluate it in the context of the new pipe
# File lib/mqtt_pipe/pipe.rb, line 16
# Start with no listeners and no error handler, then run the optional
# configuration block with the new pipe as +self+.
def initialize(&block)
  @listeners = []
  @error_handler = nil

  instance_eval(&block) if block
end
Public Instance Methods
on(topic, &action)
click to toggle source
Subscribe to a topic and attach an action that will be called whenever a message with a matching topic is received.
# File lib/mqtt_pipe/pipe.rb, line 28
# Register +action+ as a listener for +topic+.
#
# Raises ArgumentError when no block is supplied.
# Returns the listener list with the new Listener appended.
def on(topic, &action)
  raise ArgumentError, 'No block given' unless action

  listener = Listener.new(topic, &action)
  @listeners << listener
end
on_anything(&action)
click to toggle source
Subscribe to all topics
# File lib/mqtt_pipe/pipe.rb, line 36
# Register +action+ under the topic '#' by delegating to #on.
def on_anything(&action)
  on('#', &action)
end
Also aliased as: on_everything
on_error(&action)
click to toggle source
# File lib/mqtt_pipe/pipe.rb, line 40
# Store +action+ as the pipe's error handler, replacing any previous one.
# Calling it without a block clears the handler.
def on_error(&action)
  @error_handler = action
end
open(host, port: 1883, &block)
click to toggle source
Open the pipe
# File lib/mqtt_pipe/pipe.rb, line 53
# Connect to an MQTT broker, dispatch incoming messages to the registered
# listeners and run the optional block.
#
# host  - forwarded to MQTT::Client.connect as +host+.
# port  - forwarded to MQTT::Client.connect as +port+ (default 1883).
# block - optional; evaluated with a Context wrapping the client as +self+.
#
# When at least one listener is registered, a background thread receives
# messages and the client subscribes to every listener topic. The call then
# blocks until that thread finishes or an Interrupt is received. The
# listener thread is stopped and the client disconnected on every exit path.
def open(host, port: 1883, &block)
  listener_thread = nil

  client = MQTT::Client.connect host: host, port: port
  context = Context.new client

  unless @listeners.empty?
    listener_thread = Thread.new(Thread.current) do |parent|
      client.get do |topic, data|
        begin
          unpacked_data = Packer.unpack data

          @listeners.each do |listener|
            m = listener.match(topic)
            next unless m

            # The action runs with the context as self and receives the
            # unpacked payload followed by the splatted match result.
            context.instance_exec unpacked_data, *m, &listener.action
          end
        rescue Packer::FormatError
          # Undecodable payload: report it to the error handler (if any)
          # and carry on with the next message.
          @error_handler.call topic, data unless @error_handler.nil?
        rescue Exception => e
          # Deliberately broad: any other failure is re-raised in the
          # thread that called #open.
          parent.raise e
        end
      end
    end

    client.subscribe(*topics)
  end

  # Call user block
  if block_given?
    begin
      context.instance_eval(&block)
    rescue ConnectionError
      puts 'TODO: Handle reconnect'
    rescue Interrupt
      exit
    end
  end

  # Join with listener thread
  begin
    listener_thread.join unless listener_thread.nil?
  rescue Interrupt
    # An interrupt while waiting just ends the wait; cleanup happens below.
  end
ensure
  # Stop the listener before tearing down the connection. Otherwise a
  # handler that fails during the disconnect could raise into this thread
  # mid-cleanup, and a failing disconnect would leave the thread running.
  listener_thread.exit if listener_thread
  client.disconnect if client
end
topics()
click to toggle source
# File lib/mqtt_pipe/pipe.rb, line 46
# Return the topic of every registered listener, in registration order.
def topics
  @listeners.map(&:topic)
end