class MqttSrvc

A framework for building Ruby-based MQTT services.

Constants

CONFIG_TYPE

Attributes

logger[R]

Public Class Methods

new(dir) click to toggle source
# File lib/mqttsrvc.rb, line 15
# Builds a service instance from the directory +dir+.
#
# Loads the configuration and the 'mqtt' credentials for +dir+, creates the
# logger, connects the MQTT client using the configured host/port/ssl/
# keep_alive and the api_key/api_secret credentials, then subscribes to the
# configured topics and to the ping topic.
def initialize(dir)
  @config = Config.for(dir: dir, config_type: CONFIG_TYPE)
  @credentials = Credentials.for(dir: dir, credentials_type: 'mqtt')

  # The logger does not exist yet, so this first message goes to stdout.
  puts 'Setting up logger...'
  @logger = MqttSrvcLogger.new(dir)
  @logger.info('Logger configured.')

  @logger.info('Creating MQTT client...')
  connection = {
    host: @config.host,
    port: @config.port,
    ssl: @config.ssl,
    keep_alive: @config.keep_alive,
    username: @credentials['api_key'],
    password: @credentials['api_secret']
  }
  @mqtt_client = MQTT::Client.connect(**connection)
  @logger.info("Client connected to #{@config.host}.")

  subscribe_to_topics
  subscribe_to_ping
end

Public Instance Methods

ping_topic() click to toggle source
# File lib/mqttsrvc.rb, line 55
# Topic on which pings for this service arrive: the configured ping topic,
# a "/", and the api_key credential. Built on first use and cached in
# @ping_topic afterwards.
def ping_topic
  return @ping_topic if @ping_topic

  @ping_topic = @config.topics.ping + '/' + @credentials['api_key']
end
pong_topic() click to toggle source
# File lib/mqttsrvc.rb, line 59
# Topic on which ping replies are published: the configured pong topic,
# a "/", and the api_key credential. Built on first use and cached in
# @pong_topic afterwards.
def pong_topic
  return @pong_topic if @pong_topic

  @pong_topic = @config.topics.pong + '/' + @credentials['api_key']
end
publish(topic, payload, retain = false) click to toggle source
# File lib/mqttsrvc.rb, line 47
# Publishes +payload+ on +topic+ as JSON, tagged with this client's id.
#
# topic   - topic name to publish on.
# payload - a String (wrapped as { implicit_hash_key => string }) or a
#           Hash-like object; it is serialized with to_json after the
#           client id has been added.
# retain  - whether the message should be published with the retain flag
#           (default false).
#
# Returns whatever the MQTT client's publish returns.
def publish(topic, payload, retain = false)
  payload =
    if payload.is_a?(String)
      { @config.implicit_hash_key => payload }
    else
      # Work on a copy so the caller's object is not given a :clientid key.
      payload.dup
    end
  add_client_id(payload)
  message = payload.to_json
  @logger.info("PUB #{topic} | #{message}")
  # NOTE(review): assumes MQTT::Client is the ruby-mqtt gem, whose publish
  # takes (topic, payload, retain, qos) positionally. Passing `retain: retain`
  # there would bind a Hash to the retain parameter, which is always truthy.
  @mqtt_client.publish(topic, message, retain)
end
run() click to toggle source
# File lib/mqttsrvc.rb, line 38
# Marks the service as running (@run = true) and enters the receive loop.
# Returns whatever do_loop returns.
def run
  @run = true
  do_loop
end
stop() click to toggle source
# File lib/mqttsrvc.rb, line 43
# Clears the @run flag that do_loop reads as its loop condition, asking the
# receive loop to finish.
def stop
  @run = false
end

Private Instance Methods

add_client_id(payload) click to toggle source
# File lib/mqttsrvc.rb, line 131
# Stores the api_key credential under the :clientid key of +payload+,
# modifying +payload+ in place. Returns the stored value.
def add_client_id(payload)
  client_id = @credentials['api_key']
  payload[:clientid] = client_id
