class Fluent::Plugin::MqttInput

Public Instance Methods

add_recv_time(record) click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 94
# Attaches the receive timestamp to a record when monitoring is enabled.
#
# The timestamp is Fluent::EventTime.now rendered by @recv_time_formatter and
# stored under @monitor.recv_time_key. The key is written as a String: the
# earlier `"#{key}":` hash-literal form produced a Symbol key, which does not
# match the String keys Fluentd records conventionally use.
#
# @param record [Hash] parsed record; it is not mutated
# @return [Hash] a merged copy that includes the receive time, or +record+
#   itself when @monitor is unset or @monitor.recv_time is falsy
def add_recv_time(record)
  return record unless @monitor && @monitor.recv_time

  recv_time = @recv_time_formatter.format(Fluent::EventTime.now)
  record.merge(@monitor.recv_time_key.to_s => recv_time)
end
after_connection() click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 82
# Subscribes to @topic and starts the receiving thread once connected.
#
# When @client reports a live connection, the client subscribes to @topic and
# a thread created through thread_create(:in_mqtt_get) runs @client.get,
# passing every (topic, message) pair to #emit. Nothing is started when the
# client is not connected.
#
# @return [Object, nil] the current value of @get_thread
def after_connection
  return @get_thread unless @client.connected?

  @client.subscribe(@topic)
  # The assignment is the last expression, so the new thread is returned.
  @get_thread = thread_create(:in_mqtt_get) do
    @client.get { |topic, message| emit(topic, message) }
  end
end
configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 34
# Configures the plugin, its parser and the optional receive-time formatter.
#
# Runs the superclass configuration first, then #configure_parser. A time
# formatter is built from @monitor.time_type and @monitor.time_format only
# when @monitor is set.
#
# @param conf [Object] configuration object passed on to the superclass and
#   to #configure_parser
def configure(conf)
  super
  configure_parser(conf)
  return if @monitor.nil?

  @recv_time_formatter = time_formatter_create(
    type: @monitor.time_type.to_sym,
    format: @monitor.time_format
  )
end
configure_parser(conf) click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 44
# Builds @parser from the first 'parse' element of the configuration.
#
# compat_parameters_convert is applied to +conf+ for the :parser type before
# the 'parse' elements are looked up.
#
# @param conf [Object] configuration object responding to #elements
# @return [Object] the parser returned by parser_create
def configure_parser(conf)
  compat_parameters_convert(conf, :parser)
  @parser = parser_create(conf: conf.elements('parse').first)
end
current_plugin_name() click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 60
# Identifies this plugin by name.
#
# NOTE(review): presumably consumed by the shared MqttProxy code to tell the
# input plugin apart from other users of the proxy; confirm against the caller.
#
# @return [Symbol] always :in_mqtt
def current_plugin_name
  :in_mqtt
end
disconnect() click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 68
# Disconnects the MQTT client and stops the receiving thread.
#
# A failure raised while checking or closing the connection is logged at
# error level and not re-raised, so #exit_thread always runs afterwards.
#
# @return [Object, nil] whatever #exit_thread returns
def disconnect
  begin
    if @client.connected?
      @client.disconnect
    end
  rescue StandardError => e
    log.error "Error in in_mqtt#disconnect,#{e.class},#{e.message}"
  end
  exit_thread
end
emit(topic, message) click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 113
# Parses one MQTT message and emits the resulting event(s) to the router.
#
# The tag is the topic with every "/" replaced by ".". When #parse yields an
# Array, each element becomes one event in a Fluent::MultiEventStream whose
# time comes from @parser.parse_time; any other record is emitted as a single
# event with the time returned by #parse.
#
# Notes on behavior:
# * #add_recv_time is called once per record, so the record written to the
#   debug log is exactly the record that is emitted.
# * A nil record (nothing could be parsed) is skipped with a warning instead
#   of being handed to the router.
# * Only StandardError is rescued; such errors are logged and not re-raised.
#   More severe exceptions (e.g. NoMemoryError) are left to propagate.
#
# @param topic [String] MQTT topic the message arrived on
# @param message [Object] raw payload passed to #parse
# @return [Object, nil] result of the router/log call; not meaningful
def emit(topic, message)
  tag = topic.tr('/', '.')
  time, record = parse(message)

  if record.nil?
    log.warn "MqttInput#emit: no record could be parsed from a message on topic #{topic}; skipped"
    return
  end

  if record.is_a?(Array)
    stream = Fluent::MultiEventStream.new
    record.each do |single_record|
      event_time = @parser.parse_time(single_record)
      stamped = add_recv_time(single_record)
      log.debug "MqttInput#emit: #{tag}, #{event_time}, #{stamped}"
      stream.add(event_time, stamped)
    end
    router.emit_stream(tag, stream)
  else
    stamped = add_recv_time(record)
    log.debug "MqttInput#emit: #{tag}, #{time}, #{stamped}"
    router.emit(tag, time, stamped)
  end
rescue StandardError => e
  log.error error: e.to_s
  log.debug_backtrace(e.backtrace)
end
exit_thread() click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 64
# Stops the receiving thread, if one has been stored in @get_thread.
#
# @return [Object, nil] the result of @get_thread.exit, or nil when no thread
#   is stored
def exit_thread
  return if @get_thread.nil?

  @get_thread.exit
end
parse(message) click to toggle source
# File lib/fluent/plugin/in_mqtt.rb, line 103
# Runs @parser on a message and returns the first (time, record) pair.
#
# The block returns from this method on the parser's first yield. When the
# parser yields a nil time, Fluent::EventTime.now is substituted and a debug
# line is logged.
#
# @param message [Object] raw payload handed to @parser.parse
# @return [Array, Object] [time, record] from the first yield; if the parser
#   never yields, whatever @parser.parse itself returns
def parse(message)
  @parser.parse(message) do |parsed_time, record|
    return [parsed_time, record] unless parsed_time.nil?

    log.debug "Since time_key field is nil, Fluent::EventTime.now is used."
    return [Fluent::EventTime.now, record]
  end
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 55
# Shutdown hook: calls shutdown_proxy first, then the superclass
# implementation.
#
# NOTE(review): shutdown_proxy is not defined in this section; presumably it
# comes from the MqttProxy mixin and tears down the MQTT connection. Confirm.
def shutdown
  shutdown_proxy
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_mqtt.rb, line 50
# Start hook: runs the superclass implementation first, then start_proxy.
#
# NOTE(review): start_proxy is not defined in this section; presumably it
# comes from the MqttProxy mixin and opens the MQTT connection. Confirm.
def start
  super
  start_proxy
end
terminate() click to toggle source
Calls superclass method Fluent::Plugin::MqttProxy#terminate
# File lib/fluent/plugin/in_mqtt.rb, line 77
# Terminate hook: stops the receiving thread via #exit_thread before
# delegating to the superclass implementation
# (Fluent::Plugin::MqttProxy#terminate).
def terminate
  exit_thread
  super
end