class Fluent::Plugin::Buffer::FileSingleChunk

Constants

ESCAPE_REGEXP
PATH_EXT

Buffer path specified by the user: /path/to/directory
Buffer chunk path: /path/to/directory/fsb.key.b513b61c9791029c2513b61c9791029c2.buf
State letter: b/q — 'b' (staged), 'q' (enqueued)

PATH_REGEXP
PATH_SUFFIX

Attributes

path[R]
permission[R]

Public Class Methods

assume_chunk_state(path) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 149
# Guesses the state of a chunk file from its name alone.
#
# @param path [String] a file path found in the buffer directory
# @return [Symbol] :unknown when the path lacks PATH_SUFFIX or does not match
#   PATH_REGEXP (e.g. files created by out_file that merely share the glob);
#   otherwise :staged when the captured state letter is 'b', else :queued
def self.assume_chunk_state(path)
  return :unknown unless path.end_with?(PATH_SUFFIX)

  match = PATH_REGEXP.match(path)
  # Matches the buffer glob but not the chunk naming scheme: not one of ours.
  return :unknown if match.nil?

  match[1] == 'b' ? :staged : :queued
end
generate_queued_chunk_path(path, unique_id) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 183
# Derives the queued-state path for a staged chunk file.
#
# The ".b<hex id>." segment is rewritten to ".q<hex id>.". When that segment
# is absent (unexpected, e.g. a user renamed the file while fluentd had it
# open), ".q<hex id>.chunk" is appended to the path instead.
#
# @param path [String] current chunk path
# @param unique_id [String] chunk unique id, rendered via Fluent::UniqueId.hex
# @return [String] the new path
def self.generate_queued_chunk_path(path, unique_id)
  hex_id = Fluent::UniqueId.hex(unique_id)
  staged_marker = ".b#{hex_id}."
  return path + ".q#{hex_id}.chunk" unless path.include?(staged_marker)

  path.sub(staged_marker, ".q#{hex_id}.")
end
generate_stage_chunk_path(path, key, unique_id) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 172
# Builds the on-stage path for a new chunk by replacing the first ".*."
# placeholder in the configured path with ".<key>.b<hex id>.".
#
# @param path [String] configured path template; must contain ".*."
# @param key [String] already-escaped chunk key
# @param unique_id [String] chunk unique id, rendered via Fluent::UniqueId.hex
# @return [String]
# @raise [RuntimeError] when the template has no ".*." placeholder
def self.generate_stage_chunk_path(path, key, unique_id)
  head, marker, tail = path.partition('.*.')
  raise "BUG: buffer chunk path on stage MUST have '.*.'" if marker.empty?

  hex_id = Fluent::UniqueId.hex(unique_id)
  "#{head}.#{key}.b#{hex_id}.#{tail}"
end
new(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text) click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk::new
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 38
# Creates or reloads a single-file buffer chunk.
#
# @param metadata chunk metadata, passed to the superclass
# @param path [String] path template (for :create) or existing file path
# @param mode [Symbol] :create for a new chunk, :staged / :queued to reload
#   an existing file in that state
# @param key [Object, nil] metadata variable used as the chunk key; nil means
#   the tag is used (see encode_key / restore_metadata)
# @param perm [Integer, String, nil] file permission; a String is parsed as
#   octal and nil falls back to Fluent::DEFAULT_FILE_PERMISSION
# @param compress [Symbol] compression mode, passed to the superclass
# @raise [ArgumentError] for an unsupported mode
def initialize(metadata, path, mode, key, perm: Fluent::DEFAULT_FILE_PERMISSION, compress: :text)
  super(metadata, compress: compress)
  @key = key
  effective_perm = perm || Fluent::DEFAULT_FILE_PERMISSION
  @permission = effective_perm.is_a?(String) ? effective_perm.to_i(8) : effective_perm
  @adding_size = 0
  @adding_bytes = 0
  @size = 0
  @bytesize = 0

  case mode
  when :create
    create_new_chunk(path, metadata, @permission)
  when :staged
    load_existing_staged_chunk(path)
  when :queued
    load_existing_enqueued_chunk(path)
  else
    raise ArgumentError, "Invalid file chunk mode: #{mode}"
  end
end
unique_id_and_key_from_path(path) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 161
# Parses the unique id and the (still escaped) key out of a chunk file name
# of the form "fsb.<key>.<b|q><hex id>.buf".
#
# @param path [String] chunk file path; only the basename is inspected
# @return [Array(String, String), nil] [binary unique id, escaped key], or nil
#   when the basename does not match PATH_REGEXP
def self.unique_id_and_key_from_path(path)
  base = File.basename(path)
  match = PATH_REGEXP.match(base)
  return nil if match.nil?

  # Everything between the leading "fsb." and the '.' that starts the match.
  key = base[4..(match.begin(0) - 1)]
  # Second capture is the hex id without the surrounding '.' and '.buf'.
  byte_values = match[2].scan(/../).map(&:hex)
  [byte_values.pack('C*'), key]