end
decode_message(msg) click to toggle source
# File lib/mqttsrvc.rb, line 111
# Turns a raw message into a Hash.
#
# - Valid JSON that parses to a Hash: that Hash is returned.
# - Valid JSON that parses to anything else: nil is returned.
# - Not parseable as JSON: the raw message is wrapped as
#   { implicit_hash_key => msg }.
def decode_message(msg)
  parsed = JSON.parse(msg)
  parsed if parsed.class == Hash
rescue JSON::ParserError
  { @config.implicit_hash_key => msg }
end
do_loop() click to toggle source
# File lib/mqttsrvc.rb, line 87
# Receive loop: while @run is set, hands every incoming message to
# mqtt_receive_shim together with its decoded form.
#
# - A StandardError raised while receiving or handling is logged (message
#   plus indented backtrace) and the loop carries on.
# - An Interrupt ends the loop: it is logged and the client is disconnected;
#   a second Interrupt during the disconnect is logged and swallowed.
# - SystemExit is not rescued here and propagates to the caller.
def do_loop
  while @run
    begin
      @mqtt_client.get(nil, @config.client) do |topic, msg|
        @logger.debug 'RCV ' + topic + ' | ' + msg.to_s
        msg_hash = decode_message(msg)
        mqtt_receive_shim(topic, msg, msg_hash)
        # Without this, @run is only re-read once get returns or raises.
        # NOTE(review): assumes MQTT::Client is the ruby-mqtt gem, whose get
        # with a block keeps yielding messages and does not return by itself,
        # so a stop request would otherwise never take effect.
        break unless @run
      end
    rescue StandardError => e
      # Interrupt and SystemExit are not StandardErrors, so they skip this
      # clause and reach the method-level rescue / the caller.
      @logger.error_bold e.message
      @logger.error '    ' + e.backtrace.join($/ + '    ')
    end
  end
rescue Interrupt
  @logger.info '[INT] Exiting...'
  begin
    @mqtt_client.disconnect
  rescue Interrupt
    @logger.info '[INT] Aborting...'
  end
end
mqtt_receive_shim(topic, msg, msg_hash) click to toggle source
# File lib/mqttsrvc.rb, line 121
# First stop for every received message. Logs it, then either answers a
# ping (a message on ping_topic) by publishing { state: true } on
# pong_topic, or forwards the message to mqtt_receive.
#
# topic    - topic the message arrived on.
# msg      - raw message payload.
# msg_hash - decoded form of +msg+, passed through to mqtt_receive.
def mqtt_receive_shim(topic, msg, msg_hash)
  @logger.info "#{topic} #{msg}"
  return mqtt_receive(topic, msg, msg_hash) unless topic == ping_topic

  @logger.debug('Handling ping...')
  publish(pong_topic, state: true)
end
subscribe(topic) click to toggle source
# File lib/mqttsrvc.rb, line 82
# Logs a "SUB <topic>" line and subscribes the MQTT client to +topic+.
# Returns whatever the client's subscribe returns.
def subscribe(topic)
  log_line = 'SUB ' + topic
  @logger.info(log_line)
  @mqtt_client.subscribe(topic)
end
subscribe_to_ping() click to toggle source
# File lib/mqttsrvc.rb, line 77
# Subscribes to this service's ping topic, logging the attempt at debug
# level first. Returns whatever subscribe returns.
def subscribe_to_ping
  topic = ping_topic
  @logger.debug("Subscribing to #{topic}...")
  subscribe(topic)
end
subscribe_to_topics() click to toggle source
# File lib/mqttsrvc.rb, line 65
# Subscribes to every topic listed under the configuration's topics.sub.
#
# When that list is nil or empty, logs that there is nothing to subscribe
# to and returns nil. Otherwise each topic is logged at debug level and
# subscribed in order, and the list itself is returned.
def subscribe_to_topics
  wanted = @config.topics.sub
  nothing_to_subscribe = wanted.nil? || wanted.empty?
  if nothing_to_subscribe
    @logger.info('No topics to which to subscribe.')
    return
  end

  wanted.each do |name|
    @logger.debug("Subscribing to #{name}...")
    subscribe(name)
  end
end