class LogStash::Inputs::File
Attributes
watcher[R]
@private used in specs
Public Class Methods
validate_value(value, validator)
click to toggle source
# File lib/logstash/inputs/file.rb, line 244
# Validates a config value, adding support for "callable" validators.
#
# A validator shaped like `[callable, units]` (a two-element Array whose
# first element responds to `call`) is invoked as
# `callable.call(value, units)` and the `to_a` of its result is returned.
# Every other validator is handed unchanged to `old_validate_value`.
#
# @param value the raw setting value to validate
# @param validator the validator declared for the setting
# @return the Array form of the callable's result, or whatever
#   `old_validate_value` returns
def validate_value(value, validator)
  custom = validator.is_a?(Array) && validator.size == 2 && validator.first.respond_to?(:call)
  return old_validate_value(value, validator) unless custom

  # the callable returns a ValidatedStruct having a `to_a` method suitable
  # to return to the config mixin caller
  handler, units = validator
  handler.call(value, units).to_a
end
Also aliased as: old_validate_value
Public Instance Methods
completely_stopped?()
click to toggle source
# File lib/logstash/inputs/file.rb, line 336
# Reports whether the `@completely_stopped` flag has been set.
#
# Used to synchronise after(:each) blocks in tests that remove the sincedb
# file before atomic_write completes.
#
# @return the result of `@completely_stopped.true?`
def completely_stopped?
  @completely_stopped.true?
end
handle_deletable_path(path)
click to toggle source
# File lib/logstash/inputs/file.rb, line 386
# Passes a path to every registered completed-file handler.
#
# Does nothing (and returns nil) in tail mode or when no handlers are
# registered. Otherwise logs the method name and path at debug level when
# debug logging is enabled, then calls `handle(path)` on each handler.
#
# @param path the path handed to each handler
# @return [nil, Array] nil when skipped, otherwise the handler list
def handle_deletable_path(path)
  return if tail_mode? || @completed_file_handlers.empty?

  @logger.debug(__method__.to_s, :path => path) if @logger.debug?
  @completed_file_handlers.each do |completed_handler|
    completed_handler.handle(path)
  end
end
listener_for(path)
click to toggle source
The WatchedFile calls back here as `observer.listener_for(@path)`. @param [String] path the identity
# File lib/logstash/inputs/file.rb, line 343
# Builds the listener for one watched path.
#
# @param path [String] the identity the listener is created for
# @return [FileListener] a new listener constructed with `path` and this
#   input instance
def listener_for(path)
  FileListener.new(path, self)
end
log_line_received(path, line)
click to toggle source
# File lib/logstash/inputs/file.rb, line 393
# Emits a "Received line" debug entry carrying the path and the line text.
# The logger is only invoked when debug logging is enabled.
#
# @param path the path the line was read from
# @param line the text of the line
# @return the logger's return value, or the falsy `debug?` result when
#   debug logging is off
def log_line_received(path, line)
  @logger.debug? && @logger.debug("Received line", path: path, text: line)
end
post_process_this(event, path)
click to toggle source
# File lib/logstash/inputs/file.rb, line 376
# Enriches an event and pushes it onto the current queue.
#
# Steps, in order:
# 1. always writes the path and host under `[@metadata]`;
# 2. tries to set the source host field, and the source path field when a
#    path is given, via `attempt_set` (which does not overwrite);
# 3. calls `decorate(event)`;
# 4. appends the event to the object returned by `@queue.get`.
#
# @param event the event to enrich and enqueue
# @param path the originating path; may be falsy, in which case the source
#   path field is left alone
# @return the result of appending the event to the queue
def post_process_this(event, path)
  [
    ["[@metadata][path]", path],
    ["[@metadata][host]", @host],
  ].each do |field, value|
    event.set(field, value)
  end

  attempt_set(event, @source_host_field, @host)
  if path
    attempt_set(event, @source_path_field, path)
  end

  decorate(event)
  destination = @queue.get
  destination << event
end
queue()
click to toggle source
@private used in specs
# File lib/logstash/inputs/file.rb, line 405
# Returns the object currently held by the `@queue` reference.
#
# @private used in specs
# @return whatever `@queue.get` yields
def queue
  @queue.get
