class Fluent::Plugin::JournalInput

Fluentd input plugin that reads entries from systemd journal files.

Constants

DEFAULT_STORAGE_TYPE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_journal.rb, line 38
# Applies the plugin configuration, then prepares per-instance state:
# clears any journal handle, creates the storage used for read positions,
# and builds the entry mutator from @entry_opts.
#
# @param conf the configuration object, forwarded unchanged to the superclass
def configure(conf)
  super
  @journal = nil
  @pos_storage = storage_create(usage: 'positions')
  mutator_options = @entry_opts.to_h
  @mutator = SystemdEntryMutator.new(**mutator_options)
  # Surface every warning the mutator reports through the plugin logger.
  @mutator.warnings.each do |message|
    log.warn(message)
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_journal.rb, line 46
# Starts the plugin and registers #run as the callback of the
# :in_systemd_emit_worker timer, passing 1 as the timer interval.
def start
  super
  worker = method(:run)
  timer_execute(:in_systemd_emit_worker, 1, &worker)
end

Private Instance Methods

emit(entry) click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 107
# Emits one journal entry to the router under @tag, using the entry's
# realtime timestamp as the event time and #formatted(entry) as the record.
#
# A buffer overflow is retried with a growing, jittered sleep; once the retry
# counter exceeds 10 the overflow error is re-raised to the caller. Any other
# StandardError is logged and swallowed.
#
# @param entry the journal entry to emit
def emit(entry)
  router.emit(
    @tag,
    Fluent::EventTime.from_time(entry.realtime_timestamp),
    formatted(entry)
  )
rescue Fluent::Plugin::Buffer::BufferOverflowError => overflow
  # The counter is a method-level local, so it survives each `retry`.
  attempts ||= 0
  raise overflow if attempts > 10

  attempts += 1
  sleep((1.5**attempts) + rand(0..3))
  retry
rescue StandardError => error
  log.error("Exception emitting record: #{error}")
end
formatted(entry) click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 119
# Builds the record for a journal entry by running it through @mutator.
#
# @param entry the journal entry to convert
# @return whatever @mutator.run returns for the entry
def formatted(entry)
  @mutator.run(entry)
end
init_journal() click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 53
# (Re)opens the journal over every file matching the @path glob whose name
# also matches @pattern, applies the configured filters (@matches takes
# precedence over @filters), and seeks to the starting position.
#
# Any previously open journal is closed first.
#
# @return [Boolean] true on success; false when a Systemd::JournalError was
#   raised (the error is logged as a warning)
def init_journal
  # TODO: ruby 2.3
  @journal_files = Dir.glob(@path).select { |file| file =~ @pattern }
  @journal.close if @journal # rubocop:disable Style/SafeNavigation
  @journal = Systemd::Journal.new(files: @journal_files)
  @journal.filter(*(@matches || @filters))
  seek
  true
rescue Systemd::JournalError => e
  log.warn("#{e.class}: #{e.message} retrying in 1s")
  false
end
read_from() click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 95
# Picks the default starting position when no stored cursor is used.
#
# @return [Symbol] :head when @read_from_head is truthy, otherwise :tail
def read_from
  return :head if @read_from_head

  :tail
end
run() click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 99
# One polling pass: ensures a journal is available, re-initialises it when
# the journal reports :invalidate, then emits every entry currently readable.
#
# Returns early when there is no journal and initialisation fails.
def run
  journal_ready = @journal || init_journal
  return unless journal_ready

  # wait(0) is used here as a status check; :invalidate triggers a re-open.
  init_journal if @journal.wait(0) == :invalidate
  watch { |entry| emit(entry) }
end
seek() click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 70
# Positions the journal at the cursor saved under :journal in @pos_storage,
# or at the default position (#read_from) when no cursor is stored.
#
# If seeking raises Systemd::JournalError, a warning is logged and the
# journal is positioned at the default instead.
def seek
  cursor = @pos_storage.get(:journal)
  target = cursor || read_from
  seek_to(target)
rescue Systemd::JournalError
  fallback = read_from
  log.warn(
    "Could not seek to cursor #{cursor} found in position file: #{@pos_storage.path}, " \
    "falling back to reading from #{fallback}"
  )
  seek_to(fallback)
end
seek_to(pos) click to toggle source

According to github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644 and bugs.freedesktop.org/show_bug.cgi?id=64614, after a seek(:tail) you must move back so that the next move_next returns the last record.

# File lib/fluent/plugin/in_journal.rb, line 85
# Seeks the journal to +pos+ and then adjusts the read pointer.
#
# @param pos :head, :tail, or any other value accepted by @journal.seek
#   (callers in this file pass a stored cursor)
def seek_to(pos)
  @journal.seek(pos)
  case pos
  when :head
    nil
  when :tail
    # After seeking to the tail, step back so that the next move_next
    # returns the last record.
    @journal.move(-2)
  else
    # NOTE(review): presumably steps past the entry at the saved cursor so
    # it is not read again — confirm against the journal library's semantics.
    @journal.move(1)
  end
end
watch(&block) click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 123
# Advances through the journal for as long as move_next is truthy, handing
# each entry to the given block via #yield_current_entry.
def watch(&block)
  while @journal.move_next
    yield_current_entry(&block)
  end
end
yield_current_entry() { |current_entry| ... } click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 127
# Yields the journal's current entry to the block and then stores the
# journal cursor under :journal in @pos_storage.
#
# A Systemd::JournalError raised along the way is logged as a warning; the
# cursor is not stored in that case if the error occurred before the put.
def yield_current_entry
  entry = @journal.current_entry
  yield entry
  position = @journal.cursor
  @pos_storage.put(:journal, position)
rescue Systemd::JournalError => error
  log.warn("Error reading from Journal: #{error.class}: #{error.message}")
end