class Fluent::Plugin::MqttOutput

Public Instance Methods

add_send_time(record) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 125
# Returns the record to publish, stamped with the current send time when
# monitoring asks for it.
#
# When @monitor is set and its send_time flag is truthy, a copy of +record+
# is returned with one extra entry whose key is @monitor.send_time_key and
# whose value is Fluent::EventTime.now rendered by @send_time_formatter (the
# textual format is whatever that formatter was built with). In every other
# case +record+ itself is returned. +record+ is never mutated.
#
# @param record [Hash] event record
# @return [Hash] the stamped copy, or +record+ untouched
def add_send_time(record)
  return record if @monitor.nil? || !@monitor.send_time

  # The key must be a String. The former `"#{...}":` hash literal produced a
  # Symbol key, so a record that already carried the same name as a String
  # key ended up with two distinct entries instead of being overwritten.
  send_time = @send_time_formatter.format(Fluent::EventTime.now)
  record.merge(@monitor.send_time_key.to_s => send_time)
end
after_connection() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 114
# Spawns a thread through thread_create (a helper defined outside this file)
# under the name :out_mqtt_dummy. The thread body only calls Thread.stop.
# The thread is remembered in @dummy_thread so exit_thread can end it later.
#
# @return [Object] whatever thread_create returned (also kept in @dummy_thread)
def after_connection
  @dummy_thread = thread_create(:out_mqtt_dummy) { Thread.stop }
end
configure(conf) click to toggle source

This method is called before starting. 'conf' is a Hash that includes configuration parameters. If the configuration is invalid, raise Fluent::ConfigError.

Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 52
# Applies the plugin configuration.
#
# After the superclass has processed +conf+, this:
# * runs compat_parameters_convert for the formatter, inject and buffer
#   sections with "time" as the default chunk key,
# * builds @formatter from the first <format> element of +conf+,
# * records in @has_buffer_section whether +conf+ has any <buffer> element,
# * when @monitor is set, builds @send_time_formatter from the monitor's
#   time_type and time_format.
#
# @param conf configuration element handed over by Fluentd
def configure(conf)
  super
  compat_parameters_convert(conf, :formatter, :inject, :buffer, default_chunk_key: "time")

  @formatter = formatter_create(conf: conf.elements(name: 'format').first)
  @has_buffer_section = !conf.elements(name: 'buffer').empty?

  # The send-time formatter is only needed when monitoring is configured.
  return if @monitor.nil?

  @send_time_formatter = time_formatter_create(
    type: @monitor.time_type.to_sym,
    format: @monitor.time_format
  )
end
current_plugin_name() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 121
# Identifies this plugin by name.
#
# @return [Symbol] always :out_mqtt
def current_plugin_name
  :out_mqtt
end
disconnect() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 100
# Closes the MQTT connection if @client reports that it is connected, then
# ends the helper thread via exit_thread.
#
# Any StandardError raised while checking or closing the connection is
# written to the error log and swallowed, so exit_thread still runs.
#
# @return whatever exit_thread returns
def disconnect
  begin
    mqtt = @client
    mqtt.disconnect if mqtt.connected?
  rescue StandardError => error
    log.error "Error in out_mqtt#disconnect,#{error.class},#{error.message}"
  end

  exit_thread
end
exit_thread() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 96
# Terminates the thread held in @dummy_thread, if one has been stored.
# Does nothing (and returns nil) when @dummy_thread is nil.
def exit_thread
  @dummy_thread.exit unless @dummy_thread.nil?
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 149
# Serializes one event for buffering.
#
# The record is first passed through inject_values_to_record, then the
# triple [tag, time, record] is encoded with to_msgpack.
#
# @param tag    event tag
# @param time   event time
# @param record event record
# @return the msgpack encoding of [tag, time, injected record]
def format(tag, time, record)
  [tag, time, inject_values_to_record(tag, time, record)].to_msgpack
end
formatted_to_msgpack_binary() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 154
# Always answers true. #format in this class emits its result through
# to_msgpack.
# NOTE(review): presumably this is the Fluentd output hook that declares
# buffered chunks to be msgpack streams - confirm against the Output API.
#
# @return [true]
def formatted_to_msgpack_binary
  true
end
prefer_buffered_processing() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 73
# Reports the value of @has_buffer_section, which #configure sets to whether
# the configuration contains a <buffer> element.
# NOTE(review): presumably Fluentd uses this to choose between buffered and
# non-buffered processing - confirm against the Output API.
def prefer_buffered_processing
  @has_buffer_section
