class Franz::Agg

Agg mostly aggregates Tail events by applying the multiline filter, but it also applies the “host” and “type” fields. Basically, it does all the post-processing after we’ve retrieved a line from a file.

Attributes

agg_events[R]
buffer[R]
flush_interval[R]
lock[R]
seqs[R]
tail_events[R]

Public Class Methods

new(opts={}) click to toggle source

Start a new Agg thread in the background.

@param [Hash] opts options for the aggregator @option opts [InputConfig] :input_config shared Franz configuration (required) @option opts [Queue] :tail_events ([]) “input” queue from Tail @option opts [Queue] :agg_events ([]) “output” queue @option opts [Integer] :buffer_limit (50) buffered-event count above which a “buffer overflow” is logged @option opts [Integer] :flush_interval (10) seconds between flushes @option opts [Hash<Path,Fixnum>] :seqs ({}) internal “seqs” state @option opts [Logger] :logger (Logger.new(STDOUT)) logger to use @option opts [Franz::Stats] :statz (Franz::Stats.new) stats collector

# File lib/franz/agg.rb, line 29
# Build the aggregator and start its two background threads: one that
# flushes buffered events periodically and one that captures tail events.
#
# @param [Hash] opts options for the aggregator
# @option opts [InputConfig] :input_config shared Franz configuration (required)
# @option opts [Queue] :tail_events ([]) "input" queue from Tail
# @option opts [Queue] :agg_events ([]) "output" queue
# @option opts [Integer] :buffer_limit (50) buffered-event count above which
#   capture logs a "buffer overflow"
# @option opts [Integer] :flush_interval (10) seconds between flushes
# @option opts [Hash] :seqs ({}) internal "seqs" state
# @option opts [Logger] :logger (Logger.new(STDOUT)) logger to use
# @option opts [Franz::Stats] :statz (Franz::Stats.new) stats collector
# @raise [RuntimeError] when no :input_config is given
def initialize opts={}
  raise('No input_config specified') unless opts[:input_config]
  @ic = opts[:input_config]

  # Internal state shared by the worker threads started below.
  @stop   = false
  @buffer = Franz::Sash.new
  @lock   = Hash.new { |locks, path| locks[path] = Mutex.new }

  # Input and output queues.
  @tail_events = opts[:tail_events] || []
  @agg_events  = opts[:agg_events]  || []

  # Tunables and collaborators, each with a fallback.
  @buffer_limit   = opts[:buffer_limit]   || 50
  @flush_interval = opts[:flush_interval] || 10
  @seqs           = opts[:seqs]           || {}
  @logger         = opts[:logger]         || Logger.new(STDOUT)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_lines, 0

  # Flusher: flush, sleep, repeat; one last forced flush once stopped.
  @t1 = Thread.new do
    until @stop
      flush
      sleep flush_interval
    end
    flush true
  end

  # Capturer: process tail events one at a time until stopped.
  @t2 = Thread.new do
    until @stop
      capture
    end
  end

  log.debug(
    event: 'agg started',
    tail_events: @tail_events,
    agg_events: @agg_events
  )
end

Public Instance Methods

state() click to toggle source

Return the internal “seqs” state

# File lib/franz/agg.rb, line 78
# Snapshot of the internal "seqs" state.
#
# @return a shallow copy of @seqs (made with #dup), so adding or removing
#   keys on the result does not touch the aggregator's own table
def state
  @seqs.dup
end
stop() click to toggle source

Stop the Agg thread. Effectively only once.

@return [Hash] internal “seqs” state

# File lib/franz/agg.rb, line 68
# Stop the aggregator. Only the first call does any work; later calls
# just return the current state.
#
# The first call raises the stop flag, kills the capture thread (@t2) and
# waits for the flush thread (@t1) to finish before logging.
#
# @return [Hash] internal "seqs" state
def stop
  unless @stop
    @stop = true
    @t2.kill
    @t1.join
    log.debug(event: 'agg stopped')
  end
  state
end

Private Instance Methods

