class Fluent::DetectExceptionsOutput
This output plugin consumes a log stream of JSON objects which contain single-line log messages. If a consecutive sequence of log messages forms an exception stack trace, they are forwarded as a single, combined JSON object. Otherwise, the input log data is forwarded as is.
Public Instance Methods
before_shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 63
# Flushes pending data through flush_buffers, then delegates to the
# superclass implementation when the superclass provides one.
def before_shutdown
  flush_buffers
  # Nothing more to do when no superclass hook exists.
  return unless defined?(super)

  super
end
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 42
# Runs the superclass configuration and then derives this plugin's state
# from the configured values.
#
# conf - the configuration object, passed through to the superclass.
def configure(conf)
  super

  flush_interval = multiline_flush_interval
  # The check interval is a tenth of the flush interval, but never below 1.
  @check_flush_interval = [flush_interval * 0.1, 1].max if flush_interval
  @languages = languages.map { |language| language.to_sym }
  # One TraceAccumulator per log stream, created on demand.
  @accumulators = {}
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 76
# Passes every (time, record) pair of the event stream to process_record
# under the given tag, then advances the chain.
#
# tag   - tag of the incoming event stream.
# es    - event stream; iterated with each, yielding time and record.
# chain - object that receives next once all events were handed over.
def emit(tag, es, chain)
  es.each { |time_sec, record| process_record(tag, time_sec, record) }
  chain.next
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 68
# Flushes pending data, waits for the flush thread when a multiline flush
# interval is set, and finally calls the superclass shutdown.
def shutdown
  # Older fluentd versions never call before_shutdown, so the buffers are
  # flushed here too.
  flush_buffers
  if @multiline_flush_interval
    @thread.join
  end
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_detect_exceptions.rb, line 54
# Calls the superclass start and, when a multiline flush interval is
# configured, sets up the mutex, the stop flag and the thread that runs
# check_flush_loop.
def start
  super
  return unless multiline_flush_interval

  @stop_check = false
  @flush_buffer_mutex = Mutex.new
  # The thread is created last so both values above exist when it runs.
  @thread = Thread.new(&method(:check_flush_loop))
end
Private Instance Methods
check_flush_loop()
click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 110
# Loop executed by the thread created in start. While holding
# @flush_buffer_mutex it repeatedly waits @check_flush_interval seconds and
# force-flushes every accumulator whose buffer_start_time lies more than
# @multiline_flush_interval seconds in the past. The loop ends once
# @stop_check is set.
#
# Any StandardError raised inside the loop is logged and ends the loop; it
# is not re-raised.
def check_flush_loop
  @flush_buffer_mutex.synchronize do
    loop do
      # Mutex#sleep gives up the lock while waiting and takes it back
      # before returning, so other users of the mutex can run in between.
      @flush_buffer_mutex.sleep(@check_flush_interval)
      now = Time.now
      if @stop_check
        # Logged only when the loop really stops, not on every tick.
        log.debug 'Reached flush loop so stopping'
        break
      end
      @accumulators.each_value do |acc|
        acc.force_flush if now - acc.buffer_start_time > @multiline_flush_interval
      end
    end
  end
rescue StandardError => e
  # The exception is bound explicitly: $ERROR_INFO is only defined after
  # require 'English' and would otherwise yield an empty message.
  log.error 'error in check_flush_loop', error: e.to_s
  log.error_backtrace
end
flush_buffers()
click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 103
# Inside synchronize: sets the stop flag read by check_flush_loop and
# force-flushes every accumulator.
def flush_buffers
  synchronize do
    @stop_check = true
    @accumulators.each_value { |accumulator| accumulator.force_flush }
  end
end
process_record(tag, time_sec, record)
click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 85
# Pushes one record into the accumulator of its log stream, creating the
# accumulator the first time the stream is seen. Runs inside synchronize.
#
# tag      - tag the record arrived with.
# time_sec - time value of the record, passed on to the accumulator.
# record   - the record; must respond to fetch.
def process_record(tag, time_sec, record)
  synchronize do
    # A stream is identified by the tag, plus the record's @stream field
    # ('' when the field is missing) if @stream is non-empty.
    stream_key = @stream.empty? ? [tag] : [tag, record.fetch(@stream, '')]
    accumulator = @accumulators.fetch(stream_key) do
      # Output tag: the input tag without a leading "<remove_tag_prefix>.".
      prefix_pattern = /^#{Regexp.escape(@remove_tag_prefix)}\./
      out_tag = tag.sub(prefix_pattern, '')
      @accumulators[stream_key] = Fluent::TraceAccumulator.new(@message, @languages, max_lines: @max_lines, max_bytes: @max_bytes) do |t, r|
        router.emit(out_tag, t, r)
      end
    end
    accumulator.push(time_sec, record)
  end
end
synchronize() { || ... }
click to toggle source
# File lib/fluent/plugin/out_detect_exceptions.rb, line 128
# Runs the block and returns its value. The block executes while holding
# @flush_buffer_mutex when @multiline_flush_interval is set, and without
# any locking otherwise.
def synchronize(&block)
  return yield unless @multiline_flush_interval

  @flush_buffer_mutex.synchronize(&block)
end