class Fluent::Plugin::BinInput::TailWatcher
Public Instance Methods
on_rotate(io)
click to toggle source
# File lib/fluent/plugin/in_bin.rb, line 45 def on_rotate(io) if @io_handler == nil if io # first time stat = io.stat fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case, seek to the saved position pos = @pe.read_pos elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. pos = 0 @pe.update(inode, pos) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end io.seek(pos) @io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) else @io_handler = NullIOHandler.new end else log_msg = "detected rotation of #{@path}" log_msg << "; waiting #{@rotate_wait} seconds" if @io_handler.io # wait rotate_time if previous file is exist @log.info log_msg if io stat = io.stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(stat.size) io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) @io_handler.close @io_handler = io_handler elsif @io_handler.io.nil? # There is no previous file. Reuse TailWatcher @pe.update(inode, io.pos) io_handler = IOHandler.new(io, @pe, @log, @read_lines_limit, &method(:wrap_receive_lines)) @io_handler = io_handler else # file is rotated and new file found @update_watcher.call(@path, swap_state(@pe)) end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil @update_watcher.call(@path, swap_state(@pe)) end end def swap_state(pe) # Use MemoryPositionEntry for rotated file temporary mpe = TailInput::MemoryPositionEntry.new mpe.update(pe.read_inode, pe.read_pos) @pe = mpe @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer. pe # This pe will be updated in on_rotate after TailWatcher is initialized end end
swap_state(pe)
click to toggle source
# File lib/fluent/plugin/in_bin.rb, line 108 def swap_state(pe) # Use MemoryPositionEntry for rotated file temporary mpe = TailInput::MemoryPositionEntry.new mpe.update(pe.read_inode, pe.read_pos) @pe = mpe @io_handler.pe = mpe # Don't re-create IOHandler because IOHandler has an internal buffer. pe # This pe will be updated in on_rotate after TailWatcher is initialized end