class Fluent::Plugin::FileBuffer
Constants
- DEFAULT_CHUNK_LIMIT_SIZE
- DEFAULT_TOTAL_LIMIT_SIZE
- DIR_PERMISSION
Public Class Methods
new()
click to toggle source
Calls superclass method
Fluent::Plugin::Buffer::new
# File lib/fluent/plugin/buf_file.rb, line 46 def initialize super @symlink_path = nil @multi_workers_available = false @additional_resume_path = nil end
Public Instance Methods
buffer_path_for_test?()
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 117 def buffer_path_for_test? caller_locations.each do |location| # Thread::Backtrace::Location#path returns base filename or absolute path. # #absolute_path returns absolute_path always. # https://bugs.ruby-lang.org/issues/12159 if location.absolute_path =~ /\/test_[^\/]+\.rb$/ # location.path =~ /test_.+\.rb$/ return true end end false end
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::Buffer#configure
# File lib/fluent/plugin/buf_file.rb, line 53 def configure(conf) super multi_workers_configured = owner.system_config.workers > 1 ? true : false using_plugin_root_dir = false unless @path if root_dir = owner.plugin_root_dir @path = File.join(root_dir, 'buffer') using_plugin_root_dir = true # plugin_root_dir path contains worker id else raise Fluent::ConfigError, "buffer path is not configured. specify 'path' in <buffer>" end end type_of_owner = Plugin.lookup_type_from_class(@_owner.class) if @@buffer_paths.has_key?(@path) && !buffer_path_for_test? type_using_this_path = @@buffer_paths[@path] raise ConfigError, "Other '#{type_using_this_path}' plugin already use same buffer path: type = #{type_of_owner}, buffer path = #{@path}" end @@buffer_paths[@path] = type_of_owner specified_directory_exists = File.exist?(@path) && File.directory?(@path) unexisting_path_for_directory = !File.exist?(@path) && !@path.include?('.*') if specified_directory_exists || unexisting_path_for_directory # directory if using_plugin_root_dir || !multi_workers_configured @path = File.join(@path, 'buffer.*.log') else @path = File.join(@path, "worker#{fluentd_worker_id}", 'buffer.*.log') if fluentd_worker_id == 0 # worker 0 always checks unflushed buffer chunks to be resumed (might be created while non-multi-worker configuration) @additional_resume_path = File.join(File.expand_path("../../", @path), 'buffer.*.log') end end @multi_workers_available = true else # specified path is file path if File.basename(@path).include?('.*.') # valid file path elsif File.basename(@path).end_with?('.*') @path = @path + '.log' else # existing file will be ignored @path = @path + '.*.log' end @multi_workers_available = false end if @dir_permission @dir_permission = @dir_permission.to_i(8) if @dir_permission.is_a?(String) else @dir_permission = system_config.dir_permission || DIR_PERMISSION end end
generate_chunk(metadata)
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 170 def generate_chunk(metadata) # FileChunk generates real path with unique_id if @file_permission Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, perm: @file_permission, compress: @compress) else Fluent::Plugin::Buffer::FileChunk.new(metadata, @path, :create, compress: @compress) end end
multi_workers_ready?()
click to toggle source
This method is called only when multi worker is configured
# File lib/fluent/plugin/buf_file.rb, line 110 def multi_workers_ready? unless @multi_workers_available log.error "file buffer with multi workers should be configured to use directory 'path', or system root_dir and plugin id" end @multi_workers_available end
persistent?()
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 135 def persistent? true end
resume()
click to toggle source
# File lib/fluent/plugin/buf_file.rb, line 139 def resume stage = {} queue = [] patterns = [@path] patterns.unshift @additional_resume_path if @additional_resume_path Dir.glob(patterns) do |path| next unless File.file?(path) m = new_metadata() # this metadata will be overwritten by resuming .meta file content # so it should not added into @metadata_list for now mode = Fluent::Plugin::Buffer::FileChunk.assume_chunk_state(path) if mode == :unknown log.debug "uknown state chunk found", path: path next end chunk = Fluent::Plugin::Buffer::FileChunk.new(m, path, mode) # file chunk resumes contents of metadata case chunk.state when :staged stage[chunk.metadata] = chunk when :queued queue << chunk end end queue.sort_by!{ |chunk| chunk.modified_at } return stage, queue end
start()
click to toggle source
Calls superclass method
Fluent::Plugin::Buffer#start
# File lib/fluent/plugin/buf_file.rb, line 129 def start FileUtils.mkdir_p File.dirname(@path), mode: @dir_permission super end