end
register()
click to toggle source
# File lib/logstash/inputs/file.rb, line 257
# Validates the plugin settings and prepares the state used by later calls.
#
# In order, this:
# * records the host name and assembles `@filewatch_config`;
# * rejects any relative entry in `@path`;
# * resolves `@sincedb_path` (generated when unset, otherwise it must not be
#   a directory) and stores it in the config;
# * when the completed action includes "log", requires
#   `file_completed_log_path` and creates the file if it does not exist;
# * selects the watcher class for tail or read mode ("exit_after_read" is
#   rejected in tail mode);
# * wraps the codec and initialises the stop flag, the queue reference and
#   the ECS-dependent source field names.
#
# @raise [ArgumentError] for a relative path, a sincedb_path that is a
#   directory, a missing or uncreatable file_completed_log_path, or
#   exit_after_read combined with tail mode
def register
  require "addressable/uri"
  require "digest/md5"

  @logger.trace("Registering file input", :path => @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)
  # This check is Logstash 5 specific. If the class does not exist, and it
  # won't in older versions of Logstash, then we need to set it to nil.
  settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil

  @filewatch_config = {
    exclude: @exclude,
    stat_interval: @stat_interval,
    discover_interval: @discover_interval,
    sincedb_write_interval: @sincedb_write_interval,
    delimiter: @delimiter,
    ignore_older: @ignore_older,
    close_older: @close_older,
    max_open_files: @max_open_files,
    sincedb_clean_after: @sincedb_clean_after,
    file_chunk_count: @file_chunk_count,
    file_chunk_size: @file_chunk_size,
    file_sort_by: @file_sort_by,
    file_sort_direction: @file_sort_direction,
    exit_after_read: @exit_after_read,
    check_archive_validity: @check_archive_validity,
  }

  # Only the first relative entry is reported.
  relative_entry = @path.find { |candidate| Pathname.new(candidate).relative? }
  if relative_entry
    raise ArgumentError, "File paths must be absolute, relative path specified: #{relative_entry}"
  end

  if @sincedb_path.nil?
    sincedb_dir = build_sincedb_base_from_settings(settings) || build_sincedb_base_from_env
    @sincedb_path = build_random_sincedb_filename(sincedb_dir)
    @logger.info('No sincedb_path set, generating one based on the "path" setting',
                 :sincedb_path => @sincedb_path.to_s, :path => @path)
  else
    @sincedb_path = Pathname.new(@sincedb_path)
    if @sincedb_path.directory?
      raise ArgumentError, "The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\""
    end
  end

  @filewatch_config[:sincedb_path] = @sincedb_path
  @filewatch_config[:start_new_files_at] = @start_position.to_sym

  if @file_completed_action.include?('log')
    if @file_completed_log_path.nil?
      raise ArgumentError, 'The "file_completed_log_path" setting must be provided when the "file_completed_action" is set to "log" or "log_and_delete"'
    end

    @file_completed_log_path = Pathname.new(@file_completed_log_path)
    unless @file_completed_log_path.exist?
      begin
        FileUtils.touch(@file_completed_log_path)
      rescue
        raise ArgumentError, "The \"file_completed_log_path\" file can't be created: #{@file_completed_log_path}"
      end
    end
  end

  if tail_mode?
    if @exit_after_read
      raise ArgumentError, 'The "exit_after_read" setting only works when the "mode" is set to "read"'
    end

    @watcher_class = FileWatch::ObservingTail
  else
    @watcher_class = FileWatch::ObservingRead
  end

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
  @completely_stopped = Concurrent::AtomicBoolean.new
  @queue = Concurrent::AtomicReference.new
  @source_host_field = ecs_select[disabled: 'host', v1: '[host][name]']
  @source_path_field = ecs_select[disabled: 'path', v1: '[log][file][path]']
end
run(queue)
click to toggle source
# File lib/logstash/inputs/file.rb, line 367
# Main loop of the input: starts the watcher, publishes the pipeline queue
# and blocks in the watcher's subscribe call until the watcher is told to
# quit. Afterwards it flushes the codec and marks the input as completely
# stopped.
#
# @param queue the pipeline queue that events are pushed onto
def run(queue)
  start_processing
  @queue.set(queue)

  # halts here until quit is called;
  # last action of the subscribe call is to write the sincedb
  @watcher.subscribe(self)

  exit_flush
  @completely_stopped.make_true
end
start_processing()
click to toggle source
# File lib/logstash/inputs/file.rb, line 347
# Creates a fresh watcher, builds the completed-file handler list (read mode
# only) and asks the watcher to watch every configured path.
#
# @return the result of iterating `@path`
def start_processing
  # if the pipeline restarts this input,
  # make sure previous files are closed
  stop

  @watcher = @watcher_class.new(@filewatch_config)
  @completed_file_handlers = []

  if read_mode?
    # "log" handler is registered before "delete"
    @completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path) if @file_completed_action.include?('log')
    @completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch) if @file_completed_action.include?('delete')
  end

  @path.each do |watched_path|
    @watcher.watch_this(watched_path)
  end
end
stop()
click to toggle source
# File lib/logstash/inputs/file.rb, line 397
# Shuts the input down: closes the codec, then tells the watcher to quit.
# Does nothing when no watcher has been created yet.
#
# @return nil when there is no watcher, otherwise the result of
#   `@watcher.quit`
def stop
  return if @watcher.nil?

  @codec.close
  @watcher.quit
