class LogStash::Outputs::FileCloseable

File output.

Write events to files on disk. You can use fields from the event as parts of the filename.

Constants

JavaException

Public Instance Methods

receive(event) click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 79
def receive(event)
    return unless output?(event)

    @codec.encode(event)

    if event.include? "tags" and event["tags"].include?("eof")

        @files.each do |path, fd|
            if(File.basename(path) == File.basename(event['path']))
                fd.active = false
            end
        end

        close_stale_files(true) #means eof
        return
    end
end
register() click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 43
def register
    require "fileutils" # For mkdir_p

    @files = {}
    now = Time.now
    @last_flush_cycle = now
    @last_stale_cleanup_cycle = now
    flush_interval = @flush_interval.to_i
    @stale_cleanup_interval = 10

    @codec.on_event do |event|
        if @path
            path = event.sprintf(@path)
        else
            path = event["path"]
        end

        fd = open(path)

        if event.is_a? LogStash::Event and @message_format
            output = event.sprintf(@message_format)
        else
            output = event["message"]
        end

        @logger.debug("Writing output to file", :output => output, :path => path) if @logger.debug?

        fd.write(output)
        fd.write("\n")

        flush(fd)
        close_stale_files(false)
    end
end
teardown() click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 97
def teardown
    @logger.debug("Teardown: closing files") if @logger.debug?
    @files.each do |path, fd|
        begin
            fd.close
            @logger.debug("Closed file #{path}", :fd => fd) if @logger.debug?
        rescue Exception => e
            @logger.error("Excpetion while flushing and closing files.", :exception => e) if @logger.debug?
        end
    end
    finished
end

Private Instance Methods

close_stale_files(eof) click to toggle source

every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)

# File lib/logstash/outputs/file_closeable.rb, line 132
def close_stale_files(eof)
    now = Time.now
    return if now - @last_stale_cleanup_cycle < @stale_cleanup_interval and not eof

    @logger.info("Starting stale files cleanup cycle", :files => @files)
    inactive_files = @files.select { |path, fd| not fd.active }
    @logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files) if @logger.debug?
    inactive_files.each do |path, fd|
      @logger.info("Closing file %s" % path)
      fd.close
      @files.delete(path)
    end
    # mark all files as inactive, a call to write will mark them as active again
    @files.each { |path, fd| fd.active = false }
    @last_stale_cleanup_cycle = now
end
flush(fd) click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 111
def flush(fd)
    if flush_interval > 0
        flush_pending_files
    else
        fd.flush
    end
end
flush_pending_files() click to toggle source

every flush_interval seconds or so (triggered by events, but if there are no events there's no point flushing files anyway)

# File lib/logstash/outputs/file_closeable.rb, line 120
def flush_pending_files
    return unless Time.now - @last_flush_cycle >= flush_interval

    @logger.debug("Starting flush cycle") if @logger.debug?
    @files.each do |path, fd|
        @logger.debug("Flushing file", :path => path, :fd => fd) if @logger.debug?
        fd.flush
    end
    @last_flush_cycle = Time.now
end
open(path) click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 149
def open(path)
    return @files[path] if @files.include?(path) and not @files[path].nil?

    @logger.info("Opening file", :path => path)

    dir = File.dirname(path)
    if !Dir.exists?(dir)
        @logger.info("Creating directory", :directory => dir)
        FileUtils.mkdir_p(dir)
    end

    # work around a bug opening fifos (bug JRUBY-6280)
    stat = File.stat(path) rescue nil
    if stat and stat.ftype == "fifo" and RUBY_PLATFORM == "java"
        fd = java.io.FileWriter.new(java.io.File.new(path))
    else
        fd = File.new(path, "a")
    end
    if gzip
        fd = Zlib::GzipWriter.new(fd)
    end
    @files[path] = IOWriter.new(fd)
end