class Fluent::Plugin::Buffer::FileChunk

Constants

FILE_PERMISSION

Attributes

path[R]
permission[R]

Public Class Methods

assume_chunk_state(path) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 175
# Guesses the state of a buffer chunk from its file name.
#
# @param path [String] path of a file matched by the buffer file glob
# @return [Symbol] :staged for a ".b<hex>." name, :queued for a ".q<hex>." name,
#   :unknown otherwise (the glob may also match files created by out_file)
def self.assume_chunk_state(path)
  matched = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(path) # //n switch means explicit 'ASCII-8BIT' pattern
  return :unknown unless matched

  case matched[1]
  when 'b' then :staged
  else :queued
  end
end
generate_queued_chunk_path(path, unique_id) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 197
# Builds the file path a staged chunk gets once it is enqueued.
#
# @param path [String] current path of the chunk file
# @param unique_id [String] chunk unique id, rendered via Fluent::UniqueId.hex
# @return [String] path with ".b<id>." replaced by ".q<id>.", or
#   path + ".q<id>.chunk" when the staged marker is not found in path
def self.generate_queued_chunk_path(path, unique_id)
  hex_id = Fluent::UniqueId.hex(unique_id)
  staged_marker = ".b#{hex_id}."

  # for unexpected cases (ex: users rename files while opened by fluentd)
  return path + ".q#{hex_id}.chunk" unless path.index(staged_marker)

  path.sub(staged_marker, ".q#{hex_id}.")
end
generate_stage_chunk_path(path, unique_id) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 185
# Builds the file path of a staged chunk from a path template.
#
# @param path [String] template containing the '.*.' placeholder
# @param unique_id [String] chunk unique id, rendered via Fluent::UniqueId.hex
# @return [String] path with the first '.*.' replaced by ".b<id>."
# @raise [RuntimeError] when path has no '.*.' placeholder
def self.generate_stage_chunk_path(path, unique_id)
  head, placeholder, tail = path.partition('.*.')
  raise "BUG: buffer chunk path on stage MUST have '.*.'" if placeholder.empty?

  "#{head}.b#{Fluent::UniqueId.hex(unique_id)}.#{tail}"
end
new(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION, compress: :text) click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk::new
# File lib/fluent/plugin/buffer/file_chunk.rb, line 43
# Sets up a file-backed chunk, either by creating new files or by loading existing ones.
#
# @param metadata chunk metadata, forwarded to the superclass constructor
# @param path [String] path template (for :create) or existing chunk file path
# @param mode [Symbol] :create, :staged or :queued
# @param perm [Integer, String] file permission; a String is parsed as octal
# @param compress [Symbol] forwarded to the superclass constructor
# @raise [ArgumentError] when mode is none of the supported values
def initialize(metadata, path, mode, perm: system_config.file_permission || FILE_PERMISSION, compress: :text)
  super(metadata, compress: compress)

  @permission =
    if perm.is_a?(String)
      perm.to_i(8)
    else
      perm
    end
  @bytesize = 0
  @size = 0
  @adding_bytes = 0
  @adding_size = 0
  @meta = nil

  case mode
  when :create
    create_new_chunk(path, @permission)
  when :staged
    load_existing_staged_chunk(path)
  when :queued
    load_existing_enqueued_chunk(path)
  else
    raise ArgumentError, "Invalid file chunk mode: #{mode}"
  end
end
unique_id_from_path(path) click to toggle source

Used only for queued v0.12 buffer paths.

# File lib/fluent/plugin/buffer/file_chunk.rb, line 207
# Decodes the chunk unique id embedded in a chunk file name.
#
# @param path [String] chunk file path such as ".../buffer.q<hex>.log"
# @return [String, nil] the bytes encoded by the hex digits (whole pairs only;
#   a trailing single digit is ignored), or nil when path has no ".b<hex>."/".q<hex>." part
def self.unique_id_from_path(path)
  matched = /\.(b|q)([0-9a-f]+)\.[^\/]*\Z/n.match(path) # //n switch means explicit 'ASCII-8BIT' pattern
  return nil unless matched

  octets = matched[2].scan(/../).map { |hex_pair| hex_pair.to_i(16) }
  octets.pack('C*')
