class Fluent::Plugin::MqttInput
Public Instance Methods
add_recv_time(record)
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 94
#
# Attaches the receive timestamp to +record+ when monitoring asks for it.
#
# When @monitor is set and its +recv_time+ flag is truthy, a new Hash is
# returned that contains everything in +record+ plus the current
# Fluent::EventTime rendered by @recv_time_formatter, stored under
# @monitor.recv_time_key. In every other case +record+ itself is returned
# untouched. +record+ is never mutated.
#
# @param record [Hash] parsed event record
# @return [Hash] the enriched copy, or +record+ when monitoring is off
def add_recv_time(record)
  return record if @monitor.nil? || !@monitor.recv_time

  # The textual form of the timestamp is whatever @recv_time_formatter
  # produces. The key is stored as a String: the previous `"#{key}":` hash
  # syntax silently produced a Symbol key, which is inconsistent with the
  # String keys used for the rest of an event record.
  recv_time = @recv_time_formatter.format(Fluent::EventTime.now)
  record.merge(@monitor.recv_time_key.to_s => recv_time)
end
after_connection()
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 82
#
# Subscribes to @topic and starts the receiving thread, provided @client
# reports that it is connected. Every message yielded by @client.get is
# forwarded to #emit from inside the :in_mqtt_get thread.
#
# @return [Object, nil] the value of @get_thread (left as it was when the
#   client is not connected)
def after_connection
  return @get_thread unless @client.connected?

  @client.subscribe(@topic)
  @get_thread = thread_create(:in_mqtt_get) do
    @client.get { |topic, message| emit(topic, message) }
  end
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 34
#
# Applies the plugin configuration: runs the superclass configuration,
# sets up the parser through #configure_parser and, when a monitor section
# is present, builds @recv_time_formatter from the monitor's +time_type+
# and +time_format+.
#
# @param conf [Object] configuration element handed over by the framework
def configure(conf)
  super
  configure_parser(conf)
  return if @monitor.nil?

  @recv_time_formatter = time_formatter_create(
    type: @monitor.time_type.to_sym,
    format: @monitor.time_format
  )
end
configure_parser(conf)
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 44
#
# Creates @parser from the first +parse+ element of +conf+, after running
# the compat-parameter conversion for :parser on +conf+.
#
# @param conf [Object] configuration element; must respond to +elements+
# @return [Object] the parser that was stored in @parser
def configure_parser(conf)
  compat_parameters_convert(conf, :parser)
  @parser = parser_create(conf: conf.elements('parse').first)
end
current_plugin_name()
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 60
#
# Identifier of this plugin.
#
# @return [Symbol] always :in_mqtt
def current_plugin_name
  :in_mqtt
end
disconnect()
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 68
#
# Disconnects @client if it is currently connected, then stops the
# receiving thread via #exit_thread. A failure while disconnecting is
# logged as an error and does not prevent the thread from being stopped.
#
# @return [Object] whatever #exit_thread returns
def disconnect
  begin
    if @client.connected?
      @client.disconnect
    end
  rescue StandardError => e
    log.error "Error in in_mqtt#disconnect,#{e.class},#{e.message}"
  end
  exit_thread
end
emit(topic, message)
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 113
#
# Parses one MQTT message and hands the resulting event(s) to the router.
#
# The tag is +topic+ with every "/" replaced by ".". When #parse yields an
# Array, all elements are emitted together as one Fluent::MultiEventStream,
# each with the time obtained from @parser.parse_time; otherwise a single
# event is emitted with the time returned by #parse.
#
# Each record goes through #add_recv_time exactly once, so the record shown
# in the debug log is the very object that is emitted (calling it twice
# could stamp the log line and the event with different times).
#
# Failures are logged instead of raised so that one bad message does not
# stop message processing. Only StandardError is rescued: exceptions such
# as NoMemoryError or SystemExit are deliberately left to propagate.
#
# @param topic [String] MQTT topic the message arrived on
# @param message [String] raw message payload
def emit(topic, message)
  tag = topic.tr("/", ".")
  time, record = parse(message)
  if record.is_a?(Array)
    mes = Fluent::MultiEventStream.new
    record.each do |single_record|
      stamped = add_recv_time(single_record)
      log.debug "MqttInput#emit: #{tag}, #{time}, #{stamped}"
      mes.add(@parser.parse_time(single_record), stamped)
    end
    router.emit_stream(tag, mes)
  else
    stamped = add_recv_time(record)
    log.debug "MqttInput#emit: #{tag}, #{time}, #{stamped}"
    router.emit(tag, time, stamped)
  end
rescue StandardError => e
  log.error error: e.to_s
  log.debug_backtrace(e.backtrace)
end
exit_thread()
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 64
#
# Asks the receiving thread to exit, if one has been created.
#
# @return [Object, nil] the result of calling +exit+ on @get_thread, or nil
#   when @get_thread is nil
def exit_thread
  @get_thread&.exit
end
parse(message)
click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 103
#
# Runs +message+ through @parser and returns the first result it yields.
# When the yielded time is nil, Fluent::EventTime.now is substituted and a
# debug line is logged. If the parser never yields, the return value of
# @parser.parse is returned instead.
#
# @param message [String] raw message payload
# @return [Array] a two-element array of time and record
def parse(message)
  @parser.parse(message) do |parsed_time, record|
    return [parsed_time, record] unless parsed_time.nil?

    log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
    return [Fluent::EventTime.now, record]
  end
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 55
#
# Shutdown hook: runs shutdown_proxy first and then the superclass
# shutdown.
def shutdown
  shutdown_proxy
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 50
#
# Start hook: runs the superclass start first and then start_proxy.
def start
  super
  start_proxy
end
terminate()
click to toggle source
Calls superclass method
Fluent::Plugin::MqttProxy#terminate
# File lib/fluent/plugin/in_mqtt.rb, line 77
#
# Terminate hook: stops the receiving thread through #exit_thread and then
# runs the superclass terminate.
def terminate
  exit_thread
  super
end