class Franz::Tail

Tail receives low-level file events from a Watch and handles the actual reading of files, providing a stream of lines.

Constants

ERR_BUFFER_FULL
ERR_INCOMPLETE_READ
ERR_INVALID_EVENT

Attributes

block_size[R]
buffer[R]
cursors[R]
reading[R]
tail_events[R]
watch_events[R]

Public Class Methods

new(opts={}) click to toggle source

Start a new Tail thread in the background.

@param opts [Hash] a Hash of options for tail configuration.
@option opts [InputConfig] :input_config shared Franz configuration (required; an error is raised when it is missing).

# File lib/franz/tail.rb, line 24
# Build a Tail and immediately start its background thread, which pulls
# events off +watch_events+ and passes each one to #handle until #stop is
# called.
#
# @param opts [Hash] tail configuration
# @option opts :input_config shared configuration (required)
# @option opts :watch_events source of events, consumed with #shift
#   (defaults to an empty Array)
# @option opts :tail_events sink that receives +{path:, line:}+ hashes
#   (defaults to an empty Array)
# @option opts [Integer] :read_limit spread above which a read is logged as
#   large (default 16 KiB)
# @option opts [Integer] :block_size bytes requested per read (default 32 KiB)
# @option opts [Hash] :cursors initial per-path read offsets
# @option opts :logger logger to use (defaults to a Logger on STDOUT)
# @option opts :statz stats collector (defaults to a new Franz::Stats)
# @raise [RuntimeError] when no :input_config is given
def initialize opts={}
  @ic = opts[:input_config]
  raise('No input_config specified') unless @ic

  # Event plumbing.
  @watch_events = opts[:watch_events] || []
  @tail_events  = opts[:tail_events]  || []

  # Tunables and collaborators.
  @read_limit = opts[:read_limit] || 16_384 # 16 KiB
  @block_size = opts[:block_size] || 32_768 # 32 KiB
  @cursors    = opts[:cursors]    || {}
  @logger     = opts[:logger]     || Logger.new(STDOUT)

  # Per-path bookkeeping, created lazily on first access.
  @nil_read = Hash.new { |flags, path| flags[path] = false }
  @buffer   = Hash.new { |buffers, path| buffers[path] = BufferedTokenizer.new("\n") }
  @stop     = false

  @statz = opts[:statz] || Franz::Stats.new
  [:num_reads, :num_rotates, :num_deletes].each do |counter|
    @statz.create counter, 0
  end

  @tail_thread = Thread.new do
    until @stop
      handle watch_events.shift
    end
  end

  log.debug \
    event: 'tail started',
    watch_events: watch_events,
    tail_events: tail_events,
    block_size: block_size
end

Public Instance Methods

state() click to toggle source

Return the internal “cursors” state

# File lib/franz/tail.rb, line 68
# Snapshot of the internal "cursors" table.
#
# @return [Hash] a shallow copy, so callers adding or removing keys do not
#   affect the Tail's own table
def state
  @cursors.dup
end
stop() click to toggle source

Stop the Tail thread. Only the first call has any effect; later calls simply return the current state.

@return [Hash] internal “cursors” state

# File lib/franz/tail.rb, line 58
# Stop the Tail thread. Only the first call does any work; subsequent calls
# just return the current state.
#
# @return [Hash] copy of the internal "cursors" state
def stop
  return state if @stop
  @stop = true

  # Kill whichever threads actually exist. An explicit nil check replaces the
  # former blanket `rescue nil`, which also swallowed any genuine error.
  # NOTE(review): @watch_thread is not assigned anywhere in the code visible
  # here; it is still honoured in case something else sets it.
  [@watch_thread, @tail_thread].each do |thread|
    thread.kill if thread
  end

  log.debug event: 'tail stopped'
  state
end

Private Instance Methods

close(path) click to toggle source
# File lib/franz/tail.rb, line 174
# Finish with +path+: emit whatever is still sitting in its line buffer as a
# final tail event, forget its nil-read flag and rewind its cursor to zero.
#
# @param path the file path being closed
def close path
  log.debug event: 'close', file: path

  leftover = buffer[path].flush
  tail_events.push path: path, line: leftover

  @nil_read.delete path
  @cursors[path] = 0