end

Public Instance Methods

bytesize() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 89
# @return [Integer] committed bytes plus the bytes added since the last commit
def bytesize
  pending_bytes = @adding_bytes
  @bytesize + pending_bytes
end
close() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#close
# File lib/fluent/plugin/buffer/file_chunk.rb, line 145
# Closes the chunk IO (and the metadata IO when present) and removes the
# backing files if the chunk file is empty.
#
# The metadata file is removed on a best-effort basis: a chunk that was
# loaded without metadata has no ".meta" file, and its absence must not make
# closing an empty chunk fail with Errno::ENOENT.
# The return value is not meaningful.
def close
  super
  size = @chunk.size # must be read before the IO is closed
  @chunk.close
  @meta.close if @meta # meta may be missing if chunk is queued at first
  return unless size == 0

  File.unlink(@path)
  begin
    File.unlink(@meta_path)
  rescue Errno::ENOENT
    # no metadata file was ever created for this chunk: nothing to remove
  end
end
commit() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 68
# Makes the data added since the last commit part of the chunk.
#
# Metadata is written first; if that raises, none of the counters below change.
#
# @return [true]
def commit
  write_metadata # this should be at first: of course, this operation may fail

  @commit_position = @chunk.pos
  @bytesize += @adding_bytes
  @size += @adding_size
  @adding_size = 0
  @adding_bytes = 0
  @modified_at = Time.now
  true
end
concat(bulk, bulk_size) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 58
# Appends already-formatted data to the chunk file as uncommitted content.
#
# @param bulk [String] data to append; its encoding is switched to ASCII-8BIT in place
# @param bulk_size [Integer] number of records contained in bulk
# @return [true]
# @raise [RuntimeError] when the chunk is not writable
def concat(bulk, bulk_size)
  unless self.writable?
    raise "BUG: concatenating to unwritable chunk, now '#{self.state}'"
  end

  binary = bulk.force_encoding(Encoding::ASCII_8BIT) # same object as bulk
  @chunk.write(binary)
  @adding_size += bulk_size
  @adding_bytes += binary.bytesize
  true
end
create_new_chunk(path, perm) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 269
# Creates the chunk file and its ".meta" companion for a brand-new chunk.
#
# @param path [String] path template, turned into the staged chunk path by
#   generate_stage_chunk_path using @unique_id
# @param perm [Integer] permission passed to File.open for both files
# @raise [BufferOverflowError] when either file cannot be opened or configured
def create_new_chunk(path, perm)
  @path = self.class.generate_stage_chunk_path(path, @unique_id)
  @meta_path = @path + '.meta'
  begin
    # read/write, truncating; written through immediately because of sync = true
    @chunk = File.open(@path, 'wb+', perm)
    @chunk.set_encoding(Encoding::ASCII_8BIT)
    @chunk.sync = true
    @chunk.binmode
  rescue => e
    # Here assumes "Too many open files" like recoverable error so raising BufferOverflowError.
    # If other cases are possible, we will change error handling with proper classes.
    raise BufferOverflowError, "can't create buffer file for #{path}. Stop creating buffer files: error = #{e}"
  end
  begin
    @meta = File.open(@meta_path, 'wb', perm)
    @meta.set_encoding(Encoding::ASCII_8BIT)
    @meta.sync = true
    @meta.binmode
  rescue => e
    # This case is easier than enqueued!. Just removing pre-create buffer file
    # (both cleanup steps are best-effort: their own failures are ignored)
    @chunk.close rescue nil
    File.unlink(@path) rescue nil
    # Same as @chunk case. See above
    raise BufferOverflowError, "can't create buffer metadata for #{path}. Stop creating buffer files: error = #{e}"
  end

  # the new chunk starts out unstaged, with nothing committed and nothing pending
  @state = :unstaged
  @bytesize = 0
  @commit_position = @chunk.pos # must be 0
  @adding_bytes = 0
  @adding_size = 0