end

Public Instance Methods

bytesize() click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 83
# Byte size of the chunk: committed bytes plus bytes appended by #concat
# that have not yet been committed or rolled back.
#
# @return [Integer]
def bytesize
  pending = @adding_bytes
  @bytesize + pending
end
close() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#close
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 121
# Closes the chunk (via the superclass) and its file handle. A chunk whose
# file is zero bytes long is deleted from disk.
def close
  super
  remaining_bytes = @chunk.size
  @chunk.close
  File.unlink(@path) if remaining_bytes == 0
end
commit() click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 64
# Makes everything appended since the last commit permanent. The current file
# position becomes the rollback point, pending counters are folded into the
# totals, and the modification time is refreshed.
#
# @return [true]
def commit
  @commit_position = @chunk.pos
  @size, @bytesize = @size + @adding_size, @bytesize + @adding_bytes
  @adding_size = @adding_bytes = 0
  @modified_at = Fluent::Clock.real_now

  true
end
concat(bulk, bulk_size) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 54
# Appends a pre-formatted bulk to the chunk file as uncommitted data.
#
# Note: the caller's string is re-tagged as ASCII-8BIT in place.
#
# @param bulk [String] serialized records
# @param bulk_size [Integer] number of records contained in +bulk+
# @return [true]
# @raise [RuntimeError] when the chunk is not writable
def concat(bulk, bulk_size)
  unless self.writable?
    raise "BUG: concatenating to unwritable chunk, now '#{self.state}'"
  end

  binary = bulk.force_encoding(Encoding::ASCII_8BIT)
  @chunk.write(binary)
  @adding_bytes += binary.bytesize
  @adding_size += bulk_size
  true
end
create_new_chunk(path, metadata, perm) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 253
# Creates a fresh, empty on-stage chunk file and resets all counters.
#
# @param path [String] path template containing the ".*." placeholder
# @param metadata chunk metadata, used to derive the key in the file name
# @param perm [Integer] permission bits for the new file
# @raise [BufferOverflowError] when the file cannot be created or configured
def create_new_chunk(path, metadata, perm)
  escaped_key = encode_key(metadata)
  @path = self.class.generate_stage_chunk_path(path, escaped_key, @unique_id)
  begin
    @chunk = io = File.open(@path, 'wb+', perm)
    io.set_encoding(Encoding::ASCII_8BIT)
    io.sync = true
    io.binmode
  rescue => e
    # Assumes a recoverable error such as "Too many open files", hence
    # BufferOverflowError. If other cases turn out to be possible, the error
    # handling should be changed to use more specific classes.
    raise BufferOverflowError, "can't create buffer file for #{path}. Stop creating buffer files: error = #{e}"
  end

  @state = :unstaged
  @bytesize = 0
  @commit_position = @chunk.pos # a newly created file, so this is 0
  @adding_bytes = @adding_size = 0
end
decode_key(key) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 249
# Reverses encode_key: percent-decodes a key taken from a chunk file name.
#
# On uri >= 1.0 (Ruby 3.4+), URI::DEFAULT_PARSER is the RFC 3986 parser. Its
# #unescape is an obsolete compatibility shim that forwards to the RFC 2396
# parser and warns when $VERBOSE is set. Call the RFC 2396 parser directly when
# the URI::RFC2396_PARSER constant exists; older uri versions fall back to
# URI::DEFAULT_PARSER, which is the RFC 2396 parser there. The decoded result
# is the same either way.
#
# @param key [String] escaped key
# @return [String] decoded key
def decode_key(key)
  parser = defined?(URI::RFC2396_PARSER) ? URI::RFC2396_PARSER : URI::DEFAULT_PARSER
  parser.unescape(key)
end
empty?() click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 91
# Whether the chunk holds no committed bytes. Uncommitted bytes appended by
# #concat are deliberately not considered.
#
# @return [Boolean]
def empty?
  @bytesize == 0
