class Fluent::Plugin::MqttInput

Constants

DEFAULT_PARSER_TYPE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 32
# Configures the plugin from +conf+ and builds the message parser.
#
# The statement order is significant: +conf+ is converted first, then handed
# to the superclass, and only afterwards is the parser created.
def configure(conf)
  # Runs the compat-parameters conversion for the :inject and :parser
  # sections on conf (presumably rewriting legacy flat parameters into
  # section form -- confirm against the Fluentd compat_parameters helper).
  compat_parameters_convert(conf, :inject, :parser)
  super
  # Creates @parser; done after super (presumably because @format is only
  # populated by the superclass configure -- verify).
  configure_parser(conf)
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 38
# Creates the parser used for incoming payloads and stores it in @parser.
#
# The parser type is taken from @format; +conf+ is passed through unchanged
# to parser_create.
def configure_parser(conf)
  @parser = parser_create(
    usage: 'in_mqtt_parser',
    type: @format,
    conf: conf
  )
end
emit(topic, message, time = Fluent::Engine.now) click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 80
# Emits one or more records to the router under the tag +topic+.
#
# topic   - tag the event(s) are emitted with
# message - a single record, or an Array (or Array subclass) of records that
#           are emitted one by one; nil is skipped with a warning
# time    - event time, defaults to Fluent::Engine.now
def emit(topic, message, time = Fluent::Engine.now)
  # A nil record means an earlier step produced nothing usable; do not hand
  # it to the router.
  if message.nil?
    log.warn "#{topic}: no record to emit, skipping"
    return
  end

  # is_a? (rather than an exact class comparison) so that Array subclasses
  # are also emitted element by element.
  if message.is_a?(Array)
    message.each do |data|
      log.debug "#{topic}: #{data}"
      router.emit(topic, time, data)
    end
  else
    router.emit(topic, time, message)
  end
end
parse(message) click to toggle source

Returns [time, record]. If the parser does not provide a time, the current time is used instead.

# File lib/fluent/plugin/in_mqtt.rb, line 43
# Parses +message+ with @parser.
#
# Returns [time, record] for the first result the parser yields. When the
# yielded time is nil or false, Fluent::Engine.now is used in its place.
def parse(message)
  @parser.parse(message) do |parsed_time, record|
    event_time = parsed_time || Fluent::Engine.now
    # Leave the method on the first yielded result.
    return event_time, record
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 91
# Disconnects the MQTT client, if one was created, then runs the superclass
# shutdown.
def shutdown
  # @connect is only assigned in start; guard so that a plugin whose start
  # never got that far still reaches super instead of raising NoMethodError.
  @connect.disconnect if @connect
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 49
# Connects to the MQTT broker, subscribes to @topic and starts the worker
# thread that turns every received message into an emitted event.
#
# Messages that cannot be parsed (or whose record cannot be injected) are
# logged and skipped; they are not emitted.
def start
  super
  log.debug "start mqtt #{@bind}"

  # Host and port are always passed; the remaining settings only when set.
  opts = {host: @bind,
          port: @port}
  opts[:client_id] = @client_id if @client_id
  opts[:username] = @username if @username
  opts[:password] = @password if @password
  opts[:ssl] = @ssl if @ssl
  opts[:ca_file] = @ca if @ca
  opts[:cert_file] = @cert if @cert
  opts[:key_file] = @key if @key
  @connect = MQTT::Client.connect(opts)
  @connect.subscribe(@topic)

  thread_create(:in_mqtt_worker) do
    @connect.get do |topic, message|
      # Build the tag ("a/b" -> "a.b") without mutating the client's string.
      tag = topic.tr('/', '.')
      log.debug "#{tag}: #{message}"
      begin
        time, record = parse(message)
        record = inject_values_to_record(tag, time, record)
      rescue StandardError => e
        # StandardError only: signals and exit requests must not be swallowed.
        log.error e
        # Nothing usable was produced for this message; move on to the next.
        next
      end
      emit tag, record, time
    end
  end
end