class MQTTPipe::Pipe

The actual wrapper class for MQTT

Public Class Methods

new(&block) click to toggle source

Create a new pipe and optionally yield to a block

# File lib/mqtt_pipe/pipe.rb, line 16
def initialize &block
  @listeners = []
  @error_handler = nil
  
  instance_eval &block unless block.nil?
end

Public Instance Methods

on(topic, &action) click to toggle source

Subscribe to a topic and attatch an action that will be called once a message with a matching topic is received.

# File lib/mqtt_pipe/pipe.rb, line 28
def on topic, &action
  raise ArgumentError, 'No block given' if action.nil?
  @listeners << Listener.new(topic, &action)
end
on_anything(&action) click to toggle source

Subscribe to all topics

# File lib/mqtt_pipe/pipe.rb, line 36
def on_anything &action
  on '#', &action
end
Also aliased as: on_everything
on_error(&action) click to toggle source
# File lib/mqtt_pipe/pipe.rb, line 40
def on_error &action
  @error_handler = action
end
on_everything(&action)
Alias for: on_anything
open(host, port: 1883, &block) click to toggle source

Open the pipe

# File lib/mqtt_pipe/pipe.rb, line 53
def open host, port: 1883, &block
  listener_thread = nil
  client  = MQTT::Client.connect host: host, port: port
  context = Context.new client
  
  unless @listeners.empty?
    listener_thread = Thread.new(Thread.current) do |parent|          
      client.get do |topic, data|
        begin
          unpacked_data = Packer.unpack data
          
          @listeners.each do |listener|
            if m = listener.match(topic)
              #listener.call unpacked_data, *m
              context.instance_exec unpacked_data, *m, &listener.action
            end
          end
          
        rescue Packer::FormatError
          @error_handler.call topic, data unless @error_handler.nil?
          next
          
        # Raise the exception in the parent thread
        rescue Exception => e
          parent.raise e
        end
      end
    end
    
    client.subscribe *topics
  end
  
  # Call user block
  if block_given?
    begin
      context.instance_eval &block
    rescue ConnectionError
      puts 'TODO: Handle reconnect'
    rescue Interrupt
      exit
    end
  end
  
  # Join with listener thread
  begin
    listener_thread.join unless listener_thread.nil?
  rescue Interrupt
  end
  
ensure
  client.disconnect if client
  listener_thread.exit if listener_thread
end
topics() click to toggle source
# File lib/mqtt_pipe/pipe.rb, line 46
def topics
  @listeners.map{|listener| listener.topic }
end