end
encode_key(metadata) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 243
# Percent-escapes the chunk key so it can be embedded in a file name.
#
# The key is the metadata variable named by @key when @key is set, otherwise
# the metadata tag; a missing value becomes ''. Characters matched by
# ESCAPE_REGEXP are escaped.
#
# On uri >= 1.0 (Ruby 3.4+), URI::DEFAULT_PARSER is the RFC 3986 parser. Its
# #escape is an obsolete compatibility shim that forwards to the RFC 2396
# parser and warns when $VERBOSE is set. Call the RFC 2396 parser directly when
# the URI::RFC2396_PARSER constant exists; older uri versions fall back to
# URI::DEFAULT_PARSER, which is the RFC 2396 parser there. The escaped output
# is the same either way.
#
# @param metadata chunk metadata responding to #variables and #tag
# @return [String] escaped key
def encode_key(metadata)
  k = @key ? metadata.variables[@key] : metadata.tag
  k ||= ''
  parser = defined?(URI::RFC2396_PARSER) ? URI::RFC2396_PARSER : URI::DEFAULT_PARSER
  parser.escape(k, ESCAPE_REGEXP)
end
enqueued!() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#enqueued!
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 95
# Moves a staged chunk to the queued state. The file is renamed to the path
# produced by generate_queued_chunk_path, and then
# Fluent::Plugin::Buffer::Chunk#enqueued! performs the state transition.
#
# Does nothing when the chunk is not staged. @path is updated, and super is
# called, only after the rename succeeded.
#
# @raise [RuntimeError] if the rename fails. If something already exists at
#   the new path, the method first tries to rename the file back. The message
#   says whether that restore attempt also failed.
def enqueued!
  return unless self.staged?

  new_chunk_path = self.class.generate_queued_chunk_path(@path, @unique_id)

  begin
    # file_rename passes the reopened IO to the callback, so @chunk keeps
    # referring to a usable handle for the renamed file.
    file_rename(@chunk, @path, new_chunk_path, ->(new_io) { @chunk = new_io })
  rescue => e
    begin
      # Only try to move the file back when something now exists at the new
      # path, i.e. the rename itself appears to have gone through.
      file_rename(@chunk, new_chunk_path, @path, ->(new_io) { @chunk = new_io }) if File.exist?(new_chunk_path)
    rescue => re
      # At this point restoring the buffer state is hard, because the previous
      # `file_rename` failed, presumably due to a resource problem.
      # Retrying is one possible approach, but it may cause livelock under
      # limited resources or in a high-load environment.
      # So such errors are not retried for now; a more descriptive message is
      # raised instead.
      # "Too many open files" should be fixed by proper buffer configuration and system settings.
      raise "can't enqueue buffer file and failed to restore. This may causes inconsistent state: path = #{@path}, error = '#{e}', retry error = '#{re}'"
    else
      raise "can't enqueue buffer file: path = #{@path}, error = '#{e}'"
    end
  end

  @path = new_chunk_path

  super
end
file_rename(file, old_path, new_path, callback = nil) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 224
# Renames the file backing +file+ and continues with a read-only, binary,
# sync-mode handle positioned where +file+ was.
#
# On Windows the original handle is closed before the rename and a new File is
# opened (presumably because an open file cannot be renamed there). Elsewhere
# the same IO object is reopened on the new path.
#
# @param file [IO] currently open handle for +old_path+
# @param old_path [String]
# @param new_path [String]
# @param callback [#call, nil] receives the handle to use from now on
# @return the callback's result, or nil when no callback is given
def file_rename(file, old_path, new_path, callback = nil)
  saved_pos = file.pos
  reopened =
    if Fluent.windows?
      file.close
      File.rename(old_path, new_path)
      File.open(new_path, 'rb', @permission)
    else
      File.rename(old_path, new_path)
      file.reopen(new_path, 'rb')
    end
  reopened.set_encoding(Encoding::ASCII_8BIT)
  reopened.sync = true
  reopened.binmode
  reopened.pos = saved_pos
  callback.call(reopened) if callback
end
load_existing_enqueued_chunk(path) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 292
# Reopens an enqueued chunk file found on disk (mode :queued in initialize).
#
# The file is opened read-only in binary mode and positioned at the start.
# Unique id, key and timestamps are restored from the file name and stat by
# restore_metadata.
#
# @param path [String] path of an existing enqueued chunk file
# @raise [FileChunkError] if the file is empty, or (from restore_metadata) if
#   its name cannot be parsed
def load_existing_enqueued_chunk(path)
  @path = path
  raise FileChunkError, "enqueued file chunk is empty" if File.size(@path).zero?

  @chunk = File.open(@path, 'rb')
  @chunk.set_encoding(Encoding::ASCII_8BIT)
  @chunk.binmode
  @chunk.seek(0, IO::SEEK_SET)

  begin
    restore_metadata
  rescue
    # When this runs from initialize, a failure means the object is never
    # returned to the caller, so nothing else could ever close this
    # descriptor. Close it before propagating the error.
    @chunk.close
    raise
  end

  @state = :queued
  @bytesize = @chunk.size
  @commit_position = @chunk.size
