# File lib/fluent/plugin/in_tail.rb, line 27 def initialize super @paths = [] @tails = {} @ignore_list = [] end
Fluent::NewTailInput::TailWatcher#close is called by another thread at shutdown phase. It causes 'can't modify string; temporarily locked' error in IOHandler so adding close_io argument to avoid this problem. At shutdown, IOHandler's io will be released automatically after detached the event loop
# File lib/fluent/plugin/in_tail.rb, line 254 def close_watcher(tw, close_io = true) tw.close(close_io) flush_buffer(tw) if tw.unwatched && @pf @pf[tw.path].update_pos(PositionFile::UNWATCHED_POSITION) end end
# File lib/fluent/plugin/in_tail.rb, line 262 def close_watcher_after_rotate_wait(tw) closer = TailWatcher::Closer.new(@rotate_wait, tw, log, &method(:close_watcher)) closer.attach(@loop) end
# File lib/fluent/plugin/in_tail.rb, line 71 def configure(conf) super @paths = @path.split(',').map {|path| path.strip } if @paths.empty? raise ConfigError, "tail: 'path' parameter is required on tail input" end unless @pos_file $log.warn "'pos_file PATH' parameter is not set to a 'tail' source." $log.warn "this parameter is highly recommended to save the position to resume tailing." end configure_parser(conf) configure_tag configure_encoding @multiline_mode = conf['format'] =~ /multiline/ @receive_handler = if @multiline_mode method(:parse_multilines) else method(:parse_singleline) end end
# File lib/fluent/plugin/in_tail.rb, line 111 def configure_encoding unless @encoding if @from_encoding raise ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter." end end @encoding = parse_encoding_param(@encoding) if @encoding @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding end
# File lib/fluent/plugin/in_tail.rb, line 96 def configure_parser(conf) @parser = Plugin.new_parser(conf['format']) @parser.configure(conf) end
# File lib/fluent/plugin/in_tail.rb, line 101 def configure_tag if @tag.index('*') @tag_prefix, @tag_suffix = @tag.split('*') @tag_suffix ||= '' else @tag_prefix = nil @tag_suffix = nil end end
# File lib/fluent/plugin/in_tail.rb, line 322 def convert_line_to_event(line, es, tail_watcher) begin line.chomp! # remove \n if @encoding if @from_encoding line.encode!(@encoding, @from_encoding) else line.force_encoding(@encoding) end end @parser.parse(line) { |time, record| if time && record record[@path_key] ||= tail_watcher.path unless @path_key.nil? es.add(time, record) else if @emit_unmatched_lines record = {'unmatched_line' => line} record[@path_key] ||= tail_watcher.path unless @path_key.nil? es.add(::Fluent::Engine.now, record) end log.warn "pattern not match: #{line.inspect}" end } rescue => e log.warn line.dump, error: e.to_s log.debug_backtrace(e.backtrace) end end
# File lib/fluent/plugin/in_tail.rb, line 154 def expand_paths date = Time.now paths = [] @paths.each { |path| path = date.strftime(path) if path.include?('*') paths += Dir.glob(path).select { |p| is_file = !File.directory?(p) if File.readable?(p) && is_file if @limit_recently_modified && File.mtime(p) < (date - @limit_recently_modified) false else true end else if is_file unless @ignore_list.include?(path) log.warn "#{p} unreadable. It is excluded and would be examined next time." @ignore_list << path if @ignore_repeated_permission_error end end false end } else # When file is not created yet, Dir.glob returns an empty array. So just add when path is static. paths << path end } excluded = @exclude_path.map { |path| path = date.strftime(path); path.include?('*') ? Dir.glob(path) : path }.flatten.uniq paths - excluded end
# File lib/fluent/plugin/in_tail.rb, line 267 def flush_buffer(tw) if lb = tw.line_buffer lb.chomp! if @encoding if @from_encoding lb.encode!(@encoding, @from_encoding) else lb.force_encoding(@encoding) end end @parser.parse(lb) { |time, record| if time && record tag = if @tag_prefix || @tag_suffix @tag_prefix + tw.tag + @tag_suffix else @tag end record[@path_key] ||= tw.path unless @path_key.nil? router.emit(tag, time, record) else log.warn "got incomplete line at shutdown from #{tw.path}: #{lb.inspect}" end } end end
# File lib/fluent/plugin/in_tail.rb, line 122 def parse_encoding_param(encoding_name) begin Encoding.find(encoding_name) if encoding_name rescue ArgumentError => e raise ConfigError, e.message end end
# File lib/fluent/plugin/in_tail.rb, line 359 def parse_multilines(lines, tail_watcher) lb = tail_watcher.line_buffer es = MultiEventStream.new if @parser.has_firstline? tail_watcher.line_buffer_timer_flusher.reset_timer if tail_watcher.line_buffer_timer_flusher lines.each { |line| if @parser.firstline?(line) if lb convert_line_to_event(lb, es, tail_watcher) end lb = line else if lb.nil? if @emit_unmatched_lines convert_line_to_event(line, es, tail_watcher) end log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}" else lb << line end end } else lb ||= '' lines.each do |line| lb << line @parser.parse(lb) { |time, record| if time && record convert_line_to_event(lb, es, tail_watcher) lb = '' end } end end tail_watcher.line_buffer = lb es end
# File lib/fluent/plugin/in_tail.rb, line 351 def parse_singleline(lines, tail_watcher) es = MultiEventStream.new lines.each { |line| convert_line_to_event(line, es, tail_watcher) } es end
@return true if no error or unrecoverable error happens in emit action. false if got BufferQueueLimitError
# File lib/fluent/plugin/in_tail.rb, line 301 def receive_lines(lines, tail_watcher) es = @receive_handler.call(lines, tail_watcher) unless es.empty? tag = if @tag_prefix || @tag_suffix @tag_prefix + tail_watcher.tag + @tag_suffix else @tag end begin router.emit_stream(tag, es) rescue BufferQueueLimitError return false rescue # ignore non BufferQueueLimitError errors because in_tail can't recover. Engine shows logs and backtraces. return true end end return true end
in_tail with '*' path doesn't check rotation file equality at refresh phase. So you should not use '*' path when your logs will be rotated by another tool. It will cause log duplication after updated watch files. In such case, you should separate log directory and specify two paths in path parameter. e.g. path /path/to/dir/*,/path/to/rotated_logs/target_file
# File lib/fluent/plugin/in_tail.rb, line 193 def refresh_watchers target_paths = expand_paths existence_paths = @tails.keys unwatched = existence_paths - target_paths added = target_paths - existence_paths stop_watchers(unwatched, false, true) unless unwatched.empty? start_watchers(added) unless added.empty? end
# File lib/fluent/plugin/in_tail.rb, line 293 def run @loop.run rescue log.error "unexpected error", error: $!.to_s log.error_backtrace end
# File lib/fluent/plugin/in_tail.rb, line 204 def setup_watcher(path, pe) line_buffer_timer_flusher = (@multiline_mode && @multiline_flush_interval) ? TailWatcher::LineBufferTimerFlusher.new(log, @multiline_flush_interval, &method(:flush_buffer)) : nil tw = TailWatcher.new(path, @rotate_wait, pe, log, @read_from_head, @enable_watch_timer, @read_lines_limit, method(:update_watcher), line_buffer_timer_flusher, &method(:receive_lines)) tw.attach(@loop) tw end
# File lib/fluent/plugin/in_tail.rb, line 145 def shutdown @refresh_trigger.detach if @refresh_trigger && @refresh_trigger.attached? stop_watchers(@tails.keys, true) @loop.stop rescue nil # when all watchers are detached, `stop` raises RuntimeError. We can ignore this exception. @thread.join @pf_file.close if @pf_file end
# File lib/fluent/plugin/in_tail.rb, line 130 def start if @pos_file @pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION) @pf_file.sync = true @pf = PositionFile.parse(@pf_file) end @loop = Coolio::Loop.new refresh_watchers unless @skip_refresh_on_startup @refresh_trigger = TailWatcher::TimerWatcher.new(@refresh_interval, true, log, &method(:refresh_watchers)) @refresh_trigger.attach(@loop) @thread = Thread.new(&method(:run)) end
# File lib/fluent/plugin/in_tail.rb, line 211 def start_watchers(paths) paths.each { |path| pe = nil if @pf pe = @pf[path] if @read_from_head && pe.read_inode.zero? begin pe.update(File::Stat.new(path).ino, 0) rescue Errno::ENOENT $log.warn "#{path} not found. Continuing without tailing it." end end end @tails[path] = setup_watcher(path, pe) } end
# File lib/fluent/plugin/in_tail.rb, line 229 def stop_watchers(paths, immediate = false, unwatched = false) paths.each { |path| tw = @tails.delete(path) if tw tw.unwatched = unwatched if immediate close_watcher(tw, false) else close_watcher_after_rotate_wait(tw) end end } end
#refresh_watchers calls @tails.keys so we don't use stop_watcher -> start_watcher sequence for safety.
# File lib/fluent/plugin/in_tail.rb, line 244 def update_watcher(path, pe) rotated_tw = @tails[path] @tails[path] = setup_watcher(path, pe) close_watcher_after_rotate_wait(rotated_tw) if rotated_tw end