end
Private Instance Methods
attempt_set(event, field_reference, value)
click to toggle source
Attempt to set an event’s field to the provided value without overwriting an existing value or producing an error
# File lib/logstash/inputs/file.rb, line 421
# Attempts to set an event's field to the provided value without
# overwriting an existing value or producing an error.
#
# @param event the event to modify
# @param field_reference the field to set
# @param value the value to store
# @return false when the field already exists or setting it raised
#   (the failure is logged at trace level); otherwise the result of
#   `event.set`
def attempt_set(event, field_reference, value)
  begin
    if event.include?(field_reference)
      false
    else
      event.set(field_reference, value)
    end
  rescue => error
    logger.trace("failed to set #{field_reference} to `#{value}`", :exception => error.message)
    false
  end
end
build_random_sincedb_filename(pathname)
click to toggle source
# File lib/logstash/inputs/file.rb, line 444
# Derives the sincedb file location inside the given directory.
#
# The file name is ".sincedb_" followed by the MD5 hex digest of the
# configured paths joined with ",".
#
# @param pathname [Pathname] directory the sincedb file lives in
# @return [Pathname] `pathname` joined with the generated file name
def build_random_sincedb_filename(pathname)
  # Join by ',' to make it easy for folks to know their own sincedb
  # generated path (vs, say, inspecting the @path array)
  joined_paths = @path.join(",")
  pathname.join(".sincedb_#{Digest::MD5.hexdigest(joined_paths)}")
end
build_sincedb_base_from_env()
click to toggle source
# File lib/logstash/inputs/file.rb, line 430
# Resolves the sincedb base directory from the environment.
#
# SINCEDB_DIR takes precedence over LOGSTASH_HOME. When neither variable is
# set, the problem is logged and an ArgumentError is raised.
#
# This section is going to be deprecated eventually, as path.data will be
# the default, not an environment variable (SINCEDB_DIR or LOGSTASH_HOME).
#
# @return [Pathname] the directory named by SINCEDB_DIR or LOGSTASH_HOME
# @raise [ArgumentError] when neither environment variable is set
def build_sincedb_base_from_env
  base_dir = ENV["SINCEDB_DIR"] || ENV["LOGSTASH_HOME"]
  return Pathname.new(base_dir) unless base_dir.nil?

  # The message previously read "set sincedb_path in in your Logstash
  # config"; the duplicated word has been removed.
  @logger.error("No SINCEDB_DIR or LOGSTASH_HOME environment variable set, I don't know where " \
                "to keep track of the files I'm watching. Either set " \
                "LOGSTASH_HOME or SINCEDB_DIR in your environment, or set sincedb_path " \
                "in your Logstash config for the file input with " \
                "path '#{@path.inspect}'")
  raise ArgumentError, 'The "sincedb_path" setting was not given and the environment variables "SINCEDB_DIR" or "LOGSTASH_HOME" are not set so we cannot build a file path for the sincedb'
end
build_sincedb_base_from_settings(settings)
click to toggle source
# File lib/logstash/inputs/file.rb, line 411
# Resolves the sincedb base directory from the Logstash settings.
#
# The directory is "<path.data>/plugins/inputs/file" and is created if it
# does not exist yet. Returns nil when no settings object is available
# (`register` passes nil when `LogStash::SETTINGS` is undefined) or when
# "path.data" has no value, so the caller's `|| build_sincedb_base_from_env`
# fallback can take over instead of failing on a nil receiver.
#
# @param settings [#get_value, nil] the Logstash settings, or nil
# @return [Pathname, nil] the existing base directory, or nil when it cannot
#   be derived from the settings
def build_sincedb_base_from_settings(settings)
  return nil if settings.nil?

  logstash_data_path = settings.get_value("path.data")
  return nil if logstash_data_path.nil?

  Pathname.new(logstash_data_path).join("plugins", "inputs", "file").tap do |path|
    # Ensure that the filepath exists before writing, since it's deeply nested.
    path.mkpath
  end
end
exit_flush()
click to toggle source
# File lib/logstash/inputs/file.rb, line 458
# Flushes whatever the codec still buffers, routing flushed events through a
# FlushableListener created with the placeholder path "none".
#
# When the identity-map codec tracks at least one identity, the mapped
# codecs are flushed into the listener. Otherwise the base codec is flushed
# directly; an error raised while processing a flushed event is logged and
# does not abort the flush.
def exit_flush
  listener = FlushableListener.new("none", self)
  return @codec.flush_mapped(listener) unless @codec.identity_count.zero?

  # using the base codec without identity/path info
  @codec.base_codec.flush do |event|
    begin
      listener.process_event(event)
    rescue => error
      @logger.error("File Input: flush on exit downstream error", :exception => error)
    end
  end
end
read_mode?()
click to toggle source
# File lib/logstash/inputs/file.rb, line 454
# Tells whether the input is in read mode, defined as "not tail mode".
#
# @return [Boolean] the negation of `tail_mode?`
def read_mode?
  !tail_mode?
end
tail_mode?()
click to toggle source
# File lib/logstash/inputs/file.rb, line 450
# Tells whether the input is in tail mode.
#
# @return [Boolean] true only when `@mode` equals the string "tail"
def tail_mode?
  @mode == "tail"
end