class FileWatch::TailMode::Handlers::Base
Attributes
sincedb_collection[R]
Public Class Methods
new(processor, sincedb_collection, observer, settings)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 9 def initialize(processor, sincedb_collection, observer, settings) @settings = settings @processor = processor @sincedb_collection = sincedb_collection @observer = observer end
Public Instance Methods
handle(watched_file)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 20 def handle(watched_file) logger.trace? && logger.trace("handling:", :path => watched_file.path) unless watched_file.has_listener? watched_file.set_listener(@observer) end handle_specifically(watched_file) end
handle_specifically(watched_file)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 28 def handle_specifically(watched_file) # some handlers don't need to define this method end
quit?()
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 16 def quit? @processor.watch.quit? end
update_existing_specifically(watched_file, sincedb_value)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 32 def update_existing_specifically(watched_file, sincedb_value) # when a handler subclass does not implement this then do nothing end
Private Instance Methods
add_new_value_sincedb_collection(watched_file)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 141 def add_new_value_sincedb_collection(watched_file) sincedb_value = get_new_value_specifically(watched_file) logger.trace? && logger.trace("add_new_value_sincedb_collection", :position => sincedb_value.position, :watched_file => watched_file.details) sincedb_collection.set(watched_file.sincedb_key, sincedb_value) sincedb_value end
add_or_update_sincedb_collection(watched_file)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 104 def add_or_update_sincedb_collection(watched_file) sincedb_value = @sincedb_collection.find(watched_file) if sincedb_value.nil? sincedb_value = add_new_value_sincedb_collection(watched_file) watched_file.initial_completed elsif sincedb_value.watched_file == watched_file update_existing_sincedb_collection_value(watched_file, sincedb_value) watched_file.initial_completed else logger.trace? && logger.trace("add_or_update_sincedb_collection: found sincedb record", :sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value) # detected a rotation, Discoverer can't handle this because this watched file is not a new discovery. # we must handle it here, by transferring state and have the sincedb value track this watched file # rotate_as_file and rotate_from will switch the sincedb key to the inode that the path is now pointing to # and pickup the sincedb_value from before. logger.debug("add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file") existing_watched_file = sincedb_value.watched_file if existing_watched_file.nil? sincedb_value.set_watched_file(watched_file) logger.trace? && logger.trace("add_or_update_sincedb_collection: switching as new file") watched_file.rotate_as_file watched_file.update_bytes_read(sincedb_value.position) else sincedb_value.set_watched_file(watched_file) logger.trace? && logger.trace("add_or_update_sincedb_collection: switching from:", :watched_file => watched_file.details) watched_file.rotate_from(existing_watched_file) end end sincedb_value end
controlled_read(watched_file, loop_control)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 38 def controlled_read(watched_file, loop_control) changed = false logger.trace? && logger.trace(__method__.to_s, :iterations => loop_control.count, :amount => loop_control.size, :filename => watched_file.filename) # from a real config (has 102 file inputs) # -- This cfg creates a file input for every log file to create a dedicated file pointer and read all file simultaneously # -- If we put all log files in one file input glob we will have indexing delay, because Logstash waits until the first file becomes EOF # by allowing the user to specify a combo of `file_chunk_count` X `file_chunk_size`... # we enable the pseudo parallel processing of each file. # user also has the option to specify a low `stat_interval` and a very high `discover_interval`to respond # quicker to changing files and not allowing too much content to build up before reading it. loop_control.count.times do break if quit? begin logger.debug? && logger.debug("#{__method__} get chunk") result = watched_file.read_extract_lines(loop_control.size) # expect BufferExtractResult logger.trace(result.warning, result.additional) unless result.warning.empty? changed = true result.lines.each do |line| watched_file.listener.accept(line) # sincedb position is now independent from the watched_file bytes_read sincedb_collection.increment(watched_file.sincedb_key, line.bytesize + @settings.delimiter_byte_size) end rescue EOFError => e # it only makes sense to signal EOF in "read" mode not "tail" logger.debug(__method__.to_s, exception_details(watched_file.path, e, false)) loop_control.flag_read_error break rescue Errno::EWOULDBLOCK, Errno::EINTR => e logger.debug(__method__.to_s, exception_details(watched_file.path, e, false)) watched_file.listener.error loop_control.flag_read_error break rescue => e logger.error("#{__method__} general error reading", exception_details(watched_file.path, e)) watched_file.listener.error loop_control.flag_read_error break end end logger.debug("#{__method__} stopped loop due quit") if quit? sincedb_collection.request_disk_flush if changed end
exception_details(path, e, trace = true)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 159 def exception_details(path, e, trace = true) details = { :path => path, :exception => e.class, :message => e.message } details[:backtrace] = e.backtrace if trace && logger.debug? details end
get_new_value_specifically(watched_file)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 149 def get_new_value_specifically(watched_file) position = watched_file.position_for_new_sincedb_value value = SincedbValue.new(position) value.set_watched_file(watched_file) watched_file.update_bytes_read(position) value end
open_file(watched_file)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 81 def open_file(watched_file) return true if watched_file.file_open? logger.trace? && logger.trace("open_file", :filename => watched_file.filename) begin watched_file.open rescue => e # don't emit this message too often. if a file that we can't # read is changing a lot, we'll try to open it more often, and spam the logs. now = Time.now.to_i logger.trace? && logger.trace("open_file OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'") if watched_file.last_open_warning_at.nil? || now - watched_file.last_open_warning_at > OPEN_WARN_INTERVAL logger.warn("failed to open file", exception_details(watched_file.path, e)) watched_file.last_open_warning_at = now else logger.debug("open_file suppressed warning `failed to open file`", exception_details(watched_file.path, e, false)) end watched_file.watch # set it back to watch so we can try it again else watched_file.listener.opened end watched_file.file_open? end
update_existing_sincedb_collection_value(watched_file, sincedb_value)
click to toggle source
# File lib/filewatch/tail_mode/handlers/base.rb, line 135 def update_existing_sincedb_collection_value(watched_file, sincedb_value) logger.trace? && logger.trace("update_existing_sincedb_collection_value", :position => sincedb_value.position, :filename => watched_file.filename, :last_stat_size => watched_file.last_stat_size) update_existing_specifically(watched_file, sincedb_value) end