class Fluent::Plugin::MqttOutput
Public Instance Methods
add_send_time(record)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 125
# Adds the current send time to a record when send-time monitoring is enabled.
#
# The timestamp is produced by @send_time_formatter from Fluent::EventTime.now
# and stored under the key named by @monitor.send_time_key. (The original note
# on this method states that send_time is recorded in ms.)
#
# @param record [Hash] event record; it is not mutated
# @return [Hash] a merged copy containing the send time, or +record+ itself
#   when @monitor is nil or @monitor.send_time is falsy
def add_send_time(record)
  return record unless @monitor && @monitor.send_time

  # Use a String key. A `{"#{key}": value}` literal would create a Symbol key,
  # which could not overwrite an existing String key of the same name.
  send_time_key = @monitor.send_time_key.to_s
  record.merge(send_time_key => @send_time_formatter.format(Fluent::EventTime.now))
end
after_connection()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 114
# Creates a thread via thread_create(:out_mqtt_dummy) whose body only calls
# Thread.stop, and stores it in @dummy_thread.
#
# @return [Object] the value returned by thread_create (also kept in @dummy_thread)
def after_connection
  @dummy_thread = thread_create(:out_mqtt_dummy) { Thread.stop }
end
configure(conf)
click to toggle source
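This method is called before starting. 'conf' holds the configuration parameters (it is Hash-like, and this method also calls conf.elements on it to look up the nested 'format' and 'buffer' sections). If the configuration is invalid, raise Fluent::ConfigError.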
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 52
# Configures the plugin from +conf+.
#
# After calling the superclass implementation this method:
# * runs compat_parameters_convert for the formatter, inject and buffer
#   helpers with "time" as the default chunk key,
# * creates @formatter from the first element named 'format',
# * records in @has_buffer_section whether any element named 'buffer' exists,
# * creates @send_time_formatter from @monitor's time_type / time_format
#   when @monitor is set.
#
# @param conf [Object] configuration object responding to +elements(name:)+
def configure(conf)
  super
  compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")

  format_section = conf.elements(name: 'format').first
  @formatter = formatter_create(conf: format_section)

  buffer_sections = conf.elements(name: 'buffer')
  @has_buffer_section = buffer_sections.size > 0

  # The send-time formatter is only needed when a monitor is configured.
  unless @monitor.nil?
    @send_time_formatter = time_formatter_create(
      type: @monitor.time_type.to_sym,
      format: @monitor.time_format
    )
  end
end
current_plugin_name()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 121
# Identifier of this plugin.
#
# @return [Symbol] always +:out_mqtt+
def current_plugin_name
  :out_mqtt
end
disconnect()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 100
# Disconnects @client if it reports being connected, then calls exit_thread.
#
# Any StandardError raised while checking or closing the connection is
# logged via log.error and not re-raised, so exit_thread always runs.
#
# @return [Object] whatever exit_thread returns
def disconnect
  begin
    is_connected = @client.connected?
    @client.disconnect if is_connected
  rescue => error
    log.error "Error in out_mqtt#disconnect,#{error.class},#{error.message}"
  end
  exit_thread
end
exit_thread()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 96
# Calls +exit+ on @dummy_thread when one has been stored; does nothing otherwise.
#
# @return [Object, nil] the result of @dummy_thread.exit, or nil when
#   @dummy_thread is nil
def exit_thread
  @dummy_thread.exit unless @dummy_thread.nil?
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 149
# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record, then the
# triple [tag, time, record] is converted with +to_msgpack+.
#
# @param tag [String] event tag
# @param time [Object] event time
# @param record [Hash] event record
# @return [Object] the result of calling to_msgpack on the triple
def format(tag, time, record)
  injected_record = inject_values_to_record(tag, time, record)
  [tag, time, injected_record].to_msgpack
end
formatted_to_msgpack_binary()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 154
# Reports that the output of #format is msgpack (see #format, which ends
# with a +to_msgpack+ call).
#
# @return [Boolean] always true
def formatted_to_msgpack_binary
  true
end
prefer_buffered_processing()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 73
# Returns the value of @has_buffer_section, which #configure sets to true
# when the configuration contains an element named 'buffer'.
#
# @return [Boolean, nil] @has_buffer_section
def prefer_buffered_processing
  @has_buffer_section
end
prefer_delayed_commit()
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 77
# Returns @buffer_config.async when @has_buffer_section is truthy.
#
# @return [Object] @buffer_config.async when @has_buffer_section is truthy;
#   otherwise the falsy value of @has_buffer_section itself
def prefer_delayed_commit
  # Without a buffer section @buffer_config is never consulted.
  return @has_buffer_section unless @has_buffer_section

  @buffer_config.async
end
process(tag, es)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 145
# Handles an event stream by delegating to #publish_event_stream.
#
# @param tag [String] event tag
# @param es [Object] event stream
# @return [Object] whatever publish_event_stream returns
def process(tag, es)
  publish_event_stream(tag, es)
end
publish(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 158
# Publishes a single event through @client.
#
# The topic is rewrite_tag(tag) and the payload is @formatter.format applied
# to the record returned by add_send_time. Both are computed exactly once, so
# the debug log line shows the same topic and record (including send time)
# that are published, and the tag rewrite is not repeated.
#
# @param tag [String] event tag
# @param time [Object] event time
# @param record [Hash] event record
# @return [Object] whatever @client.publish returns
def publish(tag, time, record)
  topic = rewrite_tag(tag)
  stamped_record = add_send_time(record)

  # caller_locations(1, 1) names the method that invoked #publish.
  log.debug "MqttOutput::#{caller_locations(1,1)[0].label}: #{topic}, #{time}, #{stamped_record}"

  @client.publish(
    topic,
    @formatter.format(tag, time, stamped_record),
    @retain,
    @qos
  )
end
publish_event_stream(tag, es)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 134
# Publishes every event of an event stream.
#
# The stream is passed through inject_values_to_event_stream, then each
# (time, record) pair is published inside its own rescue_disconnection
# block. The logger is flushed at the end.
#
# @param tag [String] event tag shared by all events in the stream
# @param es [Object] event stream responding to +each+ with (time, record)
# @return [Object] whatever log.flush returns
def publish_event_stream(tag, es)
  log.debug "publish_event_stream: #{es.class}"

  injected_stream = inject_values_to_event_stream(tag, es)
  injected_stream.each do |time, record|
    rescue_disconnection { publish(tag, time, record) }
  end

  log.flush
end
rewrite_tag(tag)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 65
# Converts a tag into the string used as the topic by #publish.
#
# Every "." in the tag becomes "/". When @topic_rewrite_pattern is set, the
# result is additionally rewritten by replacing all matches of that pattern
# (compiled with Regexp.new) with @topic_rewrite_replacement.
#
# @param tag [String] event tag
# @return [String] the rewritten topic
def rewrite_tag(tag)
  topic = tag.tr('.', '/')
  return topic if @topic_rewrite_pattern.nil?

  topic.gsub(Regexp.new(@topic_rewrite_pattern), @topic_rewrite_replacement)
end
shutdown()
click to toggle source
This method is called when shutting down. Shut down the thread and close sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 90
# Shutdown hook: calls shutdown_proxy, then exit_thread, and finally the
# superclass implementation.
#
# @return [Object] whatever the superclass +shutdown+ returns
def shutdown
  shutdown_proxy
  exit_thread

  super
end
start()
click to toggle source
This method is called when starting. Open sockets or files here.
Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 83
# Start hook: runs the superclass implementation first, then start_proxy.
#
# @return [Object] whatever start_proxy returns
def start
  super

  start_proxy
end
terminate()
click to toggle source
Calls superclass method
Fluent::Plugin::MqttProxy#terminate
# File lib/fluent/plugin/out_mqtt.rb, line 109
# Terminate hook: calls exit_thread, then the superclass implementation.
#
# @return [Object] whatever the superclass +terminate+ returns
def terminate
  exit_thread

  super
end
try_write(chunk)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 177
# Publishes every event of a chunk and then commits the chunk.
#
# All publishes and the final commit_write(chunk.unique_id) run inside one
# rescue_disconnection block. An empty chunk is skipped without committing.
#
# @param chunk [Object] buffer chunk responding to +empty?+, +each+
#   (yielding tag, time, record) and +unique_id+
# @return [Object, nil] nil for an empty chunk, otherwise whatever
#   rescue_disconnection returns
def try_write(chunk)
  unless chunk.empty?
    rescue_disconnection do
      chunk.each { |tag, time, record| publish(tag, time, record) }
      commit_write(chunk.unique_id)
    end
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 168
# Publishes every event of a chunk.
#
# Each event is published inside its own rescue_disconnection block. An
# empty chunk is skipped.
#
# @param chunk [Object] buffer chunk responding to +empty?+ and +each+
#   (yielding tag, time, record)
# @return [Object, nil] nil for an empty chunk, otherwise whatever
#   chunk.each returns
def write(chunk)
  return if chunk.empty?

  chunk.each do |tag, time, record|
    rescue_disconnection { publish(tag, time, record) }
  end
end