end
empty?() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 97
# @return [Boolean] true when no bytes have been committed to the chunk
#   (bytes added since the last commit are not considered)
def empty?
  @bytesize.zero?
end
enqueued!() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#enqueued!
# File lib/fluent/plugin/buffer/file_chunk.rb, line 101
# Moves a staged chunk to the queued state by renaming its chunk file and
# metadata file to their queued names, then delegates to the superclass.
#
# Does nothing (returns nil) unless the chunk is currently staged.
# If a rename fails, the renames already done are reverted when the renamed
# file exists, and a RuntimeError describing the failure is raised.
#
# @raise [RuntimeError] when a rename fails (whether or not reverting succeeded)
def enqueued!
  return unless self.staged?

  new_chunk_path = self.class.generate_queued_chunk_path(@path, @unique_id)
  new_meta_path = new_chunk_path + '.meta'

  write_metadata(update: false) # re-write metadata w/ finalized records

  # Step 1: rename the chunk file; the lambda swaps @chunk for the IO handed back by file_rename
  begin
    file_rename(@chunk, @path, new_chunk_path, ->(new_io) { @chunk = new_io })
  rescue => e
    begin
      # revert only if the rename itself went through before the failure
      file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path)
    rescue => re
      # In this point, restore buffer state is hard because previous `file_rename` failed by resource problem.
      # Retry is one possible approach but it may cause livelock under limited resources or high load environment.
      # So we ignore such errors for now and log better message instead.
      # "Too many open files" should be fixed by proper buffer configuration and system setting.
      raise "can't enqueue buffer file and failed to restore. This may causes inconsistent state: path = #{@path}, error = '#{e}', retry error = '#{re}'"
    else
      # revert succeeded (or was not needed): report the original failure
      raise "can't enqueue buffer file: path = #{@path}, error = '#{e}'"
    end
  end

  # Step 2: rename the metadata file; on failure both files are moved back
  begin
    file_rename(@meta, @meta_path, new_meta_path, ->(new_io) { @meta = new_io })
  rescue => e
    begin
      file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path)
      file_rename(@meta, new_meta_path, @meta_path, ->(new_io) { @meta = new_io }) if File.exist?(new_meta_path)
    rescue => re
      # See above
      raise "can't enqueue buffer metadata and failed to restore. This may causes inconsistent state: path = #{@meta_path}, error = '#{e}', retry error = '#{re}'"
    else
      raise "can't enqueue buffer metadata: path = #{@meta_path}, error = '#{e}'"
    end
  end

  # both renames succeeded: only now switch the recorded paths
  @path = new_chunk_path
  @meta_path = new_meta_path

  super
end
file_rename(file, old_path, new_path, callback=nil) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 252
# Renames an open file and re-attaches an IO to it at the same position.
#
# On Windows the file is closed before the rename and a new File is opened
# afterwards; elsewhere the given IO is reopened on the new path. In both
# cases the resulting IO is read-only ('rb'), binary, sync, and positioned
# where the original IO was.
#
# @param file [File] open IO for old_path
# @param old_path [String] current path
# @param new_path [String] destination path
# @param callback [#call, nil] called with the resulting IO when given
# @return the callback's result, or nil when no callback is given
def file_rename(file, old_path, new_path, callback=nil)
  offset = file.pos
  on_windows = Fluent.windows?

  file.close if on_windows
  File.rename(old_path, new_path)
  if on_windows
    file = File.open(new_path, 'rb', @permission)
  else
    file.reopen(new_path, 'rb')
  end

  file.set_encoding(Encoding::ASCII_8BIT)
  file.sync = true
  file.binmode
  file.pos = offset

  return nil unless callback

  callback.call(file)
