class Fluent::Plugin::MqttInput
Constants
- DEFAULT_PARSER_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 32
# Configures the plugin from +conf+.
#
# Runs compat_parameters_convert on +conf+ for the :inject and :parser
# sections, delegates to the superclass implementation, and finally builds
# the parser through configure_parser.
#
# @param conf the configuration object passed to the plugin
# @return whatever configure_parser returns (the created parser)
def configure(conf)
  compat_parameters_convert(conf, :inject, :parser)
  super
  configure_parser(conf)
end
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 38
# Builds the parser used for incoming payloads and stores it in @parser.
#
# The parser type is taken from @format; +conf+ is passed through to
# parser_create unchanged.
#
# @param conf the configuration object passed to the plugin
# @return the parser returned by parser_create
def configure_parser(conf)
  parser_options = { usage: 'in_mqtt_parser', type: @format, conf: conf }
  @parser = parser_create(**parser_options)
end
emit(topic, message, time = Fluent::Engine.now)
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 80
# Emits +message+ to the router, tagged with +topic+.
#
# When +message+ is an Array (or an Array subclass) every element is emitted
# as a separate event and logged at debug level; any other value is emitted
# as a single event.
#
# @param topic the tag to emit under
# @param message a single record, or an Array of records
# @param time the event time; defaults to Fluent::Engine.now
def emit(topic, message, time = Fluent::Engine.now)
  # is_a? rather than an exact class comparison, so Array subclasses are
  # also fanned out into one event per element.
  if message.is_a?(Array)
    message.each do |data|
      log.debug "#{topic}: #{data}"
      router.emit(topic, time, data)
    end
  else
    router.emit(topic, time, message)
  end
end
parse(message)
click to toggle source
Returns [time, record]; if the parser does not supply a time, the current time is used.
# File lib/fluent/plugin/in_mqtt.rb, line 43
# Parses +message+ with the configured parser.
#
# Returns from the method as soon as the parser yields, so only the first
# yielded (time, record) pair is used.
#
# @param message the raw payload to parse
# @return [Array] a two-element array of time and record; time falls back to
#   Fluent::Engine.now when the parser yields a nil/false time
def parse(message)
  @parser.parse(message) do |time, record|
    timestamp = time || Fluent::Engine.now
    return timestamp, record
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 91
# Disconnects the MQTT client, if one was created, and then runs the
# superclass shutdown.
def shutdown
  # @connect is only assigned in start; when start never got that far it is
  # nil, and calling disconnect on it would raise and skip super.
  @connect.disconnect if @connect
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 49
# Starts the input: connects to the MQTT broker, subscribes to @topic and
# spawns a worker thread that parses and emits every received message.
#
# Connection options are built from @bind/@port plus whichever of
# @client_id, @username, @password, @ssl, @ca, @cert and @key are set.
def start
  super
  log.debug "start mqtt #{@bind}"

  opts = { host: @bind, port: @port }
  optional = {
    client_id: @client_id,
    username: @username,
    password: @password,
    ssl: @ssl,
    ca_file: @ca,
    cert_file: @cert,
    key_file: @key
  }
  # Only pass options that were actually configured (nil/false are omitted).
  optional.each { |key, value| opts[key] = value if value }

  @connect = MQTT::Client.connect(opts)
  @connect.subscribe(@topic)

  thread_create(:in_mqtt_worker) do
    @connect.get do |topic, message|
      # Build the tag without mutating the string handed in by the client.
      tag = topic.tr('/', '.')
      log.debug "#{tag}: #{message}"
      begin
        time, record = parse(message)
        record = inject_values_to_record(tag, time, record)
      rescue StandardError => e
        # Rescue StandardError only, so SystemExit, Interrupt and similar
        # still propagate. A message that failed to parse is logged and
        # dropped rather than emitted with a nil/partial record and time.
        log.error e
        next
      end
      emit tag, record, time
    end
  end
end