end
load_existing_staged_chunk(path) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 273
# Reopens a staged (still writable) chunk file found on disk (mode :staged in
# initialize).
#
# The file is opened read-write in binary, sync mode and positioned at the end
# so that later #concat calls append. Unique id, key and timestamps are
# restored by restore_metadata.
#
# @param path [String] path of an existing staged chunk file
# @raise [FileChunkError] if the file is empty, or (from restore_metadata) if
#   its name cannot be parsed
def load_existing_staged_chunk(path)
  @path = path
  raise FileChunkError, "staged file chunk is empty" if File.size(@path).zero?

  @chunk = File.open(@path, 'rb+')
  @chunk.set_encoding(Encoding::ASCII_8BIT)
  @chunk.sync = true
  @chunk.binmode
  @chunk.seek(0, IO::SEEK_END)

  begin
    restore_metadata
  rescue
    # When this runs from initialize, a failure means the object is never
    # returned to the caller, so nothing else could ever close this
    # descriptor. Close it before propagating the error.
    @chunk.close
    raise
  end

  @state = :staged
  @bytesize = @chunk.size
  @commit_position = @chunk.pos
  @adding_bytes = 0
  @adding_size = 0
end
open(**kwargs) { |chunk| ... } click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 142
# Yields the chunk's IO rewound to the start and returns the block's value.
# A staged chunk's IO is moved back to the end afterwards, so appends continue
# where they left off.
#
# @param kwargs accepted for interface compatibility; unused
# @yieldparam io [IO] the chunk file, positioned at offset 0
# @return the block's return value
def open(**kwargs, &block)
  @chunk.seek(0, IO::SEEK_SET)
  value = yield(@chunk)
  return value unless staged?

  @chunk.seek(0, IO::SEEK_END)
  value
end
purge() click to toggle source
Calls superclass method Fluent::Plugin::Buffer::Chunk#purge
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 130
# Discards the chunk: runs the superclass purge, closes the file handle,
# zeroes every counter and deletes the file from disk.
def purge
  super
  @chunk.close
  @adding_size = 0
  @adding_bytes = 0
  @size = 0
  @bytesize = 0
  File.unlink(@path)
end
read(**kwargs) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 137
# Returns the whole content of the chunk file, reading from offset 0.
# The IO is left positioned wherever the read ends.
#
# @param kwargs accepted for interface compatibility; unused
# @return [String]
def read(**kwargs)
  @chunk.tap { |io| io.seek(0, IO::SEEK_SET) }.read
end
restore_metadata() click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 193
# Rebuilds chunk identity from an existing file. The unique id and key are
# parsed from @path; the key goes into the metadata variable named by @key, or
# into the tag when @key is unset. Timestamps come from File.stat. The record
# count is reset to 0 (see restore_size).
#
# NOTE(review): File::Stat#ctime is the inode change time on Unix-like
# systems, not the creation time.
#
# @raise [FileChunkError] when the file name does not follow the chunk scheme
def restore_metadata
  parsed = self.class.unique_id_and_key_from_path(@path)
  raise FileChunkError, "Invalid chunk found. unique_id and key not exist: #{@path}" unless parsed

  @unique_id = parsed.first
  decoded_key = decode_key(parsed.last)
  if @key
    @metadata.variables = { @key => decoded_key }
  else
    @metadata.tag = decoded_key
  end
  @size = 0

  file_stat = File.stat(@path)
  @created_at = file_stat.ctime.to_i
  @modified_at = file_stat.mtime.to_i
end
restore_size(chunk_format) click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 212
# Recounts the records stored in the chunk file and stores the count in @size.
#
# @param chunk_format [Symbol] :msgpack counts unpacked objects; any other
#   value counts lines
# @return [Integer] the record count
def restore_size(chunk_format)
  @size = File.open(@path, 'rb') do |io|
    if chunk_format == :msgpack
      records = 0
      Fluent::MessagePackFactory.msgpack_unpacker(io).each { |_record| records += 1 }
      records
    else
      io.each_line.count
    end
  end
end
rollback() click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 74
# Discards data appended since the last commit. The file is truncated back to
# the commit position, the IO is repositioned there, and the pending counters
# are cleared.
#
# @return [true]
def rollback
  unless @chunk.pos == @commit_position
    @chunk.seek(@commit_position, IO::SEEK_SET)
    @chunk.truncate(@commit_position)
  end
  @adding_size = 0
  @adding_bytes = 0
  true
end
size() click to toggle source
# File lib/fluent/plugin/buffer/file_single_chunk.rb, line 87
# Record count of the chunk: committed records plus records appended by
# #concat that have not yet been committed or rolled back.
#
# @return [Integer]
def size
  pending = @adding_size
  @size + pending
end