class Franz::Tail
Tail
receives low-level file events from a Watch
and handles the actual reading of files, providing a stream of lines.
Constants
- ERR_BUFFER_FULL
- ERR_INCOMPLETE_READ
- ERR_INVALID_EVENT
Attributes
block_size[R]
buffer[R]
cursors[R]
reading[R]
tail_events[R]
watch_events[R]
Public Class Methods
new(opts={})
click to toggle source
Start a new Tail
thread in the background.
@param opts [Hash] a complex Hash for tail configuration @option opts [InputConfig] :input_config shared Franz
configuration
# File lib/franz/tail.rb, line 24
#
# Build a Tail and immediately start its background thread, which keeps
# shifting events off +watch_events+ and passing them to #handle until the
# Tail is stopped.
#
# @param opts [Hash] tail configuration
# @option opts [InputConfig] :input_config shared Franz configuration (required)
# @option opts [#shift] :watch_events source of watch events (default: +[]+)
# @option opts [#push] :tail_events sink for emitted line events (default: +[]+)
# @option opts [Integer] :read_limit spread above which #read logs a
#   'large read' warning (default: 16 KiB)
# @option opts [Integer] :block_size bytes requested per IO.read (default: 32 KiB)
# @option opts [Hash] :cursors initial per-path read offsets (default: empty)
# @option opts [Logger] :logger destination for log output (default: STDOUT)
# @option opts [Franz::Stats] :statz stats collector (default: a new one)
# @raise [RuntimeError] when :input_config is missing
#
# NOTE(review): with the default Array for :watch_events, +shift+ returns nil
# once the array is empty and #handle indexes into its argument -- callers
# presumably always pass a blocking queue; confirm.
def initialize(opts = {})
  @ic           = opts[:input_config] || raise('No input_config specified')
  @watch_events = opts[:watch_events] || []
  @tail_events  = opts[:tail_events]  || []
  @read_limit   = opts[:read_limit]   || 16_384 # 16 KiB
  @block_size   = opts[:block_size]   || 32_768 # 32 KiB
  @cursors      = opts[:cursors]      || {}
  @logger       = opts[:logger]       || Logger.new(STDOUT)

  # Per-path bookkeeping: both hashes create their entry on first access.
  @nil_read = Hash.new { |flags, path| flags[path] = false }
  @buffer   = Hash.new { |buffers, path| buffers[path] = BufferedTokenizer.new("\n") }
  @stop     = false

  @statz = opts[:statz] || Franz::Stats.new
  [:num_reads, :num_rotates, :num_deletes].each do |counter|
    @statz.create counter, 0
  end

  @tail_thread = Thread.new { handle(watch_events.shift) until @stop }

  log.debug(
    event: 'tail started',
    watch_events: watch_events,
    tail_events: tail_events,
    block_size: block_size
  )
end
Public Instance Methods
state()
click to toggle source
Return the internal “cursors” state
# File lib/franz/tail.rb, line 68
#
# Snapshot of the internal "cursors" state.
#
# @return [Hash] a shallow copy of the cursors hash, so adding or removing
#   keys on the result does not affect the Tail
def state
  @cursors.dup
end
stop()
click to toggle source
Stop the Tail
thread. Only the first call has any effect; later calls simply return the current state.
@return [Hash] internal “cursors” state
# File lib/franz/tail.rb, line 58
#
# Stop the Tail thread. Only the first call does any work; once the Tail is
# stopped, further calls just return the current state.
#
# @return [Hash] internal "cursors" state
def stop
  return state if @stop

  @stop = true
  # The tail thread is the only thread this class starts. The guard covers
  # the case where it was never created, without a blanket `rescue nil`
  # that would also hide genuine errors.
  @tail_thread.kill if @tail_thread
  log.debug event: 'tail stopped'
  state
end
Private Instance Methods
close(path)
click to toggle source
# File lib/franz/tail.rb, line 174
#
# Finish with +path+: emit whatever its buffer's +flush+ returns as a final
# line event, clear its nil-read flag and reset its cursor to zero.
#
# @param path [String] file being closed
# @return [Integer] 0, the new cursor value
def close(path)
  log.debug(event: 'close', file: path)

  remainder = buffer[path].flush
  tail_events.push(path: path, line: remainder)

  @nil_read.delete(path)
  @cursors[path] = 0
