class LogStash::Inputs::File

Attributes

watcher[R]

@private used in specs

Public Class Methods

old_validate_value(value, validator)
Alias for: validate_value
validate_value(value, validator) click to toggle source
# File lib/logstash/inputs/file.rb, line 244
def validate_value(value, validator)
  if validator.is_a?(Array) && validator.size == 2 && validator.first.respond_to?(:call)
    callable, units = *validator
    # returns a ValidatedStruct having a `to_a` method suitable to return to the config mixin caller
    return callable.call(value, units).to_a
  end
  old_validate_value(value, validator)
end
Also aliased as: old_validate_value

Public Instance Methods

completely_stopped?() click to toggle source
# File lib/logstash/inputs/file.rb, line 336
def completely_stopped?
  # to synchronise after(:each) blocks in tests that remove the sincedb file before atomic_write completes
  @completely_stopped.true?
end
handle_deletable_path(path) click to toggle source
# File lib/logstash/inputs/file.rb, line 386
def handle_deletable_path(path)
  return if tail_mode?
  return if @completed_file_handlers.empty?
  @logger.debug? && @logger.debug(__method__.to_s, :path => path)
  @completed_file_handlers.each { |handler| handler.handle(path) }
end
listener_for(path) click to toggle source

The WatchedFile calls back here as ‘observer.listener_for(@path)` @param [String] path the identity

# File lib/logstash/inputs/file.rb, line 343
def listener_for(path)
  FileListener.new(path, self)
end
log_line_received(path, line) click to toggle source
# File lib/logstash/inputs/file.rb, line 393
def log_line_received(path, line)
  @logger.debug? && @logger.debug("Received line", :path => path, :text => line)
end
post_process_this(event, path) click to toggle source
# File lib/logstash/inputs/file.rb, line 376
def post_process_this(event, path)
  event.set("[@metadata][path]", path)
  event.set("[@metadata][host]", @host)
  attempt_set(event, @source_host_field, @host)
  attempt_set(event, @source_path_field, path) if path

  decorate(event)
  @queue.get << event
end
queue() click to toggle source

@private used in specs

# File lib/logstash/inputs/file.rb, line 405
def queue
  @queue.get
end
register() click to toggle source
# File lib/logstash/inputs/file.rb, line 257
def register
  require "addressable/uri"
  require "digest/md5"
  @logger.trace("Registering file input", :path => @path)
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)
  # This check is Logstash 5 specific.  If the class does not exist, and it
  # won't in older versions of Logstash, then we need to set it to nil.
  settings = defined?(LogStash::SETTINGS) ? LogStash::SETTINGS : nil

  @filewatch_config = {
    :exclude => @exclude,
    :stat_interval => @stat_interval,
    :discover_interval => @discover_interval,
    :sincedb_write_interval => @sincedb_write_interval,
    :delimiter => @delimiter,
    :ignore_older => @ignore_older,
    :close_older => @close_older,
    :max_open_files => @max_open_files,
    :sincedb_clean_after => @sincedb_clean_after,
    :file_chunk_count => @file_chunk_count,
    :file_chunk_size => @file_chunk_size,
    :file_sort_by => @file_sort_by,
    :file_sort_direction => @file_sort_direction,
    :exit_after_read => @exit_after_read,
    :check_archive_validity => @check_archive_validity,
  }

  @path.each do |path|
    if Pathname.new(path).relative?
      raise ArgumentError.new("File paths must be absolute, relative path specified: #{path}")
    end
  end

  if @sincedb_path.nil?
    base_sincedb_path = build_sincedb_base_from_settings(settings) || build_sincedb_base_from_env
    @sincedb_path = build_random_sincedb_filename(base_sincedb_path)
    @logger.info('No sincedb_path set, generating one based on the "path" setting', :sincedb_path => @sincedb_path.to_s, :path => @path)
  else
    @sincedb_path = Pathname.new(@sincedb_path)
    if @sincedb_path.directory?
      raise ArgumentError.new("The \"sincedb_path\" argument must point to a file, received a directory: \"#{@sincedb_path}\"")
    end
  end
  
  @filewatch_config[:sincedb_path] = @sincedb_path

  @filewatch_config[:start_new_files_at] = @start_position.to_sym

  if @file_completed_action.include?('log')
    if @file_completed_log_path.nil?
      raise ArgumentError.new('The "file_completed_log_path" setting must be provided when the "file_completed_action" is set to "log" or "log_and_delete"')
    else
      @file_completed_log_path = Pathname.new(@file_completed_log_path)
      unless @file_completed_log_path.exist?
        begin
          FileUtils.touch(@file_completed_log_path)
        rescue
          raise ArgumentError.new("The \"file_completed_log_path\" file can't be created: #{@file_completed_log_path}")
        end
      end
    end
  end

  if tail_mode?
    if @exit_after_read
      raise ArgumentError.new('The "exit_after_read" setting only works when the "mode" is set to "read"')
    end
    @watcher_class = FileWatch::ObservingTail
  else
    @watcher_class = FileWatch::ObservingRead
  end
  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
  @completely_stopped = Concurrent::AtomicBoolean.new
  @queue = Concurrent::AtomicReference.new

  @source_host_field = ecs_select[disabled: 'host', v1:'[host][name]']
  @source_path_field = ecs_select[disabled: 'path', v1:'[log][file][path]']
end
run(queue) click to toggle source
# File lib/logstash/inputs/file.rb, line 367
def run(queue)
  start_processing
  @queue.set queue
  @watcher.subscribe(self) # halts here until quit is called
  # last action of the subscribe call is to write the sincedb
  exit_flush
  @completely_stopped.make_true
end
start_processing() click to toggle source
# File lib/logstash/inputs/file.rb, line 347
def start_processing
  # if the pipeline restarts this input,
  # make sure previous files are closed
  stop

  @watcher = @watcher_class.new(@filewatch_config)

  @completed_file_handlers = []
  if read_mode?
    if @file_completed_action.include?('log')
      @completed_file_handlers << LogCompletedFileHandler.new(@file_completed_log_path)
    end
    if @file_completed_action.include?('delete')
      @completed_file_handlers << DeleteCompletedFileHandler.new(@watcher.watch)
    end
  end

  @path.each { |path| @watcher.watch_this(path) }
end
stop() click to toggle source
# File lib/logstash/inputs/file.rb, line 397
def stop
  unless @watcher.nil?
    @codec.close
    @watcher.quit
  end
end

Private Instance Methods

attempt_set(event, field_reference, value) click to toggle source

Attempt to set an event’s field to the provided value without overwriting an existing value or producing an error

# File lib/logstash/inputs/file.rb, line 421
def attempt_set(event, field_reference, value)
  return false if event.include?(field_reference)

  event.set(field_reference, value)
rescue => e
  logger.trace("failed to set #{field_reference} to `#{value}`", :exception => e.message)
  false
