class LogStash::Inputs::Mqtt

Generate a repeating message.

This plugin is intended only as an example.

Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/mqtt.rb, line 42
# Plugin setup hook: records the local machine's hostname in
# @logstash_host. `run` uses that value as the "host" field for events
# that do not already carry one.
def register
  @logstash_host = Socket.gethostname
end
run(queue) click to toggle source
# File lib/logstash/inputs/mqtt.rb, line 46
# Connects to the configured MQTT broker, subscribes to @topic and pushes
# every decoded message onto +queue+ until the plugin is asked to stop.
#
# Connection failures (PahoMqtt::Exception) are logged and retried after a
# one-second stoppable sleep. Retrying ends as soon as `stop?` is true, so
# a shutdown request is honoured even while the broker is unreachable.
#
# @param queue [#<<] destination that receives each decorated event
# @return [nil]
def run(queue)
  # Route PahoMqtt's own logging only when a log target was configured.
  # levels: DEBUG < INFO < WARN < ERROR < FATAL < UNKNOWN
  unless @logfile.nil?
    PahoMqtt.logger = @logfile
    PahoMqtt.logger.level = @log_level
  end

  @client = PahoMqtt::Client.new({
    host: @host,
    port: @port,
    persistent: @persistent, # keep connection persistent
    mqtt_version: @mqtt_version,
    clean_session: @clean_session,
    client_id: @client_id,
    username: @username,
    password: @password,
    ssl: @ssl,
    will_topic: @will_topic,
    will_payload: @will_payload,
    will_qos: @will_qos,
    will_retain: @will_retain,
    reconnect_limit: @reconnect_retries,
    reconnect_delay: @reconnect_sleep_time,
  })

  @client.config_ssl_context(@certificate_path, @key_path, @root_ca_path) if @ssl

  # Register the handler before connecting so it is in place when
  # messages start arriving.
  @client.on_message do |message|
    @codec.decode(message.payload) do |event|
      # Keep a host already present on the event; otherwise fall back to
      # the value stored in @logstash_host.
      event.set("host", event.get("host") || @logstash_host)
      event.set("topic", message.topic)

      decorate(event)
      queue << event
    end
  end

  connected = false

  begin
    @client.connect
    connected = true
  rescue PahoMqtt::Exception => e
    @logger.warn("Error while setting up connection for MQTT broker! Retrying.",
      message: e.message,
      class: e.class.name,
      location: e.backtrace.first
    )
    Stud.stoppable_sleep(1) { stop? }
    # Without this guard the method would spin forever during shutdown:
    # the sleep above returns immediately once stop? is true.
    retry unless stop?
  end

  # Stop was requested before a connection could be established.
  return unless connected

  # subscribe to topic
  @client.subscribe([@topic, @qos])

  # Park this thread; messages are delivered through the on_message block.
  Stud.stoppable_sleep(1) { stop? } until stop?
end