class Fluent::Plugin::Buffer::FileChunk
Constants
- FILE_PERMISSION
Attributes
path[R]
permission[R]
Public Class Methods
assume_chunk_state(path)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 175 def self.assume_chunk_state(path) if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern $1 == 'b' ? :staged : :queued else # files which matches to glob of buffer file pattern # it includes files which are created by out_file :unknown end end
generate_queued_chunk_path(path, unique_id)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 197 def self.generate_queued_chunk_path(path, unique_id) chunk_id = Fluent::UniqueId.hex(unique_id) if path.index(".b#{chunk_id}.") path.sub(".b#{chunk_id}.", ".q#{chunk_id}.") else # for unexpected cases (ex: users rename files while opened by fluentd) path + ".q#{chunk_id}.chunk" end end
generate_stage_chunk_path(path, unique_id)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 185 def self.generate_stage_chunk_path(path, unique_id) pos = path.index('.*.') raise "BUG: buffer chunk path on stage MUST have '.*.'" unless pos prefix = path[0...pos] suffix = path[(pos+3)..-1] chunk_id = Fluent::UniqueId.hex(unique_id) state = 'b' "#{prefix}.#{state}#{chunk_id}.#{suffix}" end
new(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION, compress: :text)
click to toggle source
Calls superclass method
Fluent::Plugin::Buffer::Chunk::new
# File lib/fluent/plugin/buffer/file_chunk.rb, line 43 def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION, compress: :text) super(metadata, compress: compress) @permission = perm.is_a?(String) ? perm.to_i(8) : perm @bytesize = @size = @adding_bytes = @adding_size = 0 @meta = nil case mode when :create then create_new_chunk(path, @permission) when :staged then load_existing_staged_chunk(path) when :queued then load_existing_enqueued_chunk(path) else raise ArgumentError, "Invalid file chunk mode: #{mode}" end end
unique_id_from_path(path)
click to toggle source
used only for queued v0.12 buffer path
# File lib/fluent/plugin/buffer/file_chunk.rb, line 207 def self.unique_id_from_path(path) if /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n =~ path # //n switch means explicit 'ASCII-8BIT' pattern return $2.scan(/../).map{|x| x.to_i(16) }.pack('C*') end nil end
Public Instance Methods
bytesize()
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 89 def bytesize @bytesize + @adding_bytes end
close()
click to toggle source
Calls superclass method
Fluent::Plugin::Buffer::Chunk#close
# File lib/fluent/plugin/buffer/file_chunk.rb, line 145 def close super size = @chunk.size @chunk.close @meta.close if @meta # meta may be missing if chunk is queued at first if size == 0 File.unlink(@path, @meta_path) end end
commit()
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 68 def commit write_metadata # this should be at first: of course, this operation may fail @commit_position = @chunk.pos @size += @adding_size @bytesize += @adding_bytes @adding_bytes = @adding_size = 0 @modified_at = Time.now true end
concat(bulk, bulk_size)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 58 def concat(bulk, bulk_size) raise "BUG: concatenating to unwritable chunk, now '#{self.state}'" unless self.writable? bulk.force_encoding(Encoding::ASCII_8BIT) @chunk.write bulk @adding_bytes += bulk.bytesize @adding_size += bulk_size true end
create_new_chunk(path, perm)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 269 def create_new_chunk(path, perm) @path = self.class.generate_stage_chunk_path(path, @unique_id) @meta_path = @path + '.meta' begin @chunk = File.open(@path, 'wb+', perm) @chunk.set_encoding(Encoding::ASCII_8BIT) @chunk.sync = true @chunk.binmode rescue => e # Here assumes "Too many open files" like recoverable error so raising BufferOverflowError. # If other cases are possible, we will change erorr handling with proper classes. raise BufferOverflowError, "can't create buffer file for #{path}. Stop creating buffer files: error = #{e}" end begin @meta = File.open(@meta_path, 'wb', perm) @meta.set_encoding(Encoding::ASCII_8BIT) @meta.sync = true @meta.binmode rescue => e # This case is easier than enqueued!. Just removing pre-create buffer file @chunk.close rescue nil File.unlink(@path) rescue nil # Same as @chunk case. See above raise BufferOverflowError, "can't create buffer metadata for #{path}. Stop creating buffer files: error = #{e}" end @state = :unstaged @bytesize = 0 @commit_position = @chunk.pos # must be 0 @adding_bytes = 0 @adding_size = 0 end
empty?()
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 97 def empty? @bytesize == 0 end
enqueued!()
click to toggle source
Calls superclass method
Fluent::Plugin::Buffer::Chunk#enqueued!
# File lib/fluent/plugin/buffer/file_chunk.rb, line 101 def enqueued! return unless self.staged? new_chunk_path = self.class.generate_queued_chunk_path(@path, @unique_id) new_meta_path = new_chunk_path + '.meta' write_metadata(update: false) # re-write metadata w/ finalized records begin file_rename(@chunk, @path, new_chunk_path, ->(new_io) { @chunk = new_io }) rescue => e begin file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path) rescue => re # In this point, restore buffer state is hard because previous `file_rename` failed by resource problem. # Retry is one possible approach but it may cause livelock under limited resources or high load environment. # So we ignore such errors for now and log better message instead. # "Too many open files" should be fixed by proper buffer configuration and system setting. raise "can't enqueue buffer file and failed to restore. This may causes inconsistent state: path = #{@path}, error = '#{e}', retry error = '#{re}'" else raise "can't enqueue buffer file: path = #{@path}, error = '#{e}'" end end begin file_rename(@meta, @meta_path, new_meta_path, ->(new_io) { @meta = new_io }) rescue => e begin file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path) file_rename(@meta, new_meta_path, @meta_path, ->(new_io) { @meta = new_io }) if File.exist?(new_meta_path) rescue => re # See above raise "can't enqueue buffer metadata and failed to restore. This may causes inconsistent state: path = #{@meta_path}, error = '#{e}', retry error = '#{re}'" else raise "can't enqueue buffer metadata: path = #{@meta_path}, error = '#{e}'" end end @path = new_chunk_path @meta_path = new_meta_path super end
file_rename(file, old_path, new_path, callback=nil)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 252 def file_rename(file, old_path, new_path, callback=nil) pos = file.pos if Fluent.windows? file.close File.rename(old_path, new_path) file = File.open(new_path, 'rb', @permission) else File.rename(old_path, new_path) file.reopen(new_path, 'rb') end file.set_encoding(Encoding::ASCII_8BIT) file.sync = true file.binmode file.pos = pos callback.call(file) if callback end
load_existing_enqueued_chunk(path)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 344 def load_existing_enqueued_chunk(path) @path = path @chunk = File.open(@path, 'rb') @chunk.set_encoding(Encoding::ASCII_8BIT) @chunk.binmode @chunk.seek(0, IO::SEEK_SET) @bytesize = @chunk.size @commit_position = @chunk.size @meta_path = @path + '.meta' if File.readable?(@meta_path) restore_metadata(File.open(@meta_path){|f| f.set_encoding(Encoding::ASCII_8BIT); f.binmode; f.read }) else restore_metadata_partially(@chunk) end @state = :queued end
load_existing_staged_chunk(path)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 302 def load_existing_staged_chunk(path) @path = path @meta_path = @path + '.meta' @meta = nil # staging buffer chunk without metadata is classic buffer chunk file # and it should be enqueued immediately if File.exist?(@meta_path) @chunk = File.open(@path, 'rb+') @chunk.set_encoding(Encoding::ASCII_8BIT) @chunk.sync = true @chunk.seek(0, IO::SEEK_END) @chunk.binmode @meta = File.open(@meta_path, 'rb+') @meta.set_encoding(Encoding::ASCII_8BIT) @meta.sync = true @meta.binmode restore_metadata(@meta.read) @meta.seek(0, IO::SEEK_SET) @state = :staged @bytesize = @chunk.size @commit_position = @chunk.pos @adding_bytes = 0 @adding_size = 0 else # classic buffer chunk - read only chunk @chunk = File.open(@path, 'rb') @chunk.set_encoding(Encoding::ASCII_8BIT) @chunk.binmode @chunk.seek(0, IO::SEEK_SET) @state = :queued @bytesize = @chunk.size restore_metadata_partially(@chunk) @commit_position = @chunk.size @unique_id = self.class.unique_id_from_path(@path) || @unique_id end end
open(**kwargs) { |chunk| ... }
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 168 def open(**kwargs, &block) @chunk.seek(0, IO::SEEK_SET) val = yield @chunk @chunk.seek(0, IO::SEEK_END) if self.staged? val end
purge()
click to toggle source
Calls superclass method
Fluent::Plugin::Buffer::Chunk#purge
# File lib/fluent/plugin/buffer/file_chunk.rb, line 155 def purge super @chunk.close @meta.close if @meta @bytesize = @size = @adding_bytes = @adding_size = 0 File.unlink(@path, @meta_path) end
read(**kwargs)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 163 def read(**kwargs) @chunk.seek(0, IO::SEEK_SET) @chunk.read end
restore_metadata(bindata)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 214 def restore_metadata(bindata) data = msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {} now = Time.now @unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id @size = data[:s] || 0 @created_at = Time.at(data.fetch(:c, now.to_i)) @modified_at = Time.at(data.fetch(:m, now.to_i)) @metadata.timekey = data[:timekey] @metadata.tag = data[:tag] @metadata.variables = data[:variables] end
restore_metadata_partially(chunk)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 229 def restore_metadata_partially(chunk) @unique_id = self.class.unique_id_from_path(chunk.path) || @unique_id @size = 0 @created_at = chunk.ctime # birthtime isn't supported on Windows (and Travis?) @modified_at = chunk.mtime @metadata.timekey = nil @metadata.tag = nil @metadata.variables = nil end
rollback()
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 80 def rollback if @chunk.pos != @commit_position @chunk.seek(@commit_position, IO::SEEK_SET) @chunk.truncate(@commit_position) end @adding_bytes = @adding_size = 0 true end
size()
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 93 def size @size + @adding_size end
write_metadata(update: true)
click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 240 def write_metadata(update: true) data = @metadata.to_h.merge({ id: @unique_id, s: (update ? @size + @adding_size : @size), c: @created_at.to_i, m: (update ? Time.now : @modified_at).to_i, }) @meta.seek(0, IO::SEEK_SET) @meta.truncate(0) @meta.write(msgpack_packer.pack(data)) end