class Fluent::Plugin::DockerJournaldConcatFilter

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 23
def initialize
  super

  @buffer = Hash.new {|h, k| h[k] = [] }
  @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 30
def configure(conf)
  super

  @separator = ""
end
filter_stream(tag, es) click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 50
def filter_stream(tag, es)
  new_es = Fluent::MultiEventStream.new
  es.each do |time, record|
    if /\Afluent\.(?:trace|debug|info|warn|error|fatal)\z/ =~ tag
      new_es.add(time, record)
      next
    end
    begin
      flushed_es = process(tag, time, record)
      unless flushed_es.empty?
        flushed_es.each do |_time, new_record|
          new_es.add(time, record.merge(new_record))
        end
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 43
def shutdown
  super

  @finished = true
  flush_shutdown_buffer
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 36
def start
  super

  @finished = false
  timer_execute(:filter_concat_timer, 1, &method(:on_timer))
end

Private Instance Methods

flush_buffer(stream_identity) click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 107
def flush_buffer(stream_identity)
  lines = @buffer[stream_identity].map {|_tag, _time, record| record[@key] }
  _tag, time, first_record = @buffer[stream_identity].first
  new_record = {
    @key => lines.join(@separator)
  }
  @buffer[stream_identity].clear
  merged_record = first_record.merge(new_record)
  sanitize_record(merged_record)
  [time, merged_record]
end
flush_shutdown_buffer() click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 139
def flush_shutdown_buffer
  @buffer.each do |stream_identity, elements|
    next if elements.empty?
    tag = stream_identity.split(":").first
    time, record = flush_buffer(stream_identity)
    router.emit(tag, time, record)
    log.info("Shutdown flush: #{stream_identity}")
  end
  @buffer.clear
  log.info("Filter docker_journald_concat shutdown finished")
end
flush_timeout_buffer() click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 119
def flush_timeout_buffer
  # now = Fluent::Engine.now
  now = Fluent::EventTime.now
  timeout_stream_identities = []
  @timeout_map.each do |stream_identity, previous_timestamp|
    next if @flush_interval > (now - previous_timestamp)
    next if @buffer[stream_identity].empty?
    time, record = flush_buffer(stream_identity)
    sanitize_record(record)
    timeout_stream_identities << stream_identity
    tag = stream_identity.split(":").first
    message = "Timeout flush: #{stream_identity}"
    handle_timeout_error(tag, time, record, message)
    log.info(message)
  end
  @timeout_map.reject! do |stream_identity, _|
    timeout_stream_identities.include?(stream_identity)
  end
end
handle_timeout_error(tag, time, record, message) click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 151
def handle_timeout_error(tag, time, record, message)
  if @timeout_label
    event_router = event_emitter_router(@timeout_label)
    event_router.emit(tag, time, record)
  else
    router.emit_error_event(tag, time, record, TimeoutError.new(message))
  end
end
on_timer() click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 73
def on_timer
  return if @flush_interval <= 0
  return if @finished
  flush_timeout_buffer
end
partial?(text) click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 99
def partial?(text)
  /^true$/.match(text)
end
process(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 79
def process(tag, time, record)
  stream_identity = "#{tag}:#{record[@stream_identity_key]}"
  @timeout_map[stream_identity] = Fluent::Engine.now
  new_es = Fluent::MultiEventStream.new
  if partial?(record[@match_key])
    # partial record
    @buffer[stream_identity] << [tag, time, record]
    return new_es
  elsif !@buffer[stream_identity].empty?
    # last partial record
    @buffer[stream_identity] << [tag, time, record]
    _, new_record = flush_buffer(stream_identity)
    new_es.add(time, new_record)
    return new_es
  end
  # regular record
  new_es.add(time, record)
  new_es
end
sanitize_record(record) click to toggle source
# File lib/fluent/plugin/filter_docker_journald_concat.rb, line 103
def sanitize_record(record)
  record.delete(@match_key)
end