class LogStash::Inputs::Journald

Pull events from a local systemd journal.

See requirements github.com/ledbettj/systemd-journal

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/journald.rb, line 143
def close # FIXME: doesn't really seem to work...
    @logger.debug("journald shutting down.")
    @journal = nil
    Thread.kill(@sincedb_writer)
    # Write current cursor
    file = File.open(@sincedb_path, 'w+')
    file.puts @cursor
    file.close
    @cursor = nil
end
register() click to toggle source
# File lib/logstash/inputs/journald.rb, line 61
def register
    opts = {
        flags: @flags,
        path: @path,
    }
    @hostname = Socket.gethostname
    @journal = Systemd::Journal.new(opts)
    @cursor = ""
    @written_cursor = ""
    @cursor_lock = Mutex.new
    if @thisboot
        @filter[:_boot_id] = Systemd::Id128.boot_id
    end
    if @sincedb_path.nil?
        if ENV["SINCEDB_DIR"].nil? && ENV["HOME"].nil?
            @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                          "to keep track of the files I'm watching. Either set " \
                          "HOME or SINCEDB_DIR in your environment, or set sincedb_path in " \
                          "in your Logstash config for the file input with " \
                          "path '#{@path.inspect}'")
            raise(LogStash::ConfigurationError, "Sincedb can not be created.")
        end
        sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]
        @sincedb_path = File.join(sincedb_dir, ".sincedb_journal")
        @logger.info("No sincedb_path set, generating one for the journal",
                     :sincedb_path => @sincedb_path)
    end
    # (Create and) read sincedb
    FileUtils.touch(@sincedb_path)
    @cursor = IO.read(@sincedb_path)
    # Write sincedb in thread
    @sincedb_writer = Thread.new do
        loop do
            sleep @sincedb_write_interval
            if @cursor != @written_cursor
                file = File.open(@sincedb_path, 'w+')
                file.puts @cursor
                file.close
                @cursor_lock.synchronize {
                    @written_cursor = @cursor
                }
             end
        end
    end
end
run(queue) click to toggle source
# File lib/logstash/inputs/journald.rb, line 107
def run(queue)
    if @cursor.strip.length == 0
        @journal.seek(@seekto.to_sym)

        # We must make one movement in order for the journal C api or else
        # the @journal.watch call will start from the beginning of the
        # journal. see:
        # https://github.com/ledbettj/systemd-journal/issues/55
        if @seekto == 'tail'
          @journal.move_previous
        end

        @journal.filter(@filter)
    else
        @journal.seek(@cursor)
        @journal.move_next # Without this, the last event will be read again
    end

    watch_journal do |entry|
        timestamp = entry.realtime_timestamp
        event = LogStash::Event.new(
            entry.to_h_lower(@lowercase).merge(
                "@timestamp" => timestamp,
                "host" => entry._hostname || @hostname,
                "cursor" => @journal.cursor
            )
        )
        decorate(event)
        queue << event
        @cursor_lock.synchronize {
            @cursor = @journal.cursor
        }
    end
end

Private Instance Methods

watch_journal() { |current_entry while !stop? && move_next| ... } click to toggle source
# File lib/logstash/inputs/journald.rb, line 155
def watch_journal
    until stop?
        if @journal.wait(@wait_timeout)
            yield @journal.current_entry while !stop? && @journal.move_next
        end
    end
end