end
handle(event)
click to toggle source
# File lib/franz/tail.rb, line 182
#
# Dispatch a single watch event.
#
#   :deleted               -> count a delete, close the file
#   :replaced, :truncated  -> count a rotate, close the file, read it again
#   :appended              -> count a read and read the file, unless the
#                             previous read of this path came back nil
#
# Any other event name is logged as fatal and the process exits with
# ERR_INVALID_EVENT.
#
# @param event [Hash] event with :name, :path and :size keys
# @return [String] the event's path
def handle(event)
  path = event[:path]
  size = event[:size]
  log.debug(event: 'handle', raw: event)

  case event[:name]
  when :deleted
    @statz.inc(:num_deletes)
    close(path)
  when :replaced, :truncated
    @statz.inc(:num_rotates)
    close(path)
    read(path, size)
  when :appended
    if @nil_read[path]
      # After a nil read we stop reading and wait for the next event that
      # closes the file.
      log.debug(event: 'skipping read', raw: event)
    else
      @statz.inc(:num_reads)
      read(path, size)
    end
  else
    log.fatal(event: 'invalid event', raw: event)
    exit(ERR_INVALID_EVENT)
  end

  path
end
log()
click to toggle source
# File lib/franz/tail.rb, line 75
#
# Shorthand accessor for the logger stored in @logger.
#
# @return [Object] the logger
def log
  @logger
end
read(path, size)
click to toggle source
# File lib/franz/tail.rb, line 78
#
# Read +path+ from its stored cursor towards +size+, in chunks of
# @block_size bytes. Every line the path's buffer extracts is pushed onto
# tail_events, and the cursor advances by the bytes actually read.
#
# Exits the process with ERR_BUFFER_FULL when the buffer raises a
# RuntimeError, and with ERR_INCOMPLETE_READ when the loop ends with the
# cursor still short of +size+.
#
# @param path [String] file to read
# @param size [Integer] file size reported by the triggering event
# @return [nil]
def read(path, size)
  log.debug(event: 'read', file: path, size: size)

  @cursors[path] ||= 0
  spread = size - @cursors[path]

  # A negative spread means the cursor is already past the reported size,
  # most likely because an earlier read worked ahead, so the request counts
  # as fulfilled. It only merits a warning when the gap exceeds one block:
  # then something other than our own reads moved things.
  if spread < 0
    if spread.abs > @block_size
      log.warn(
        event: 'large spread',
        file: path,
        size: size,
        cursor: @cursors[path],
        spread: spread
      )
    end
    return
  end

  if spread > @read_limit
    log.warn(
      event: 'large read',
      file: path,
      size: size,
      cursor: @cursors[path],
      spread: spread
    )
  end

  # Kernel#loop is used on purpose: besides `break`, it also ends quietly on
  # a StopIteration raised inside the block, which is what lets the
  # 'incomplete read' check after the loop fire.
  loop do
    break unless @cursors[path] < size

    chunk, nil_reason =
      begin
        [IO.read(path, @block_size, @cursors[path]), 'unknown']
      rescue EOFError
        [nil, 'eof']
      rescue Errno::ENOENT
        [nil, 'noent']
      rescue Errno::EACCES
        [nil, 'access']
      end

    if chunk.nil?
      # The file was truncated, rotated, or is gone. Flag the path and bail
      # out, hoping a :truncated, :replaced or :deleted event follows soon.
      log.error(
        event: 'nil read',
        file: path,
        size: size,
        cursor: @cursors[path],
        spread: (size - @cursors[path]),
        reason: nil_reason
      )
      @nil_read[path] = true
      return
    end

    # Measure before handing the chunk to the tokenizer.
    consumed = chunk.bytesize

    begin
      buffer[path].extract(chunk).each do |line|
        tail_events.push(path: path, line: line)
      end
    rescue RuntimeError
      log.fatal(
        event: 'buffer full',
        file: path,
        size: size,
        cursor: @cursors[path],
        spread: (size - @cursors[path])
      )
      exit(ERR_BUFFER_FULL)
    end

    @cursors[path] += consumed
  end

  if @cursors[path] < size
    log.fatal(
      event: 'incomplete read',
      file: path,
      size: size,
      cursor: @cursors[path],
      spread: (size - @cursors[path])
    )
    exit(ERR_INCOMPLETE_READ)
  end
end