end
build_random_sincedb_filename(pathname) click to toggle source
# File lib/logstash/inputs/file.rb, line 444
def build_random_sincedb_filename(pathname)
  # Join by ',' to make it easy for folks to know their own sincedb
  # generated path (vs, say, inspecting the @path array)
  pathname.join(".sincedb_" + Digest::MD5.hexdigest(@path.join(",")))
end
build_sincedb_base_from_env() click to toggle source
# File lib/logstash/inputs/file.rb, line 430
def build_sincedb_base_from_env
  # This section is going to be deprecated eventually, as path.data will be
  # the default, not an environment variable (SINCEDB_DIR or LOGSTASH_HOME)
  if ENV["SINCEDB_DIR"].nil? && ENV["LOGSTASH_HOME"].nil?
    @logger.error("No SINCEDB_DIR or LOGSTASH_HOME environment variable set, I don't know where " \
                  "to keep track of the files I'm watching. Either set " \
                  "LOGSTASH_HOME or SINCEDB_DIR in your environment, or set sincedb_path in " \
                  "in your Logstash config for the file input with " \
                  "path '#{@path.inspect}'")
    raise ArgumentError.new('The "sincedb_path" setting was not given and the environment variables "SINCEDB_DIR" or "LOGSTASH_HOME" are not set so we cannot build a file path for the sincedb')
  end
  Pathname.new(ENV["SINCEDB_DIR"] || ENV["LOGSTASH_HOME"])
end
build_sincedb_base_from_settings(settings) click to toggle source
# File lib/logstash/inputs/file.rb, line 411
def build_sincedb_base_from_settings(settings)
  logstash_data_path = settings.get_value("path.data")
  Pathname.new(logstash_data_path).join("plugins", "inputs", "file").tap do |path|
    # Ensure that the filepath exists before writing, since it's deeply nested.
    path.mkpath
  end
end
exit_flush() click to toggle source
# File lib/logstash/inputs/file.rb, line 458
def exit_flush
  listener = FlushableListener.new("none", self)
  if @codec.identity_count.zero?
    # using the base codec without identity/path info
    @codec.base_codec.flush do |event|
      begin
        listener.process_event(event)
      rescue => e
        @logger.error("File Input: flush on exit downstream error", :exception => e)
      end
    end
  else
    @codec.flush_mapped(listener)
  end
end
read_mode?() click to toggle source
# File lib/logstash/inputs/file.rb, line 454
def read_mode?
  !tail_mode?
end
tail_mode?() click to toggle source
# File lib/logstash/inputs/file.rb, line 450
def tail_mode?
  @mode == "tail"
end