class Fluent::Plugin::BinInput::TailWatcher

Public Instance Methods

on_rotate(io) click to toggle source
# File lib/fluent/plugin/in_bin.rb, line 45
# Reacts to the watched path being opened for the first time or rotated.
#
# io - the freshly opened file for @path, or nil when no file is currently
#      present at that path.
#
# First call (@io_handler not set yet): picks a start offset from the saved
# position entry (@pe), seeks io there and installs an IOHandler; when io is
# nil a NullIOHandler is installed instead.
#
# Later calls (rotation): logs the rotation, then either replaces the handler
# in place (same inode, or no previous file) or hands the path over through
# @update_watcher together with the position entry returned by swap_state.
def on_rotate(io)
  if @io_handler.nil?
    if io
      # first time
      stat = io.stat
      fsize = stat.size
      inode = stat.ino

      last_inode = @pe.read_inode
      if inode == last_inode
        # The file has the same inode number as the one recorded last time.
        # Assume one of the following situations:
        #   a) the file was renamed once and then moved back, or
        #   b) a symlink or hardlink to the same file was recreated.
        # In either case, resume from the saved position.
        pos = @pe.read_pos
      elsif last_inode != 0
        # A non-zero saved inode means a position was recorded by an earlier
        # run (FilePositionEntry), so this is a new file produced by rotation.
        # Read it from the head; logs are not duplicated because the file is new.
        pos = 0
        @pe.update(inode, pos)
      else
        # MemoryPositionEntry, or the very first start: there is no way to tell
        # an existing file from a freshly rotated one, so skip to the end
        # unless @read_from_head is set. Without this seek logs may be duplicated.
        pos = @read_from_head ? 0 : fsize
        @pe.update(inode, pos)
      end
      io.seek(pos)

      @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
    else
      # Nothing to read yet; install a placeholder handler.
      @io_handler = NullIOHandler.new
    end
  else
    log_msg = "detected rotation of #{@path}"
    # The wait is only mentioned when a previous file exists.
    log_msg = "#{log_msg}; waiting #{@rotate_wait} seconds" if @io_handler.io
    @log.info log_msg

    if io
      stat = io.stat
      inode = stat.ino
      if inode == @pe.read_inode # truncated
        @pe.update_pos(stat.size)
        # Build the replacement first so the old handler is only closed once
        # the new one exists.
        io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
        @io_handler.close
        @io_handler = io_handler
      elsif @io_handler.io.nil? # There is no previous file. Reuse this TailWatcher.
        @pe.update(inode, io.pos)
        @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines))
      else # file is rotated and a new file was found
        @update_watcher.call(@path, swap_state(@pe))
      end
    else # file is rotated and a new file was not found
      # Clear RotateHandler to avoid a duplicated file watch on the same path.
      @rotate_handler = nil
      @update_watcher.call(@path, swap_state(@pe))
    end
  end
end
swap_state(pe) click to toggle source
# File lib/fluent/plugin/in_bin.rb, line 108
# Detaches this watcher from the given position entry.
#
# A new TailInput::MemoryPositionEntry is seeded with the inode and position
# read from pe, then installed as @pe and on the current IO handler.
#
# Returns the original pe unchanged so the caller can pass it on.
def swap_state(pe)
  # Temporary in-memory entry used while the rotated file is still being read.
  memory_entry = TailInput::MemoryPositionEntry.new.tap do |entry|
    entry.update(pe.read_inode, pe.read_pos)
  end

  @pe = memory_entry
  # The existing IOHandler is kept (it has an internal buffer); only its
  # position entry is switched.
  @io_handler.pe = memory_entry

  # This entry will be updated in on_rotate after the TailWatcher is initialized.
  pe
end