end
load_existing_enqueued_chunk(path) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 344
# Loads an already-enqueued chunk file as a read-only chunk.
#
# Metadata is restored from "<path>.meta" when that file is readable;
# otherwise it is partially restored from the chunk file itself.
#
# @param path [String] path of the existing queued chunk file
def load_existing_enqueued_chunk(path)
  @path = path
  @chunk = File.open(@path, 'rb')
  @chunk.set_encoding(Encoding::ASCII_8BIT)
  @chunk.binmode
  @chunk.seek(0, IO::SEEK_SET)
  # the whole existing file counts as committed content
  @bytesize = @chunk.size
  @commit_position = @chunk.size

  @meta_path = @path + '.meta'
  if File.readable?(@meta_path)
    meta_bytes = File.open(@meta_path) do |meta_io|
      meta_io.set_encoding(Encoding::ASCII_8BIT)
      meta_io.binmode
      meta_io.read
    end
    restore_metadata(meta_bytes)
  else
    restore_metadata_partially(@chunk)
  end

  @state = :queued
end
load_existing_staged_chunk(path) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 302
# Loads an existing staged chunk file.
#
# When "<path>.meta" exists the chunk is opened read/write and stays :staged;
# when it does not, the file is treated as a classic (metadata-less) chunk,
# opened read-only and marked :queued.
#
# @param path [String] path of the existing staged chunk file
def load_existing_staged_chunk(path)
  @path = path
  @meta_path = @path + '.meta'

  @meta = nil
  # staging buffer chunk without metadata is classic buffer chunk file
  # and it should be enqueued immediately
  if File.exist?(@meta_path)
    @chunk = File.open(@path, 'rb+')
    @chunk.set_encoding(Encoding::ASCII_8BIT)
    @chunk.sync = true
    # position at the end of the existing content
    @chunk.seek(0, IO::SEEK_END)
    @chunk.binmode

    @meta = File.open(@meta_path, 'rb+')
    @meta.set_encoding(Encoding::ASCII_8BIT)
    @meta.sync = true
    @meta.binmode
    restore_metadata(@meta.read)
    # rewind after reading the metadata
    @meta.seek(0, IO::SEEK_SET)

    @state = :staged
    @bytesize = @chunk.size
    # the chunk IO was positioned at the end above, so existing content is treated as committed
    @commit_position = @chunk.pos
    @adding_bytes = 0
    @adding_size = 0
  else
    # classic buffer chunk - read only chunk
    @chunk = File.open(@path, 'rb')
    @chunk.set_encoding(Encoding::ASCII_8BIT)
    @chunk.binmode
    @chunk.seek(0, IO::SEEK_SET)
    @state = :queued
    @bytesize = @chunk.size

    # no metadata file: derive what can be derived from the chunk file itself
    restore_metadata_partially(@chunk)

    @commit_position = @chunk.size
    @unique_id = self.class.unique_id_from_path(@path) || @unique_id
  end
end
open(**kwargs) { |chunk| ... } click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 168
# Yields the chunk IO positioned at the beginning of the file.
#
# For a staged chunk the IO is moved back to the end of the file afterwards,
# even when the block raises: leaving the position in the middle of the file
# would make the next write land at the wrong offset.
#
# @param kwargs accepted for interface compatibility; not used here
# @yieldparam chunk [IO] the underlying chunk IO
# @return the value returned by the block
def open(**kwargs, &block)
  @chunk.seek(0, IO::SEEK_SET)
  begin
    yield @chunk
  ensure
    @chunk.seek(0, IO::SEEK_END) if self.staged?
  end
