class Fluent::Plugin::FileBuffer

Constants

DEFAULT_CHUNK_LIMIT_SIZE
DEFAULT_TOTAL_LIMIT_SIZE
DIR_PERMISSION

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::new
# File lib/fluent/plugin/buf_file.rb, line 46
# Builds a file buffer in its unconfigured state: no symlink path, no extra
# resume pattern, and multi-worker support switched off.
def initialize
  super
  @multi_workers_available = false
  @additional_resume_path = nil
  @symlink_path = nil
end

Public Instance Methods

buffer_path_for_test?() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 117
# Reports whether any frame of the current call stack lives in a file whose
# name looks like test_*.rb.
#
# Returns true when such a frame exists, false otherwise.
def buffer_path_for_test?
  # Thread::Backtrace::Location#path may be a bare filename or an absolute
  # path, whereas #absolute_path is always absolute.
  # https://bugs.ruby-lang.org/issues/12159
  caller_locations.any? do |frame|
    frame.absolute_path =~ /\/test_[^\/]+\.rb$/ # frame.path =~ /test_.+\.rb$/
  end
end
configure(conf) click to toggle source
Calls superclass method Fluent::Plugin::Buffer#configure
# File lib/fluent/plugin/buf_file.rb, line 53
# Finishes configuration of the file buffer after the superclass has
# processed +conf+.
#
# In order, this method:
# * resolves @path, falling back to "<plugin_root_dir>/buffer" when no path
#   was configured;
# * rejects a path that is already registered in @@buffer_paths (unless
#   buffer_path_for_test? is true) and registers this one;
# * rewrites @path into a chunk file pattern containing '*';
# * records in @multi_workers_available whether the path was directory-style;
# * settles @dir_permission.
#
# conf - configuration object, passed unchanged to the superclass.
#
# Raises Fluent::ConfigError when neither a path nor a plugin root dir exists.
# Raises ConfigError when the path is already registered by another plugin.
def configure(conf)
  super

  # true when the system configuration asks for more than one worker
  multi_workers_configured = owner.system_config.workers > 1 ? true : false

  using_plugin_root_dir = false
  unless @path
    if root_dir = owner.plugin_root_dir
      @path = File.join(root_dir, 'buffer')
      using_plugin_root_dir = true # plugin_root_dir path contains worker id
    else
      raise Fluent::ConfigError, "buffer path is not configured. specify 'path' in <buffer>"
    end
  end

  # Refuse to share one buffer path between plugins; the check is skipped
  # when buffer_path_for_test? reports a test_*.rb file in the call stack.
  type_of_owner = Plugin.lookup_type_from_class(@_owner.class)
  if @@buffer_paths.has_key?(@path) && !buffer_path_for_test?
    type_using_this_path = @@buffer_paths[@path]
    raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}"
  end

  # Register the path (keyed before it is rewritten into a pattern below).
  @@buffer_paths[@path] = type_of_owner

  # The path is treated as a directory when it already is one, or when it
  # does not exist yet and contains no '.*' wildcard.
  specified_directory_exists = File.exist?(@path) && File.directory?(@path)
  unexisting_path_for_directory = !File.exist?(@path) && !@path.include?('.*')

  if specified_directory_exists || unexisting_path_for_directory # directory
    if using_plugin_root_dir || !multi_workers_configured
      @path = File.join(@path, 'buffer.*.log')
    else
      # Give each worker its own subdirectory under the configured directory.
      @path = File.join(@path, "worker#{fluentd_worker_id}", 'buffer.*.log')
      if fluentd_worker_id == 0
        # worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration)
        # @path is now "<dir>/worker0/buffer.*.log", so "../../" resolves back to <dir>.
        @additional_resume_path = File.join(File.expand_path("../../", @path), 'buffer.*.log')
      end
    end
    @multi_workers_available = true
  else # specified path is file path
    if File.basename(@path).include?('.*.')
      # valid file path
    elsif File.basename(@path).end_with?('.*')
      @path = @path + '.log'
    else
      # existing file will be ignored
      @path = @path + '.*.log'
    end
    @multi_workers_available = false
  end

  # A String permission is parsed as octal; when none is set, fall back to
  # the system config value and then to DIR_PERMISSION.
  if @dir_permission
    @dir_permission = @dir_permission.to_i(8) if @dir_permission.is_a?(String)
  else
    @dir_permission = system_config.dir_permission || DIR_PERMISSION
  end
end
generate_chunk(metadata) click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 170
# Creates a new file chunk for +metadata+ under the configured path pattern.
#
# The :perm option is passed only when @file_permission is set, so the
# chunk class otherwise applies its own default.
#
# Returns the new Fluent::Plugin::Buffer::FileChunk.
def generate_chunk(metadata)
  # FileChunk generates real path with unique_id
  chunk_options = { compress: @compress }
  chunk_options[:perm] = @file_permission if @file_permission
  Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, **chunk_options)
end
multi_workers_ready?() click to toggle source

This method is called only when multiple workers are configured.

# File lib/fluent/plugin/buf_file.rb, line 110
# Tells the caller whether this buffer can run under multiple workers,
# logging an error that explains the requirement when it cannot.
#
# Returns the value of @multi_workers_available.
def multi_workers_ready?
  return @multi_workers_available if @multi_workers_available

  log.error "file buffer with multi workers should be configured to use directory 'path', or system root_dir and plugin id"
  @multi_workers_available
end
persistent?() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 135
# Reports whether this buffer is persistent.
#
# Always returns true for the file buffer.
def persistent?
  true
end
resume() click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 139
# Rebuilds buffer state from chunk files left on disk.
#
# Globs @path (and @additional_resume_path first, when set), skips entries
# that are not regular files or whose state cannot be determined, and
# reopens every remaining file as a FileChunk using the configured
# compression, the same option generate_chunk passes for new chunks.
#
# Returns two values:
#   stage - Hash mapping each staged chunk's metadata to that chunk
#   queue - Array of queued chunks sorted by ascending modified_at
def resume
  stage = {}
  queue = []

  patterns = [@path]
  patterns.unshift @additional_resume_path if @additional_resume_path
  Dir.glob(patterns) do |path|
    next unless File.file?(path)

    m = new_metadata() # this metadata will be overwritten by resuming .meta file content
                       # so it should not added into @metadata_list for now
    mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path)
    if mode == :unknown
      log.debug "unknown state chunk found", path: path
      next
    end

    # file chunk resumes contents of metadata; compress must match what the
    # chunk was created with, otherwise the chunk falls back to the default
    chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode, compress: @compress)
    case chunk.state
    when :staged
      stage[chunk.metadata] = chunk
    when :queued
      queue << chunk
    end
  end

  queue.sort_by!{ |chunk| chunk.modified_at }

  return stage, queue
end
start() click to toggle source
Calls superclass method Fluent::Plugin::Buffer#start
# File lib/fluent/plugin/buf_file.rb, line 129
# Ensures the directory that will hold the chunk files exists (created with
# @dir_permission), then hands off to the superclass start routine.
def start
  buffer_dir = File.dirname(@path)
  FileUtils.mkdir_p(buffer_dir, mode: @dir_permission)

  super
end