class LogStash::Outputs::File
This output writes events to files on disk. You can use fields from the event as parts of the filename and/or path.
By default, this output writes one event per line in JSON format. You can customise the line format using the `line` codec, for example:
[source,ruby]
output {
file { path => ... codec => line { format => "custom format: %{message}"} }
}
Constants
- FIELD_REF
Attributes
failure_path[R]
Public Instance Methods
close()
click to toggle source
# File lib/logstash/outputs/file.rb, line 135
# Stops the periodic flusher (when one was started) and closes every file
# currently held in @files while holding @io_mutex.
#
# A failure to close one file is logged and does not prevent the remaining
# files from being closed.
def close
  flusher = @flusher
  flusher.stop unless flusher.nil?

  @io_mutex.synchronize do
    @logger.debug("Close: closing files")
    @files.each_pair do |file_path, writer|
      begin
        writer.close
        @logger.debug("Closed file #{file_path}", :fd => writer)
      rescue Exception => error
        # Deliberately broad: shutdown is best-effort, so log and keep going.
        @logger.error("Exception while flushing and closing files.", :exception => error)
      end
    end
  end
end
multi_receive_encoded(events_and_encoded)
click to toggle source
# File lib/logstash/outputs/file.rb, line 109
# Writes a batch of already-encoded events to their destination files.
#
# events_and_encoded - enumerable of [event, encoded] pairs.
#
# Payloads are first grouped by the path computed for each event. Then,
# while holding @io_mutex, each file is opened (or fetched from the cache)
# and written to. In "overwrite" mode the file is truncated and only the
# last payload of the batch is written; otherwise every payload is appended
# in order. Files are flushed immediately unless a live periodic flusher
# exists. A stale-file cleanup check runs at the end of every batch.
def multi_receive_encoded(events_and_encoded)
  payloads_by_path = events_and_encoded.each_with_object({}) do |(event, encoded), grouped|
    (grouped[event_path(event)] ||= []) << encoded
  end

  @io_mutex.synchronize do
    payloads_by_path.each_pair do |target, payloads|
      writer = open(target)

      if @write_behavior != "overwrite"
        # append to the file
        payloads.each { |payload| writer.write(payload) }
      else
        writer.truncate(0)
        writer.seek(0, IO::SEEK_SET)
        writer.write(payloads.last)
      end

      # Only flush here when no background flusher is taking care of it.
      writer.flush if !@flusher || !@flusher.alive?
    end

    close_stale_files
  end
end
register()
click to toggle source
# File lib/logstash/outputs/file.rb, line 84
# Prepares the output for use: initialises the open-file cache and its
# mutex, expands and validates the configured path, derives the file root
# and the failure-file path, and starts the periodic flusher when
# @flush_interval is greater than zero.
def register
  require "fileutils" # For mkdir_p

  @files = {}
  @io_mutex = Mutex.new

  @path = File.expand_path(path)
  validate_path

  # With a dynamic path the root is the static prefix; otherwise it is the
  # directory of the configured path.
  @file_root = path_with_field_ref? ? extract_file_root : File.dirname(path)
  @failure_path = File.join(@file_root, @filename_failure)

  @flush_interval = @flush_interval.to_i
  @flusher = Interval.start(@flush_interval, -> { flush_pending_files }) if @flush_interval > 0

  @last_stale_cleanup_cycle = Time.now
end
Private Instance Methods
cached?(path)
click to toggle source
# File lib/logstash/outputs/file.rb, line 234
# Tells whether @files holds a non-nil entry for the given path.
#
# path - the file path used as cache key.
#
# Returns true or false.
def cached?(path)
  return false unless @files.key?(path)

  !@files[path].nil?
end
close_stale_files()
click to toggle source
Closes stale files every 10 seconds or so. The check is triggered by incoming events; if there are no events, there is no point closing files anyway.
# File lib/logstash/outputs/file.rb, line 217
# Closes and forgets every cached file that has not been marked active
# since the previous cleanup cycle.
#
# Does nothing (returns nil) until at least @stale_cleanup_interval has
# elapsed since @last_stale_cleanup_cycle. After a cycle, all remaining
# files are marked inactive and the cycle timestamp is updated.
def close_stale_files
  current_time = Time.now
  elapsed = current_time - @last_stale_cleanup_cycle
  return unless elapsed >= @stale_cleanup_interval

  @logger.debug("Starting stale files cleanup cycle", :files => @files)

  stale = @files.reject { |_file_path, writer| writer.active }
  @logger.debug(format("%d stale files found", stale.size), :inactive_files => stale)

  stale.each_pair do |file_path, writer|
    @logger.info(format("Closing file %s", file_path))
    writer.close
    @files.delete(file_path)
  end

  # mark all files as inactive, a call to write will mark them as active again
  @files.each_value { |writer| writer.active = false }
  @last_stale_cleanup_cycle = current_time
end
deleted?(path)
click to toggle source
# File lib/logstash/outputs/file.rb, line 238
# Tells whether nothing exists on disk at the given path.
#
# path - the file path to check.
#
# Returns true when the path does not exist, false otherwise.
def deleted?(path)
  return false if File.exist?(path)

  true
end
event_path(event)
click to toggle source
# File lib/logstash/outputs/file.rb, line 175
# Resolves the file an event should be written to.
#
# event - the event whose fields are interpolated into the configured path.
#
# The failure file (@failure_path) is used instead of the generated path
# when the path is dynamic and resolves outside the file root, or when
# @create_if_deleted is off and the target does not exist on disk.
#
# Returns the path to write to.
def event_path(event)
  target = generate_filepath(event)
  escapes_root = path_with_field_ref? && !inside_file_root?(target)

  if escapes_root
    @logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path)
    target = @failure_path
  elsif !@create_if_deleted && deleted?(target)
    target = @failure_path
  end

  @logger.debug("File, writing event to file.", :filename => target)
  target