end
purge() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#purge
# File lib/fluent/plugin/buffer/file_chunk.rb, line 155
# Closes the chunk, resets its counters and removes its backing files.
#
# The metadata file is removed on a best-effort basis: a chunk that was
# loaded without metadata has no ".meta" file, and its absence must not make
# purging fail with Errno::ENOENT after the chunk file is already gone.
# The return value is not meaningful.
def purge
  super
  @chunk.close
  @meta.close if @meta
  @bytesize = @size = @adding_bytes = @adding_size = 0
  File.unlink(@path)
  begin
    File.unlink(@meta_path)
  rescue Errno::ENOENT
    # no metadata file was ever created for this chunk: nothing to remove
  end
end
read(**kwargs) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 163
# Reads the whole chunk file from its beginning.
#
# @param kwargs accepted for interface compatibility; not used here
# @return [String] everything the chunk IO returns from offset 0
def read(**kwargs)
  io = @chunk
  io.pos = 0
  io.read
end
restore_metadata(bindata) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 214
# Restores chunk attributes and metadata from serialized metadata bytes.
#
# Metadata that cannot be decoded, or that decodes to something other than a
# Hash (for example a payload whose first byte is a valid scalar), is treated
# as empty, so every attribute falls back to its default instead of failing
# on the key lookups below.
#
# @param bindata [String] content of the ".meta" file
def restore_metadata(bindata)
  data =
    begin
      msgpack_unpacker(symbolize_keys: true).feed(bindata).read
    rescue StandardError
      {}
    end
  data = {} unless data.is_a?(Hash)

  now = Time.now

  @unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id
  @size = data[:s] || 0
  @created_at = Time.at(data.fetch(:c, now.to_i))
  @modified_at = Time.at(data.fetch(:m, now.to_i))

  @metadata.timekey = data[:timekey]
  @metadata.tag = data[:tag]
  @metadata.variables = data[:variables]
end
restore_metadata_partially(chunk) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 229
# Restores what can be known about a chunk that has no metadata file.
#
# The unique id comes from the file name when it can be parsed (otherwise the
# current one is kept), the record count is reset to 0, timestamps come from
# the file, and the metadata fields are cleared.
#
# @param chunk [File] the opened chunk file
def restore_metadata_partially(chunk)
  id_from_name = self.class.unique_id_from_path(chunk.path)
  @unique_id = id_from_name || @unique_id
  @size = 0
  # birthtime isn't supported on Windows (and Travis?)
  @created_at, @modified_at = chunk.ctime, chunk.mtime

  @metadata.timekey = nil
  @metadata.tag = nil
  @metadata.variables = nil
end
rollback() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 80
# Discards everything written since the last commit.
#
# When the IO position differs from the committed position, the IO is moved
# back there and the file is truncated to that length.
#
# @return [true]
def rollback
  unless @chunk.pos == @commit_position
    @chunk.seek(@commit_position, IO::SEEK_SET)
    @chunk.truncate(@commit_position)
  end
  @adding_size = 0
  @adding_bytes = 0
  true
end
size() click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 93
# @return [Integer] committed record count plus the records added since the last commit
def size
  pending_records = @adding_size
  @size + pending_records
end
write_metadata(update: true) click to toggle source
# File lib/fluent/plugin/buffer/file_chunk.rb, line 240
# Serializes the chunk metadata and overwrites the metadata file with it.
#
# @param update [Boolean] when true, the record count includes the records
#   added since the last commit and the modification time is the current time;
#   when false, the committed count and the stored modification time are written
def write_metadata(update: true)
  record_count = update ? @size + @adding_size : @size
  modified = update ? Time.now : @modified_at
  chunk_fields = {
    id: @unique_id,
    s: record_count,
    c: @created_at.to_i,
    m: modified.to_i,
  }
  data = @metadata.to_h.merge(chunk_fields)

  # replace the previous content entirely
  @meta.seek(0, IO::SEEK_SET)
  @meta.truncate(0)
  @meta.write(msgpack_packer.pack(data))
end