class LogStash::Outputs::FileCloseable
File output.
Write events to files on disk. You can use fields from the event as parts of the filename.
Constants
- JavaException
Public Instance Methods
receive(event)
click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 79
# Hand an event to the codec and, when the event is tagged "eof", mark the
# open files sharing the event's basename as inactive and force a stale-file
# sweep so they are closed.
#
# event - the event to handle; it is read through include?("tags"),
#         ["tags"] and ["path"].
#
# Returns nil.
def receive(event)
  return unless output?(event)

  @codec.encode(event)
  return unless event.include?("tags") && event["tags"].include?("eof")

  # An eof event without a path cannot name a file, so skip the marking step
  # rather than letting File.basename(nil) raise a TypeError.
  source_path = event["path"]
  unless source_path.nil?
    finished_name = File.basename(source_path)
    @files.each do |path, fd|
      fd.active = false if File.basename(path) == finished_name
    end
  end

  close_stale_files(true) # true means eof: sweep now, ignoring the interval
  nil
end
register()
click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 43
# Prepare the output: reset the table of open files and the flush/cleanup
# timestamps, then install the codec callback that writes every encoded
# event to its destination file.
#
# Returns whatever @codec.on_event returns.
def register
  require "fileutils" # For mkdir_p

  @files = {}
  now = Time.now
  @last_flush_cycle = now
  @last_stale_cleanup_cycle = now
  @stale_cleanup_interval = 10 # compared against a Time difference, i.e. seconds

  @codec.on_event do |event|
    # A configured @path is expanded per event; otherwise the event's own
    # "path" field names the destination.
    path = @path ? event.sprintf(@path) : event["path"]
    fd = open(path)

    output = if event.is_a?(LogStash::Event) && @message_format
               event.sprintf(@message_format)
             else
               event["message"]
             end

    @logger.debug("Writing output to file", :output => output, :path => path) if @logger.debug?
    fd.write(output)
    fd.write("\n")
    flush(fd)
    close_stale_files(false)
  end
end
teardown()
click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 97
# Close every tracked file, then signal completion through `finished`.
# A failure while closing one file is logged and does not stop the rest.
def teardown
  @logger.debug("Teardown: closing files") if @logger.debug?
  @files.each do |path, fd|
    begin
      fd.close
      @logger.debug("Closed file #{path}", :fd => fd) if @logger.debug?
    rescue Exception => e
      # NOTE(review): Exception is left broad on purpose — `open` may wrap a
      # java.io.FileWriter, and whether its errors descend from StandardError
      # under JRuby should be confirmed before narrowing this.
      #
      # Always report the failure: gating an error-level message on debug?
      # would hide it at normal log levels.
      @logger.error("Exception while flushing and closing files.", :exception => e)
    end
  end
  finished
end
Private Instance Methods
close_stale_files(eof)
click to toggle source
Runs every 10 seconds or so. The check is triggered by incoming events; if there are no events, there is no point in closing files anyway.
# File lib/logstash/outputs/file_closeable.rb, line 132
# Close and forget every tracked file whose `active` flag is false, then
# clear the flag on the files that remain.
#
# eof - when truthy, sweep even if @stale_cleanup_interval has not yet
#       elapsed since the previous sweep.
#
# Returns the Time at which this sweep started, or nil when it was skipped.
def close_stale_files(eof)
  now = Time.now
  interval_elapsed = now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
  return unless interval_elapsed || eof

  @logger.info("Starting stale files cleanup cycle", :files => @files)

  inactive_files = @files.reject { |_path, fd| fd.active }
  if @logger.debug?
    @logger.debug("%d stale files found" % inactive_files.count, :inactive_files => inactive_files)
  end

  inactive_files.each do |stale_path, stale_fd|
    @logger.info("Closing file %s" % stale_path)
    stale_fd.close
    @files.delete(stale_path)
  end

  # mark all files as inactive, a call to write will mark them as active again
  @files.each_value { |fd| fd.active = false }
  @last_stale_cleanup_cycle = now
end
flush(fd)
click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 111
# Flush according to the configured interval: with a positive
# flush_interval, defer to the periodic flush of all files; otherwise flush
# the given file right away.
#
# fd - the writer that was just written to.
def flush(fd)
  return flush_pending_files if flush_interval > 0

  fd.flush
end
flush_pending_files()
click to toggle source
Runs every flush_interval seconds or so. The check is triggered by incoming events; if there are no events, there is no point in flushing files anyway.
# File lib/logstash/outputs/file_closeable.rb, line 120
# Flush every tracked file once at least flush_interval has passed since
# the previous flush cycle, and record the time of this cycle.
#
# Returns the new @last_flush_cycle, or nil when the cycle was skipped.
def flush_pending_files
  elapsed = Time.now - @last_flush_cycle
  return if elapsed < flush_interval

  @logger.debug("Starting flush cycle") if @logger.debug?
  @files.each_pair do |path, fd|
    if @logger.debug?
      @logger.debug("Flushing file", :path => path, :fd => fd)
    end
    fd.flush
  end
  @last_flush_cycle = Time.now
end
open(path)
click to toggle source
# File lib/logstash/outputs/file_closeable.rb, line 149
# Return the writer for +path+, opening the file (and creating its parent
# directory) on first use. Opened writers are cached in @files.
#
# path - destination file path.
#
# Returns the IOWriter stored in @files for this path.
def open(path)
  cached = @files[path]
  return cached unless cached.nil?

  @logger.info("Opening file", :path => path)
  dir = File.dirname(path)
  # Dir.exist? rather than Dir.exists?: the latter was removed in Ruby 3.2.
  unless Dir.exist?(dir)
    @logger.info("Creating directory", :directory => dir)
    FileUtils.mkdir_p(dir)
  end

  # work around a bug opening fifos (bug JRUBY-6280)
  stat = begin
    File.stat(path)
  rescue StandardError
    nil # the path may not exist yet; File.new below creates it
  end
  if stat && stat.ftype == "fifo" && RUBY_PLATFORM == "java"
    fd = java.io.FileWriter.new(java.io.File.new(path))
  else
    fd = File.new(path, "a")
  end

  fd = Zlib::GzipWriter.new(fd) if gzip
  @files[path] = IOWriter.new(fd)
end