class LogStash::Inputs::Journald
Pull events from a local systemd journal.
See the requirements at https://github.com/ledbettj/systemd-journal
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/journald.rb, line 143
# Shut the input down: drop the journal handle, stop the background sincedb
# writer thread and persist the last known cursor to @sincedb_path.
#
# Safe to call when the writer thread was never started and safe to call more
# than once: a nil cursor is never written, so an earlier saved position is
# not overwritten with a blank line.
def close # FIXME: doesn't really seem to work...
  @logger.debug("journald shutting down.")
  @journal = nil

  # Stop the periodic writer before the final write so the two cannot
  # interleave on the same file.
  if @sincedb_writer
    @sincedb_writer.kill
    @sincedb_writer.join
    @sincedb_writer = nil
  end

  # Write current cursor. The lock is only taken when it exists, so close
  # still works if setup never got as far as creating it.
  last_cursor = @cursor_lock ? @cursor_lock.synchronize { @cursor } : @cursor
  unless last_cursor.nil?
    # Block form closes the handle even if the write raises.
    File.open(@sincedb_path, 'w') { |file| file.puts(last_cursor) }
  end
  @cursor = nil
end
register()
click to toggle source
# File lib/logstash/inputs/journald.rb, line 61
# Prepare the input: open the journal, work out where the sincedb (saved
# cursor) file lives, load the saved cursor and start a background thread that
# writes the cursor back to disk every @sincedb_write_interval seconds.
#
# Raises LogStash::ConfigurationError when no sincedb_path is configured and
# neither SINCEDB_DIR nor HOME is set in the environment.
def register
  opts = {
    flags: @flags,
    path: @path,
  }
  @hostname = Socket.gethostname
  @journal = Systemd::Journal.new(opts)
  @cursor = ""
  @written_cursor = ""
  @cursor_lock = Mutex.new

  if @thisboot
    @filter[:_boot_id] = Systemd::Id128.boot_id
  end

  if @sincedb_path.nil?
    sincedb_dir = ENV["SINCEDB_DIR"] || ENV["HOME"]
    if sincedb_dir.nil?
      @logger.error("No SINCEDB_DIR or HOME environment variable set, I don't know where " \
                    "to keep track of the files I'm watching. Either set " \
                    "HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
                    "in your Logstash config for the journald input with " \
                    "path '#{@path.inspect}'")
      raise(LogStash::ConfigurationError, "Sincedb can not be created.")
    end

    @sincedb_path = File.join(sincedb_dir, ".sincedb_journal")
    @logger.info("No sincedb_path set, generating one for the journal",
                 :sincedb_path => @sincedb_path)
  end

  # (Create and) read sincedb. The file is written with a trailing newline,
  # so strip it to recover the bare cursor string.
  FileUtils.touch(@sincedb_path)
  @cursor = IO.read(@sincedb_path).strip
  # What was just read is already on disk; no need to rewrite it unchanged.
  @written_cursor = @cursor

  # Write sincedb in thread
  @sincedb_writer = Thread.new do
    loop do
      sleep @sincedb_write_interval
      # Take one snapshot under the lock and use it for both the write and
      # the bookkeeping, so a cursor that changes mid-write is never recorded
      # as written without actually having reached the file.
      snapshot = @cursor_lock.synchronize { @cursor }
      next if snapshot == @written_cursor

      # Block form closes the handle even if the write raises.
      File.open(@sincedb_path, 'w') { |file| file.puts(snapshot) }
      @written_cursor = snapshot
    end
  end
end
run(queue)
click to toggle source
# File lib/logstash/inputs/journald.rb, line 107
# Position the journal, install the configured filter, then convert every
# entry yielded by watch_journal into a LogStash::Event pushed onto +queue+.
#
# With no saved cursor the journal is positioned at @seekto; otherwise reading
# resumes just after the saved cursor. The filter is applied in both cases, so
# a restart that resumes from a saved cursor honours @filter as well.
#
# queue - object receiving events via <<.
def run(queue)
  saved_cursor = @cursor.strip
  if saved_cursor.empty?
    @journal.seek(@seekto.to_sym)

    # We must make one movement in order for the journal C api or else
    # the @journal.watch call will start from the beginning of the
    # journal. see:
    # https://github.com/ledbettj/systemd-journal/issues/55
    @journal.move_previous if @seekto == 'tail'
  else
    # Seek with the stripped value, i.e. the same string the emptiness
    # check above looked at.
    @journal.seek(saved_cursor)
    @journal.move_next # Without this, the last event will be read again
  end

  # Installed after positioning, in the same order the fresh-start path has
  # always used.
  @journal.filter(@filter)

  watch_journal do |entry|
    # Read the cursor once so the event field and the value stored for the
    # sincedb are guaranteed to be the same.
    entry_cursor = @journal.cursor
    event = LogStash::Event.new(
      entry.to_h_lower(@lowercase).merge(
        "@timestamp" => entry.realtime_timestamp,
        "host" => entry._hostname || @hostname,
        "cursor" => entry_cursor
      )
    )
    decorate(event)
    queue << event
    @cursor_lock.synchronize { @cursor = entry_cursor }
  end
end
Private Instance Methods
watch_journal() { |current_entry| ... }
click to toggle source
# File lib/logstash/inputs/journald.rb, line 155
# Loop until stop? is true, waiting on the journal with @wait_timeout each
# pass. Whenever the wait reports a truthy result, advance through the journal
# with move_next and yield each current entry to the caller's block, checking
# stop? before every advance.
def watch_journal
  until stop?
    next unless @journal.wait(@wait_timeout)

    # stop? is tested first so move_next is not called once a stop is requested.
    until stop? || !@journal.move_next
      yield @journal.current_entry
    end
  end
end