capture() click to toggle source
# File lib/franz/agg.rb, line 143
# Take one event off the tail queue and aggregate it.
#
# Without a :multiline pattern for the event's path, the line is counted
# and, unless empty, enqueued on its own. With a pattern, events are
# buffered per path under that path's lock: a line matching the pattern
# first flushes whatever was buffered (enqueued as one newline-joined
# message), and the current event is then added to the buffer.
#
# Returns without doing anything when the queue hands back nil.
def capture
  event = tail_events.shift
  # A non-blocking container (such as the default Array) yields nil when it
  # is empty. Indexing nil below would raise and kill the capture thread, so
  # skip this round instead.
  return if event.nil?

  log.debug \
    event: 'capture',
    raw: event

  path = event[:path]
  line = event[:line]

  # Best effort: any failure while looking up the multiline pattern is
  # treated the same as "no multiline configured".
  multiline = begin
    @ic.config(path)[:multiline]
  rescue StandardError
    nil
  end

  if multiline
    lock[path].synchronize do
      size = buffer.size(path)
      # Exceeding the limit is only reported; nothing is discarded here.
      if size > @buffer_limit
        log.debug \
          event: 'buffer overflow',
          file: path,
          size: size,
          limit: @buffer_limit
      end
      # A matching line emits everything buffered so far for this path.
      if line =~ multiline
        buffered = buffer.flush(path)
        lines    = buffered.map { |e| e[:line] }
        @statz.inc :num_lines, lines.length
        lines    = lines.join("\n")
        enqueue path, lines unless lines.empty?
      end
      buffer.insert path, event
    end
  else
    @statz.inc :num_lines
    enqueue path, line unless line.empty?
  end
end
enqueue(path, message) click to toggle source
# File lib/franz/agg.rb, line 91
# Decorate a message and push it onto the output queue.
#
# The message is skipped (with a debug log) when the input config says to
# drop it, says not to keep it, or has no type for the path. Otherwise it is
# re-encoded to UTF-8 (invalid/undefined characters become '?'), given the
# next sequence number for the path, and pushed to agg_events as a Hash with
# :type, :host, :path and '@seq'. For JSON inputs the parsed object is merged
# into the event; anything that is not a JSON object falls back to a plain
# :message plus an :_err marker.
#
# @param path    path of the file the message came from
# @param message [String] the (possibly multi-line) message text
def enqueue path, message
  if @ic.drop? path, message
    log.debug \
      event: 'dropped',
      file: path,
      message: message
    return
  end

  unless @ic.keep? path, message
    log.debug \
      event: 'unkept',
      file: path,
      message: message
    return
  end

  t = @ic.type path
  if t.nil?
    log.debug \
      event: 'enqueue skipped',
      file: path,
      message: message
    return
  end

  log.debug \
    event: 'enqueue',
    file: path,
    message: message
  s = seq path
  m = message.encode 'UTF-8', invalid: :replace, undef: :replace, replace: '?'

  # @@host is a class variable set outside this method.
  event = { type: t, host: @@host, path: path, '@seq' => s }

  if @ic.json? path
    begin
      parsed = JSON::parse(m)
      # Valid JSON is not necessarily an object: an array, number or string
      # parses fine but cannot be merged into the event (merge! would raise
      # TypeError). Route those through the same failure path as bad JSON.
      raise JSON::ParserError, 'not a JSON object' unless parsed.is_a?(Hash)
      event.merge! parsed
    rescue JSON::ParserError
      log.error \
        event: 'json parse failed',
        file: path,
        message: m
      event.merge! message: m, _err: 'json parse failed'
    end
  else
    event.merge! message: m
  end

  agg_events.push event
end
flush(force=false, started=Time.now) click to toggle source
# File lib/franz/agg.rb, line 174
# Emit buffered events, one path at a time.
#
# A path is emitted when +force+ is set, or when at least flush_interval
# has elapsed between the buffer's mtime for that path and +started+. Its
# buffered events are removed, counted, joined with newlines and enqueued
# as a single message (unless the joined text is empty).
#
# @param force   emit every path regardless of age
# @param started reference time used for the age check
def flush force=false, started=Time.now
  log.debug(event: 'flush', force: force, started: started)

  buffer.keys.each do |path|
    lock[path].synchronize do
      due = force || started - buffer.mtime(path) >= flush_interval
      next unless due

      pending = buffer.remove(path)
      texts   = pending.map { |entry| entry[:line] }
      @statz.inc :num_lines, texts.length
      joined  = texts.join("\n")
      enqueue path, joined unless joined.empty?
    end
  end
end
log() click to toggle source
# File lib/franz/agg.rb, line 85
# Accessor for the logger stored in @logger.
def log
  @logger
end
seq(path) click to toggle source
# File lib/franz/agg.rb, line 87
# Advance and return the sequence number for +path+.
#
# A path not yet present in seqs counts from 0, so its first number is 1.
# The new value is stored back into seqs.
def seq path
  previous = seqs.fetch(path, 0)
  advanced = previous + 1
  seqs[path] = advanced
end