end
extract_file_root()
click to toggle source
# File lib/logstash/outputs/file.rb, line 196
# Returns the leading portion of the expanded configured path made of the
# segments that precede the first segment matching FIELD_REF, joined with
# File::SEPARATOR.
def extract_file_root
  segments = File.expand_path(path).split(File::SEPARATOR)
  first_dynamic = segments.index { |segment| segment =~ FIELD_REF } || segments.length
  segments.first(first_dynamic).join(File::SEPARATOR)
end
flush_pending_files()
click to toggle source
The backbone of @flusher, our periodic-flushing interval: flushes every open file.
# File lib/logstash/outputs/file.rb, line 202
# Flushes every file held in @files while holding @io_mutex.
#
# Any StandardError raised during the cycle is logged and swallowed, so the
# caller never sees it.
def flush_pending_files
  begin
    @io_mutex.synchronize do
      @logger.debug("Starting flush cycle")
      @files.each_pair do |file_path, writer|
        @logger.debug("Flushing file", :path => file_path, :fd => writer)
        writer.flush
      end
    end
  rescue StandardError => error
    # squash exceptions caught while flushing after logging them
    @logger.error("Exception flushing files", :exception => error.message, :backtrace => error.backtrace)
  end
end
generate_filepath(event)
click to toggle source
# File lib/logstash/outputs/file.rb, line 188
# Builds the output path for an event by passing the configured path
# template (@path) to the event's sprintf.
#
# event - object responding to sprintf(template).
#
# Returns whatever event.sprintf returns for the template.
def generate_filepath(event)
  path_template = @path
  event.sprintf(path_template)
end
inside_file_root?(log_path)
click to toggle source
# File lib/logstash/outputs/file.rb, line 170
# Tells whether a path, once expanded, lies underneath @file_root.
#
# log_path - the candidate path; it is expanded first, so ".." segments
#            are resolved before the comparison.
#
# Returns true when the expanded path starts with "<@file_root>/".
def inside_file_root?(log_path)
  root_prefix = "#{@file_root}/"
  File.expand_path(log_path).start_with?(root_prefix)
end
open(path)
click to toggle source
# File lib/logstash/outputs/file.rb, line 242
# Returns the cached writer for a path, opening (and caching) a new one
# when needed.
#
# path - the file path to write to.
#
# Behaviour:
# * If the path is missing on disk and @create_if_deleted is set, the old
#   cached writer (if any) is closed and discarded, and a new file is opened.
# * Otherwise, a cached writer is returned as-is when there is one.
# * Otherwise the parent directory is created when missing (honouring
#   @dir_mode unless it is -1) and the file is opened in "a+" mode
#   (honouring @file_mode unless it is -1), optionally wrapped in a
#   Zlib::GzipWriter when gzip is enabled.
#
# Returns the IOWriter stored in @files for the path.
def open(path)
  # Check the filesystem once so that every decision below is based on the
  # same answer.
  missing = deleted?(path)

  if missing && @create_if_deleted
    @logger.debug("Required path was deleted, creating the file again", :path => path)
    stale = @files.delete(path)
    begin
      # Release the handle of the deleted file instead of leaking it.
      stale.close unless stale.nil?
    rescue StandardError => e
      # Best effort: a failure to close the old handle must not block the write.
      @logger.warn("Could not close the handle of a deleted file", :path => path, :exception => e)
    end
  elsif cached?(path)
    return @files[path]
  end

  @logger.info("Opening file", :path => path)

  dir = File.dirname(path)
  unless Dir.exist?(dir)
    @logger.info("Creating directory", :directory => dir)
    if @dir_mode != -1
      FileUtils.mkdir_p(dir, :mode => @dir_mode)
    else
      FileUtils.mkdir_p(dir)
    end
  end

  # work around a bug opening fifos (bug JRUBY-6280)
  stat = begin
    File.stat(path)
  rescue StandardError
    nil # the path does not exist yet (or cannot be inspected)
  end

  fd = if stat && stat.ftype == "fifo"
    java.io.FileWriter.new(java.io.File.new(path))
  elsif @file_mode != -1
    File.new(path, "a+", @file_mode)
  else
    File.new(path, "a+")
  end

  fd = Zlib::GzipWriter.new(fd) if gzip

  @files[path] = IOWriter.new(fd)
end
path_with_field_ref?()
click to toggle source
# File lib/logstash/outputs/file.rb, line 192
# Tells whether the configured path matches FIELD_REF.
#
# Returns the index of the first match (truthy) or nil when there is none.
def path_with_field_ref?
  match_index = path =~ FIELD_REF
  match_index
end
root_directory()
click to toggle source
# File lib/logstash/outputs/file.rb, line 160
# Returns the first directory segment of @path, ignoring empty segments.
#
# On Windows the first non-empty segment is the drive letter, so the second
# one is returned instead. Returns nil when there is no such segment.
def root_directory
  segments = @path.split(File::SEPARATOR).reject(&:empty?)
  # First part is the drive letter on Windows, so skip it there.
  root_index = Gem.win_platform? ? 1 : 0
  segments[root_index]
end
validate_path()
click to toggle source
# File lib/logstash/outputs/file.rb, line 153
# Rejects configurations whose first path segment matches FIELD_REF.
#
# Raises LogStash::ConfigurationError (after logging the error) when the
# root directory of the path is dynamic; returns nil otherwise.
def validate_path
  return if (root_directory =~ FIELD_REF).nil?

  @logger.error("File: The starting part of the path should not be dynamic.", :path => @path)
  raise LogStash::ConfigurationError, "The starting part of the path should not be dynamic."
end