class Fluent::Plugin::DockerJournaldConcatFilter
Public Class Methods
new()
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 23
def initialize
  super
  @buffer = Hash.new {|h, k| h[k] = [] }
  @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
end
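The default-value blocks are what make the per-stream bookkeeping work: @buffer lazily creates an empty array the first time a stream identity is seen, and @timeout_map records a timestamp per stream. A minimal plain-Ruby sketch of that behaviour (keys and records are illustrative only):

buffer = Hash.new { |h, k| h[k] = [] }

buffer["docker.app:abc123"] << ["docker.app", 100, { "log" => "part 1" }]
buffer["docker.app:abc123"] << ["docker.app", 101, { "log" => "part 2" }]

p buffer["docker.app:abc123"].size   # => 2
p buffer["docker.app:unseen"]        # => [] (created lazily on first access)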
Public Instance Methods
configure(conf)
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 30
def configure(conf)
  super
  @separator = ""
end
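Note that @separator is fixed to the empty string, so reassembled chunks are joined back-to-back rather than newline-separated. For example (chunk contents are illustrative):

chunks = ["a long line split by ", "the docker journald ", "log driver"]
p chunks.join("")   # => "a long line split by the docker journald log driver"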
filter_stream(tag, es)
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 50
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    if /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag
      new_es.add(time, record)
      next
    end
    begin
      flushed_es = process(tag, time, record)
      unless flushed_es.empty?
        flushed_es.each do |_time, new_record|
          new_es.add(time, record.merge(new_record))
        end
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end
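Events whose tag matches Fluentd's own internal logging tags (fluent.trace through fluent.fatal) are passed through untouched, so the filter never buffers its own log output. A quick check of that regexp (tag values are illustrative):

internal = /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/

p internal.match?("fluent.warn")       # => true  (copied to the output stream as-is)
p internal.match?("docker.container")  # => false (handed to #process instead)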
shutdown()
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 43
def shutdown
  super
  @finished = true
  flush_shutdown_buffer
end
start()
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 36
def start
  super
  @finished = false
  timer_execute(:filter_concat_timer, 1, &method(:on_timer))
end
Private Instance Methods
flush_buffer(stream_identity)
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 107
def flush_buffer(stream_identity)
  lines = @buffer[stream_identity].map {|_tag, _time, record| record[@key] }
  _tag, time, first_record = @buffer[stream_identity].first
  new_record = { @key => lines.join(@separator) }
  @buffer[stream_identity].clear
  merged_record = first_record.merge(new_record)
  sanitize_record(merged_record)
  [time, merged_record]
end
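A self-contained sketch of what one flush produces for a stream. The field names "log" and "partial_message" are assumptions standing in for @key and @match_key; only the joining and sanitizing behaviour is taken from the method above:

key, match_key, separator = "log", "partial_message", ""   # assumed values

buffered = [
  ["docker.app", 100, { key => "foo", match_key => "true" }],
  ["docker.app", 101, { key => "bar", match_key => "true" }],
  ["docker.app", 102, { key => "baz" }],
]

lines = buffered.map { |_tag, _time, record| record[key] }
_tag, time, first_record = buffered.first
merged = first_record.merge(key => lines.join(separator))
merged.delete(match_key)   # mirrors #sanitize_record

p [time, merged]   # => [100, {"log"=>"foobarbaz"}]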
flush_shutdown_buffer()
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 139
def flush_shutdown_buffer
  @buffer.each do |stream_identity, elements|
    next if elements.empty?
    tag = stream_identity.split(":").first
    time, record = flush_buffer(stream_identity)
    router.emit(tag, time, record)
    log.info("Shutdown flush: #{stream_identity}")
  end
  @buffer.clear
  log.info("Filter docker_journald_concat shutdown finished")
end
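The stream identity is built in #process as "#{tag}:#{record[@stream_identity_key]}", so the original tag is recovered here by taking everything before the first colon (value is illustrative):

stream_identity = "docker.app:abc123"
p stream_identity.split(":").first   # => "docker.app"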
flush_timeout_buffer()
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 119
def flush_timeout_buffer
  # now = Fluent::Engine.now
  now = Fluent::EventTime.now
  timeout_stream_identities = []
  @timeout_map.each do |stream_identity, previous_timestamp|
    next if @flush_interval > (now - previous_timestamp)
    next if @buffer[stream_identity].empty?
    time, record = flush_buffer(stream_identity)
    sanitize_record(record)
    timeout_stream_identities << stream_identity
    tag = stream_identity.split(":").first
    message = "Timeout flush: #{stream_identity}"
    handle_timeout_error(tag, time, record, message)
    log.info(message)
  end
  @timeout_map.reject! do |stream_identity, _|
    timeout_stream_identities.include?(stream_identity)
  end
end
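The flush condition amounts to "this stream has been idle for at least @flush_interval seconds". A plain-Ruby sketch of the check, using Integer timestamps instead of Fluent::EventTime (values are illustrative):

flush_interval = 5
now            = 100
timeout_map    = { "docker.app:abc" => 90, "docker.app:def" => 98 }

expired = timeout_map.select { |_id, previous| now - previous >= flush_interval }
p expired.keys   # => ["docker.app:abc"]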
handle_timeout_error(tag, time, record, message)
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 151
def handle_timeout_error(tag, time, record, message)
  if @timeout_label
    event_router = event_emitter_router(@timeout_label)
    event_router.emit(tag, time, record)
  else
    router.emit_error_event(tag, time, record, TimeoutError.new(message))
  end
end
on_timer()
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 73
def on_timer
  return if @flush_interval <= 0
  return if @finished
  flush_timeout_buffer
end
partial?(text)
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 99
def partial?(text)
  /^true$/.match(text)
end
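#partial? only looks for the literal string "true" in the flag field; anything else, including a missing field (nil), counts as a complete record:

p(/^true$/.match("true") ? :partial : :complete)   # => :partial
p(/^true$/.match(nil)    ? :partial : :complete)   # => :complete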
process(tag, time, record)
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 79
def process(tag, time, record)
  stream_identity = "#{tag}:#{record[@stream_identity_key]}"
  @timeout_map[stream_identity] = Fluent::Engine.now
  new_es = Fluent::MultiEventStream.new
  if partial?(record[@match_key]) # partial record
    @buffer[stream_identity] << [tag, time, record]
    return new_es
  elsif !@buffer[stream_identity].empty? # last partial record
    @buffer[stream_identity] << [tag, time, record]
    _, new_record = flush_buffer(stream_identity)
    new_es.add(time, new_record)
    return new_es
  end
  # regular record
  new_es.add(time, record)
  new_es
end
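End to end, the branch logic above means a run of flagged chunks is buffered until an unflagged chunk arrives, at which point everything is emitted as one record. A dependency-free sketch of that flow, with "log" and "partial_message" as stand-ins for @key and @match_key:

buffer = []
out    = []

events = [
  { "log" => "foo", "partial_message" => "true" },
  { "log" => "bar", "partial_message" => "true" },
  { "log" => "baz" },                               # final chunk: flag absent
]

events.each do |record|
  if /^true$/.match(record["partial_message"])      # partial record: keep buffering
    buffer << record
  elsif !buffer.empty?                               # last chunk: flush as one line
    buffer << record
    out << buffer.map { |r| r["log"] }.join("")
    buffer.clear
  else                                               # ordinary, unsplit record
    out << record["log"]
  end
end

p out   # => ["foobarbaz"]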
sanitize_record(record)
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 103
def sanitize_record(record)
  record.delete(@match_key)
end