class Fluent::Plugin::JournalInput
Fluentd plugin for reading from the systemd journal files
Constants
- DEFAULT_STORAGE_TYPE
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_journal.rb, line 38
# Applies the plugin configuration and prepares per-instance state.
#
# After delegating to the superclass, this clears any journal handle (so that
# `run` opens one on its first tick), creates the storage used to persist the
# journal cursor, and builds the entry mutator from `@entry_opts`. Every
# warning the mutator reports is forwarded to the plugin log.
#
# @param conf the configuration object handed over by Fluentd
def configure(conf)
  super

  @journal = nil
  @pos_storage = storage_create(usage: 'positions')

  mutator_options = @entry_opts.to_h
  @mutator = SystemdEntryMutator.new(**mutator_options)
  @mutator.warnings.each do |warning|
    log.warn(warning)
  end
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_journal.rb, line 46
# Starts the plugin: after the superclass start-up, registers `run` as the
# callback of the :in_systemd_emit_worker timer with an interval argument of 1.
def start
  super

  worker = method(:run)
  timer_execute(:in_systemd_emit_worker, 1, &worker)
end
Private Instance Methods
emit(entry)
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 107
# Emits one journal entry to the Fluentd router under `@tag`, using the
# entry's realtime timestamp as the event time and the mutated entry as the
# record.
#
# A buffer overflow is retried after an exponentially growing sleep plus a
# random 0..3 second jitter; once the retry counter exceeds 10 the overflow
# error is re-raised to the caller. Any other StandardError is logged and
# swallowed so that a single bad record does not stop the reader.
#
# @param entry the journal entry to emit
def emit(entry)
  router.emit(
    @tag,
    Fluent::EventTime.from_time(entry.realtime_timestamp),
    formatted(entry)
  )
rescue Fluent::Plugin::Buffer::BufferOverflowError => overflow
  # The counter is a method-level local, so it survives each `retry`.
  attempts ||= 0
  raise overflow if attempts > 10

  attempts += 1
  backoff = 1.5**attempts + rand(0..3)
  sleep backoff
  retry
rescue => error # rubocop:disable Style/RescueStandardError
  log.error("Exception emitting record: #{error}")
end
formatted(entry)
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 119
# Runs the configured entry mutator over a journal entry.
#
# @param entry the journal entry to transform
# @return whatever `@mutator.run` returns for the entry
def formatted(entry)
  record = @mutator.run(entry)
  record
end
init_journal()
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 53
# (Re)opens the journal over the files selected by `@path` and `@pattern`.
#
# The glob in `@path` is expanded and only the paths matching `@pattern` are
# kept in `@journal_files`. Any journal that is already open is closed, a new
# one is opened over those files, the configured matches (or, when there are
# none, the filters) are applied, and the read position is restored via `seek`.
#
# @return [Boolean] true on success; false when a Systemd::JournalError was
#   raised (the error is logged as a warning)
def init_journal
  @journal_files = Dir.glob(@path).select { |file| file =~ @pattern }

  # TODO: ruby 2.3
  @journal.close if @journal # rubocop:disable Style/SafeNavigation
  @journal = Systemd::Journal.new(files: @journal_files)
  @journal.filter(*(@matches || @filters))
  seek
  true
rescue Systemd::JournalError => e
  log.warn("#{e.class}: #{e.message} retrying in 1s")
  false
end
read_from()
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 95
# Chooses the default start position used when no usable cursor is available.
#
# @return [Symbol] :head when `@read_from_head` is truthy, otherwise :tail
def read_from
  return :head if @read_from_head

  :tail
end
run()
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 99
# Timer callback: makes sure a journal is open, re-opens it when the journal
# reports :invalidate, then emits every entry that can currently be read.
def run
  unless @journal
    # Nothing is open yet; give up for this tick if opening fails.
    return unless init_journal
  end

  # NOTE(review): the result of this re-initialisation is ignored, so `watch`
  # still runs when it returns false — confirm that this is intended.
  init_journal if @journal.wait(0) == :invalidate

  watch { |entry| emit(entry) }
end
seek()
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 70
# Positions the journal at the cursor saved in the position storage, or at the
# default start position (`read_from`) when no cursor has been saved.
#
# When seeking raises Systemd::JournalError, a warning is logged and the
# journal is positioned at the default start position instead.
def seek
  saved_cursor = @pos_storage.get(:journal)
  seek_to(saved_cursor || read_from)
rescue Systemd::JournalError
  message = "Could not seek to cursor #{saved_cursor} found in position file: #{@pos_storage.path}, " \
            "falling back to reading from #{read_from}"
  log.warn(message)
  seek_to(read_from)
end
seek_to(pos)
click to toggle source
According to github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644 and bugs.freedesktop.org/show_bug.cgi?id=64614, after doing a seek(:tail) you must move back far enough that the next move_next returns the last record.
# File lib/fluent/plugin/in_journal.rb, line 85
# Seeks the journal to `pos` and then adjusts the read pointer.
#
# - :head        — no adjustment.
# - :tail        — moves back by 2, so that the next `move_next` returns the
#                  last record (workaround for the seek(:tail) behaviour).
# - anything else — moves forward by 1 (presumably to step past the entry the
#                  saved cursor points at; verify against the journal API).
#
# @param pos :head, :tail, or a value accepted by `@journal.seek`
# @return nil for :head, otherwise the result of `@journal.move`
def seek_to(pos)
  @journal.seek(pos)
  return nil if pos == :head

  step = pos == :tail ? -2 : 1
  @journal.move(step)
end
watch(&block)
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 123
# Advances through the journal for as long as `move_next` returns a truthy
# value, handing each entry to the given block via `yield_current_entry`.
#
# @yield [entry] the entry the journal is currently positioned on
def watch(&block)
  while @journal.move_next
    yield_current_entry(&block)
  end
end
yield_current_entry() { |current_entry| ... }
click to toggle source
# File lib/fluent/plugin/in_journal.rb, line 127
# Yields the journal's current entry and, once the block has returned, stores
# the journal cursor under :journal in the position storage.
#
# A Systemd::JournalError raised along the way is logged as a warning and not
# propagated; in that case the cursor is not stored.
#
# @yield [current_entry] the entry the journal is currently positioned on
def yield_current_entry
  entry = @journal.current_entry
  yield(entry)

  cursor = @journal.cursor
  @pos_storage.put(:journal, cursor)
rescue Systemd::JournalError => e
  log.warn("Error reading from Journal: #{e.class}: #{e.message}")
end