end
handle(event) click to toggle source
# File lib/franz/tail.rb, line 182
# Dispatch a single watch event according to its :name.
#
# * :deleted              - count a delete and close the path
# * :replaced, :truncated - count a rotate, close the path, then read it
# * :appended             - count and perform a read, unless the previous
#                           read of this path came back nil
# * anything else         - log fatally and exit with ERR_INVALID_EVENT
#
# @param event [Hash] event with :name, :path and :size keys
# @return the event's :path
def handle event
  path = event[:path]
  size = event[:size]
  log.debug event: 'handle', raw: event

  case event[:name]
  when :deleted
    @statz.inc :num_deletes
    close path

  when :replaced, :truncated
    @statz.inc :num_rotates
    close path
    read path, size

  when :appended
    if @nil_read[path]
      # A nil read already happened for this path: ignore further read
      # requests until an event arrives that closes the file.
      log.debug event: 'skipping read', raw: event
    else
      @statz.inc :num_reads
      read path, size
    end

  else
    log.fatal event: 'invalid event', raw: event
    exit ERR_INVALID_EVENT
  end

  path
end
log() click to toggle source
# File lib/franz/tail.rb, line 75
# Shorthand accessor for the logger stored in @logger.
def log
  @logger
end
read(path, size) click to toggle source
# File lib/franz/tail.rb, line 78
# Read +path+ from its current cursor up to (at least) +size+ bytes, in
# chunks of @block_size, pushing every complete line onto +tail_events+ as
# a +{path:, line:}+ hash and advancing the cursor by the bytes consumed.
#
# If a chunk cannot be read (end of file, missing file, permission denied),
# the path is flagged in @nil_read and the method returns early. A
# RuntimeError raised while extracting or pushing lines is treated as a full
# buffer: it is logged fatally and the process exits with ERR_BUFFER_FULL.
#
# @param path the file to read
# @param size [Integer] the file size reported by the triggering event
# @return [nil]
def read path, size
  log.debug \
    event: 'read',
    file: path,
    size: size
  @cursors[path] ||= 0
  spread = size - @cursors[path]

  # A negative spread size means we've probably worked ahead of ourselves.
  # In such a case, we'll ignore the request, as it's likely been fulfilled.
  # We only need to worry if the spread size grows larger than the block
  # size--that means something other than us reading threw Franz off...
  if spread < 0
    if spread.abs > @block_size
      log.warn \
        event: 'large spread',
        file: path,
        size: size,
        cursor: @cursors[path],
        spread: spread
    end
    return
  end

  if spread > @read_limit
    log.warn \
      event: 'large read',
      file: path,
      size: size,
      cursor: @cursors[path],
      spread: spread
  end

  # The loop ends only by catching up with +size+, by returning on a nil
  # read, or by exiting, so no "incomplete read" check is needed afterwards.
  while @cursors[path] < size
    reason_for_nil_data = 'unknown'
    data = begin
      # File.read rather than IO.read: IO.read may treat a path that starts
      # with "|" as a command to run, whereas File.read always opens a file.
      File.read path, @block_size, @cursors[path]
    rescue EOFError
      reason_for_nil_data = 'eof'
      nil
    rescue Errno::ENOENT
      reason_for_nil_data = 'noent'
      nil
    rescue Errno::EACCES
      reason_for_nil_data = 'access'
      nil
    end

    if data.nil?
      # Not so sure of myself here: It's been truncated, it's been rotated,
      # or else it no longer exists. We "return" in hopes that a :truncated,
      # :rotated, :deleted event comes along soon after. If it doesn't...
      log.error \
        event: 'nil read',
        file: path,
        size: size,
        cursor: @cursors[path],
        spread: (size - @cursors[path]),
        reason: reason_for_nil_data
      @nil_read[path] = true
      return
    end

    begin
      buffer[path].extract(data).each do |line|
        tail_events.push path: path, line: line
      end
    rescue RuntimeError
      log.fatal \
        event: 'buffer full',
        file: path,
        size: size,
        cursor: @cursors[path],
        spread: (size - @cursors[path])
      exit ERR_BUFFER_FULL
    end

    # Advance by bytes, not characters, so the cursor stays a file offset.
    @cursors[path] += data.bytesize
  end
end