class Fluent::Plugin::TailInput::TailWatcher
Attributes
enable_watch_timer[R]
encoding[R]
from_encoding[R]
line_buffer[RW]
line_buffer_timer_flusher[RW]
log[R]
open_on_every_update[R]
path[R]
pe[R]
read_lines_limit[R]
stat_trigger[R]
timer_trigger[RW]
unwatched[RW]
Public Class Methods
new(path, rotate_wait, pe, log, read_from_head, enable_watch_timer, read_lines_limit, update_watcher, line_buffer_timer_flusher, from_encoding, encoding, open_on_every_update, &receive_lines)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 420
#
# Builds a watcher for a single tailed path.
#
# Stores every argument in the instance variable of the same name, falls back
# to a fresh MemoryPositionEntry when no position entry is given, and creates
# the stat trigger and rotate handler, wiring them to #on_notify and
# #on_rotate respectively. The timer trigger and the IO handler start as nil.
# The block is kept as the receive-lines callback used by #wrap_receive_lines.
def initialize(path, rotate_wait, pe, log, read_from_head, enable_watch_timer,
               read_lines_limit, update_watcher, line_buffer_timer_flusher,
               from_encoding, encoding, open_on_every_update, &receive_lines)
  # Target file and position tracking.
  @path = path
  @pe = pe || MemoryPositionEntry.new
  @rotate_wait = rotate_wait
  @read_from_head = read_from_head

  # Reading behaviour and callbacks.
  @read_lines_limit = read_lines_limit
  @enable_watch_timer = enable_watch_timer
  @update_watcher = update_watcher
  @receive_lines = receive_lines

  # Triggers and handlers; both constructors receive this watcher.
  @stat_trigger = StatWatcher.new(self, &method(:on_notify))
  @timer_trigger = nil
  @rotate_handler = RotateHandler.new(self, &method(:on_rotate))
  @io_handler = nil

  # Logging, buffering and encoding settings.
  @log = log
  @line_buffer_timer_flusher = line_buffer_timer_flusher
  @encoding = encoding
  @from_encoding = from_encoding
  @open_on_every_update = open_on_every_update
end
Public Instance Methods
attach() { |self| ... }
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 459
#
# Runs one notification pass via #on_notify, then hands this watcher to the
# caller's block. A block is required, because the method always yields.
#
# Returns whatever the block returns.
def attach
  on_notify
  yield(self)
end
close()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 470
#
# Closes the current IO handler, if there is one, and forgets it.
# Does nothing when no handler is set. Always returns nil.
def close
  return unless @io_handler

  @io_handler.close
  @io_handler = nil
end
detach()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 464
#
# Detaches the watcher's triggers and gives the IO handler one last
# notification.
#
# The timer trigger is only detached when the watch timer is enabled, a timer
# has actually been assigned, and it reports itself as attached. The stat
# trigger is detached when attached. Finally, if an IO handler exists, its
# #on_notify is invoked once more.
def detach
  # @timer_trigger starts out as nil and is assigned from outside through its
  # accessor, so guard against detaching before a timer was ever set.
  @timer_trigger.detach if @enable_watch_timer && @timer_trigger && @timer_trigger.attached?
  @stat_trigger.detach if @stat_trigger.attached?
  @io_handler.on_notify if @io_handler
end
on_notify()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 477
#
# Stats the watched path and fans the result out to the collaborators.
#
# A missing file (Errno::ENOENT) is reported as a nil stat. The rotate handler
# receives the stat, the line-buffer timer flusher receives this watcher, and
# the IO handler is notified last. Each collaborator is skipped when unset.
def on_notify
  stat =
    begin
      Fluent::FileWrapper.stat(@path)
    rescue Errno::ENOENT
      # moved or deleted
      nil
    end

  @rotate_handler.on_notify(stat) if @rotate_handler
  @line_buffer_timer_flusher.on_notify(self) if @line_buffer_timer_flusher
  @io_handler.on_notify if @io_handler
end
on_rotate(stat)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 490 def on_rotate(stat) if @io_handler.nil? if stat # first time fsize = stat.size inode = stat.ino last_inode = @pe.read_inode if inode == last_inode # rotated file has the same inode number with the last file. # assuming following situation: # a) file was once renamed and backed, or # b) symlink or hardlink to the same file is recreated # in either case, seek to the saved position elsif last_inode != 0 # this is FilePositionEntry and fluentd once started. # read data from the head of the rotated file. # logs never duplicate because this file is a rotated new file. @pe.update(inode, 0) else # this is MemoryPositionEntry or this is the first time fluentd started. # seek to the end of the any files. # logs may duplicate without this seek because it's not sure the file is # existent file or rotated new file. pos = @read_from_head ? 0 : fsize @pe.update(inode, pos) end @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) else @io_handler = NullIOHandler.new end else watcher_needs_update = false if stat inode = stat.ino if inode == @pe.read_inode # truncated @pe.update_pos(0) @io_handler.close elsif !@io_handler.opened? # There is no previous file. Reuse TailWatcher @pe.update(inode, 0) else # file is rotated and new file found watcher_needs_update = true end else # file is rotated and new file not found # Clear RotateHandler to avoid duplicated file watch in same path. @rotate_handler = nil watcher_needs_update = true end log_msg = "detected rotation of #{@path}" log_msg << "; waiting #{@rotate_wait} seconds" if watcher_needs_update # wait rotate_time if previous file exists @log.info log_msg if watcher_needs_update @update_watcher.call(@path, swap_state(@pe)) else @io_handler = IOHandler.new(self, &method(:wrap_receive_lines)) end end end
swap_state(pe)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 552
#
# Replaces this watcher's position entry with an in-memory copy of +pe+
# (same inode and position) and returns the original +pe+ untouched.
#
# pe - the position entry to copy; must respond to read_inode and read_pos.
def swap_state(pe)
  # Use MemoryPositionEntry for rotated file temporary
  @pe = MemoryPositionEntry.new.tap do |snapshot|
    snapshot.update(pe.read_inode, pe.read_pos)
  end

  # This pe will be updated in on_rotate after TailWatcher is initialized
  pe
end
tag()
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 451
#
# Derives a dotted tag from the watched path and memoizes it in @parsed_tag.
#
# Every "/" becomes ".", runs of dots collapse to a single dot, and one dot at
# the very start of the string is dropped, e.g. "/var/log/app.log" becomes
# "var.log.app.log".
#
# Returns the tag String.
def tag
  # \A anchors to the start of the whole string; ^ would also match after any
  # embedded newline and strip dots in the middle of the tag.
  @parsed_tag ||= @path.tr('/', '.').squeeze('.').sub(/\A\./, '')
end
wrap_receive_lines(lines)
click to toggle source
# File lib/fluent/plugin/in_tail.rb, line 455
#
# Forwards +lines+ to the receive-lines callback captured in #initialize,
# passing this watcher as the second argument.
#
# Returns whatever the callback returns.
def wrap_receive_lines(lines)
  callback = @receive_lines
  callback.call(lines, self)
end