class LogStash::Inputs::DeadLetterQueue

Logstash input to read events from Logstash's dead letter queue

[source, sh]

input {

dead_letter_queue {
  path => "/var/logstash/data/dead_letter_queue"
  start_timestamp => "2017-04-04T23:40:37"
}

}


Public Instance Methods

register() click to toggle source
# File lib/logstash/inputs/dead_letter_queue.rb, line 41
# Prepares the input for reading: settles where the sincedb file lives, then
# builds the Java-side reader, stores it in @inner_plugin and registers it.
#
# When no sincedb_path was configured, a file named ".sincedb_<MD5 of @path>"
# is used under <path.data>/plugins/inputs/dead_letter_queue/<pipeline_id>,
# and that directory is created if needed.
#
# Raises ArgumentError when the configured sincedb_path is a directory.
def register
  if @sincedb_path.nil?
    sincedb_dir = File.join(
      LogStash::SETTINGS.get_value("path.data"),
      "plugins", "inputs", "dead_letter_queue", @pipeline_id
    )
    # The directory is deeply nested, so create it (and its parents) up front.
    FileUtils.mkdir_p(sincedb_dir)
    path_digest = Digest::MD5.hexdigest(@path)
    @sincedb_path = File.join(sincedb_dir, ".sincedb_" + path_digest)
  elsif File.directory?(@sincedb_path)
    raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\""
  end

  # Convert the Ruby-side settings into the Java objects the reader expects;
  # the two optional values stay nil when they are not set.
  queue_dir = java.nio.file.Paths.get(File.join(@path, @pipeline_id))
  sincedb_file = java.nio.file.Paths.get(@sincedb_path) if @sincedb_path
  read_from = org.logstash.Timestamp.new(@start_timestamp) if @start_timestamp

  @inner_plugin = org.logstash.input.DeadLetterQueueInputPlugin.new(queue_dir, @commit_offsets, sincedb_file, read_from)
  @inner_plugin.register
end
run(logstash_queue) click to toggle source
# File lib/logstash/inputs/dead_letter_queue.rb, line 59
# Reads entries from @inner_plugin and pushes one LogStash::Event per entry
# onto +logstash_queue+.
#
# Each event is built from the entry's original event map and annotated under
# [@metadata][dead_letter_queue] with the entry's plugin_type, plugin_id,
# reason and entry_time before being decorated and enqueued.
def run(logstash_queue)
  @inner_plugin.run do |dlq_entry|
    dlq_event = LogStash::Event.new(dlq_entry.event.toMap())

    # Field path => value, applied in insertion order.
    dlq_metadata = {
      "[@metadata][dead_letter_queue][plugin_type]" => dlq_entry.plugin_type,
      "[@metadata][dead_letter_queue][plugin_id]" => dlq_entry.plugin_id,
      "[@metadata][dead_letter_queue][reason]" => dlq_entry.reason,
      "[@metadata][dead_letter_queue][entry_time]" => dlq_entry.entry_time
    }
    dlq_metadata.each_pair { |field, value| dlq_event.set(field, value) }

    decorate(dlq_event)
    logstash_queue << dlq_event
  end
end
stop() click to toggle source
# File lib/logstash/inputs/dead_letter_queue.rb, line 72
# Shuts the input down by closing the reader held in @inner_plugin.
#
# @inner_plugin is only assigned by #register, so it is still nil if #stop is
# invoked before registration finished (or after #register raised). In that
# case there is nothing to close and the call is a no-op instead of failing
# with NoMethodError on nil.
#
# Returns whatever the reader's close returns, or nil when no reader exists.
def stop
  @inner_plugin.close if @inner_plugin
end