end
prefer_delayed_commit() click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 77
# Answers with @buffer_config.async, but only when @has_buffer_section is
# truthy; otherwise the falsy value of @has_buffer_section is returned as is
# and @buffer_config is not consulted.
def prefer_delayed_commit
  return @has_buffer_section unless @has_buffer_section

  @buffer_config.async
end
process(tag, es) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 145
# Publishes every event of the stream +es+ under +tag+ by delegating to
# publish_event_stream.
#
# @param tag event tag
# @param es  event stream
# @return whatever publish_event_stream returns
def process(tag, es)
  publish_event_stream(tag, es)
end
publish(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 158
# Publishes a single event through @client.
#
# The topic is rewrite_tag(tag). The payload is @formatter.format applied to
# the original tag, the time and the record returned by add_send_time. The
# call also passes @retain and @qos. A debug line naming the calling method
# is logged first.
#
# The topic and the stamped record are each computed exactly once and shared
# between the debug line and the publish call. Previously both were
# evaluated twice, so the logged send time could differ from the published
# one and the rewrite pattern was compiled twice per event.
#
# @param tag    event tag
# @param time   event time
# @param record event record
# @return whatever @client.publish returns
def publish(tag, time, record)
  topic = rewrite_tag(tag)
  stamped_record = add_send_time(record)

  log.debug "MqttOutput::#{caller_locations(1,1)[0].label}: #{topic}, #{time}, #{stamped_record}"
  @client.publish(
    topic,
    @formatter.format(tag, time, stamped_record),
    @retain,
    @qos
  )
end
publish_event_stream(tag, es) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 134
# Publishes every event of an event stream.
#
# Logs the stream's class at debug level, passes the stream through
# inject_values_to_event_stream, then publishes each (time, record) pair
# inside its own rescue_disconnection block. The log is flushed at the end.
#
# @param tag event tag shared by all events in the stream
# @param es  event stream yielding (time, record) pairs
# @return whatever log.flush returns
def publish_event_stream(tag, es)
  log.debug "publish_event_stream: #{es.class}"

  injected = inject_values_to_event_stream(tag, es)
  injected.each do |time, record|
    rescue_disconnection { publish(tag, time, record) }
  end

  log.flush
end
rewrite_tag(tag) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 65
# Converts an event tag into a topic string.
#
# Every "." in +tag+ becomes "/". When @topic_rewrite_pattern is set, the
# result is additionally rewritten: each match of that pattern (compiled as
# a Regexp) is replaced with @topic_rewrite_replacement.
#
# @param tag [String] event tag
# @return [String] the topic
def rewrite_tag(tag)
  topic = tag.tr(".", "/")
  return topic if @topic_rewrite_pattern.nil?

  topic.gsub(Regexp.new(@topic_rewrite_pattern), @topic_rewrite_replacement)
end
shutdown() click to toggle source

This method is called when shutting down. Shut down the thread and close sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 90
# Shutdown hook: calls shutdown_proxy (defined outside this file), ends the
# helper thread via exit_thread, and only then hands over to the superclass
# implementation.
def shutdown
  shutdown_proxy
  exit_thread
  super
end
start() click to toggle source

This method is called when starting. Open sockets or files here.

Calls superclass method
# File lib/fluent/plugin/out_mqtt.rb, line 83
# Start hook: runs the superclass implementation first, then calls
# start_proxy (defined outside this file).
def start
  super
  start_proxy
end
terminate() click to toggle source
Calls superclass method Fluent::Plugin::MqttProxy#terminate
# File lib/fluent/plugin/out_mqtt.rb, line 109
# Terminate hook: ends the helper thread via exit_thread before handing over
# to the superclass implementation.
def terminate
  exit_thread
  super
end
try_write(chunk) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 177
# Publishes every event in +chunk+ and then commits the chunk.
#
# Empty chunks are ignored (nothing is published or committed). Otherwise
# the whole loop plus the commit_write call, keyed by chunk.unique_id, runs
# inside a single rescue_disconnection block.
#
# @param chunk buffer chunk yielding (tag, time, record) triples
# @return nil for an empty chunk, else whatever rescue_disconnection returns
def try_write(chunk)
  return if chunk.empty?

  rescue_disconnection do
    chunk.each { |tag, time, record| publish(tag, time, record) }
    commit_write(chunk.unique_id)
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_mqtt.rb, line 168
# Publishes every event in +chunk+.
#
# Empty chunks are ignored. Each event is published inside its own
# rescue_disconnection block.
#
# @param chunk buffer chunk yielding (tag, time, record) triples
# @return nil for an empty chunk, else whatever chunk.each returns
def write(chunk)
  return if chunk.empty?

  chunk.each do |tag, time, record|
    rescue_disconnection { publish(tag, time, record) }
  end
end