class FileWatch::WatchedFile
Constants
- IO_BASED_STAT
- PATH_BASED_STAT
Attributes
accessed_at[R]
buffer[R]
bytes_read[R]
bytes_unread[R]
file[R]
filename[R]
last_open_warning_at[RW]
listener[R]
loop_count_mode[R]
loop_count_type[R]
path[R]
pathname[R]
read_chunk_size[R]
read_loop_count[R]
recent_states[R]
stat[R]
state[R]
Public Class Methods
new(pathname, stat, settings)
click to toggle source
This class represents a file that has been discovered. A path-based stat is taken at discovery.
# File lib/filewatch/watched_file.rb, line 16
# Builds the record for a discovered file.
#
# @param pathname [String, Pathname] location of the file; wrapped in a Pathname either way
# @param stat the stat handed on to full_state_reset
# @param settings the settings object kept in @settings
def initialize(pathname, stat, settings)
  @settings = settings
  # Pathname.new accepts a String as well as another Pathname
  @pathname = Pathname.new(pathname)
  @filename = @pathname.basename.to_s
  @path = @pathname.to_path.freeze
  full_state_reset(stat)
  watch
  set_standard_read_loop
  set_accessed_at
end
Public Instance Methods
activate()
click to toggle source
# File lib/filewatch/watched_file.rb, line 282
# Moves this file into the :active state.
#
# @return the result of set_state
def activate
  set_state(:active)
end
active?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 319
# @return [Boolean] true when the current state is :active
def active?
  @state == :active
end
all_read?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 161
# @return [Boolean] true when @bytes_read has reached (or passed) @size
def all_read?
  @bytes_read >= @size
end
buffer_extract(data)
click to toggle source
# File lib/filewatch/watched_file.rb, line 247
# Passes data to @buffer.extract and wraps the outcome in a BufferExtractResult.
#
# When extraction yields nothing, the result also carries a warning text and a
# hash of diagnostic values; otherwise both are empty.
#
# @param data [String] the chunk to hand to the buffer
# @return [BufferExtractResult] built from (lines, warning, additional)
def buffer_extract(data)
  extracted = @buffer.extract(data)
  return BufferExtractResult.new(extracted, "", {}) unless extracted.empty?

  # adjacent literals are joined by the parser into one string
  message = "buffer_extract: a delimiter can't be found in current chunk" \
            ", maybe there are no more delimiters or the delimiter is incorrect" \
            " or the text before the delimiter, a 'line', is very large" \
            ", if this message is logged often try increasing the `file_chunk_size` setting."
  context = {
    "delimiter" => @settings.delimiter,
    "read_position" => @bytes_read,
    "bytes_read_count" => data.bytesize,
    "last_known_file_size" => last_stat_size,
    "file_path" => @path
  }
  BufferExtractResult.new(extracted, message, context)
end
close()
click to toggle source
# File lib/filewatch/watched_file.rb, line 295
# Moves this file into the :closed state. Only the state changes here;
# the file handle itself is dealt with by file_close.
#
# @return the result of set_state
def close
  set_state(:closed)
end
closed?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 331
# @return [Boolean] true when the current state is :closed
def closed?
  @state == :closed
end
compressed?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 197
# Decides from the path suffix alone whether this is a gzip file.
#
# @return [Boolean] true when @path ends in '.gz' or '.gzip'
def compressed?
  @path.end_with?('.gz', '.gzip')
end
current_size()
click to toggle source
# File lib/filewatch/watched_file.rb, line 143
# @return the size held in @size (assigned by set_stat and restat!)
def current_size
  @size
end
delay_delete()
click to toggle source
# File lib/filewatch/watched_file.rb, line 307
# Moves this file into the :delayed_delete state.
#
# @return the result of set_state
def delay_delete
  set_state(:delayed_delete)
end
delayed_delete?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 323
# @return [Boolean] true when the current state is :delayed_delete
def delayed_delete?
  @state == :delayed_delete
end
details()
click to toggle source
# File lib/filewatch/watched_file.rb, line 418
# Long-form description of this object: everything inspect shows plus the
# state history, byte counters, last stat size, open flag and @initial.
#
# @return [String]
def details
  fields = [
    "@filename='#{@filename}'",
    "@state=#{@state.inspect}",
    "@recent_states=#{@recent_states.inspect}",
    "@bytes_read=#{@bytes_read}",
    "@bytes_unread=#{@bytes_unread}",
    "current_size=#{current_size}",
    "last_stat_size=#{last_stat_size}",
    "file_open?=#{file_open?}",
    "@initial=#{@initial}",
    "sincedb_key='#{sincedb_key}'"
  ]
  "<FileWatch::WatchedFile: #{fields.join(', ')}>"
end
expiry_close_enabled?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 343
# @return [Boolean] true when the close_older setting is not nil
def expiry_close_enabled?
  !@settings.close_older.nil?
end
expiry_ignore_enabled?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 347
# @return [Boolean] true when the ignore_older setting is not nil
def expiry_ignore_enabled?
  !@settings.ignore_older.nil?
end
file_add_opened(rubyfile)
click to toggle source
# File lib/filewatch/watched_file.rb, line 212
# Stores an already opened file object and makes sure a tokenizer exists.
#
# @param rubyfile the opened file object to keep in @file
def file_add_opened(rubyfile)
  @file = rubyfile
  # an existing buffer is kept as-is; one is only created when none is present
  if @buffer.nil?
    @buffer = BufferedTokenizer.new(@settings.delimiter)
  end
end
file_at_path_found_again()
click to toggle source
# File lib/filewatch/watched_file.rb, line 165
# Delegates to restore_previous_state.
#
# @return the result of restore_previous_state
def file_at_path_found_again
  restore_previous_state
end
file_can_close?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 413
# Tells whether the time since @accessed_at exceeds the close_older setting.
#
# @return [Boolean] always false when expiry_close_enabled? is false
def file_can_close?
  if expiry_close_enabled?
    idle_seconds = Time.now.to_f - @accessed_at
    idle_seconds > @settings.close_older
  else
    false
  end
end
file_closable?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 401
# @return true only when file_can_close? and all_read? both hold;
#   all_read? is not evaluated when file_can_close? is false
def file_closable?
  file_can_close? && all_read?
end
file_close()
click to toggle source
# File lib/filewatch/watched_file.rb, line 217
# Closes the held file object and drops the reference to it.
# Does nothing when there is no file or it is already closed.
#
# @return [nil]
def file_close
  unless @file.nil? || @file.closed?
    @file.close
    @file = nil
  end
end
file_ignorable?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 405
# Tells whether the time since modified_at exceeds the ignore_older setting.
#
# @return [Boolean] always false when expiry_ignore_enabled? is false
def file_ignorable?
  if expiry_ignore_enabled?
    # Float arithmetic on both sides: under JRuby, Time.now - stat.mtime
    # converts between int and float before subtracting
    age_seconds = Time.now.to_f - modified_at
    age_seconds > @settings.ignore_older
  else
    false
  end
end
file_open?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 232
# @return [Boolean] true when a file object is held and it is not closed
def file_open?
  !(@file.nil? || @file.closed?)
end
file_read(amount = nil)
click to toggle source
# File lib/filewatch/watched_file.rb, line 227
# Reads from the held file with sysread, refreshing the access time first.
#
# @param amount [Integer, nil] byte count; @read_chunk_size is used when nil
# @return whatever @file.sysread returns
def file_read(amount = nil)
  set_accessed_at
  byte_count = amount || @read_chunk_size
  @file.sysread(byte_count)
end
file_seek(amount, whence = IO::SEEK_SET)
click to toggle source
# File lib/filewatch/watched_file.rb, line 223
# Repositions the held file with sysseek.
#
# @param amount [Integer] offset passed to sysseek
# @param whence seek mode, IO::SEEK_SET unless given
# @return whatever @file.sysseek returns
def file_seek(amount, whence = IO::SEEK_SET)
  @file.sysseek(amount, whence)
end
full_state_reset(this_stat = nil)
click to toggle source
# File lib/filewatch/watched_file.rb, line 27
# Puts the read-tracking state back to its starting values.
#
# Without a stat argument the path is stat'ed here; if the path does not
# exist the file is moved to delayed_delete and nothing else is reset.
#
# @param this_stat the stat to adopt, or nil to take a fresh one from pathname
def full_state_reset(this_stat = nil)
  if this_stat.nil?
    begin
      this_stat = PathStatClass.new(pathname)
    rescue Errno::ENOENT
      # delay_delete goes through set_state, which reads @recent_states; when
      # this runs from initialize that array has not been created yet
      @recent_states ||= []
      delay_delete
      return
    end
  end
  @bytes_read = 0 # bytes read from the open file, or initialized from a matched sincedb value off disk
  @bytes_unread = 0 # bytes not yet read from the open file, so a shrink with unread bytes can be warned about
  file_close
  set_stat(this_stat)
  @listener = nil
  @last_open_warning_at = nil
  # initial as true means this watched_file has not been associated with a
  # previous sincedb value yet, and should be read from the beginning if necessary
  @initial = true
  @recent_states = [] # keep last 8 states, managed in set_state
  watch if active? || @state.nil?
end
grown?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 151
# @return [Boolean] true when @size is larger than @bytes_read
def grown?
  @size > @bytes_read
end
has_listener?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 177
# @return [Boolean] true when @listener is not nil
def has_listener?
  !@listener.nil?
end
ignore()
click to toggle source
# File lib/filewatch/watched_file.rb, line 286
# Moves this file into the :ignored state.
#
# @return the result of set_state
def ignore
  set_state(:ignored)
end
ignore_as_unread()
click to toggle source
# File lib/filewatch/watched_file.rb, line 290
# Moves this file into the ignored state and sets @bytes_read to @size,
# so the read position sits at the end of the currently known content.
#
# @return the new @bytes_read
def ignore_as_unread
  ignore
  @bytes_read = @size
end
ignored?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 327
# @return [Boolean] true when the current state is :ignored
def ignored?
  @state == :ignored
end
increment_bytes_read(delta)
click to toggle source
# File lib/filewatch/watched_file.rb, line 264
# Advances @bytes_read by delta and recomputes the unread count.
#
# @param delta [Integer, nil] number of bytes just read; nil is a no-op
# @return [Integer, nil] the new @bytes_read, or nil when delta is nil
def increment_bytes_read(delta)
  unless delta.nil?
    @bytes_read += delta
    update_bytes_unread
    @bytes_read
  end
end
initial?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 193
# @return the @initial flag (set true by full_state_reset, false by initial_completed)
def initial?
  @initial
end
initial_completed()
click to toggle source
# File lib/filewatch/watched_file.rb, line 185
# Clears the @initial flag.
#
# @return [false]
def initial_completed
  @initial = false
end
inspect()
click to toggle source
# File lib/filewatch/watched_file.rb, line 425
# Short description: filename, state, current size and sincedb key.
#
# @return [String]
def inspect
  state_text = @state.inspect
  size_text = current_size
  key_text = sincedb_key
  "<FileWatch::WatchedFile: @filename='#{@filename}', @state=#{state_text}, current_size=#{size_text}, sincedb_key='#{key_text}'>"
end
last_stat_size()
click to toggle source
# File lib/filewatch/watched_file.rb, line 139
# @return the size reported by the held stat object
def last_stat_size
  @stat.size
end
loop_control_adjusted_for_stat_size()
click to toggle source
# File lib/filewatch/watched_file.rb, line 365
# Works out how many reads of what size the next read loop should do, from
# the bytes still outstanding (current_size - @bytes_read).
#
# @return [LoopControlResult] built from (count, size, more)
def loop_control_adjusted_for_stat_size
  remaining = current_size - @bytes_read
  if remaining < 1
    # nothing outstanding
    LoopControlResult.new(0, 0, false)
  elsif remaining < @read_chunk_size
    # less than one chunk: a single read of exactly what is left
    LoopControlResult.new(1, remaining, false)
  elsif remaining < @standard_loop_max_bytes
    # The configured loop would overrun and trigger EOF, so build fewer
    # iterations and indicate more. E.g. 100 outstanding with 4 * 30 -> 120
    # max bytes gives 3 * 30 -> 90 this time.
    LoopControlResult.new(remaining / @read_chunk_size, @read_chunk_size, true)
  else
    # at or above the loop maximum: use the configured values, no "more"
    LoopControlResult.new(@read_loop_count, @read_chunk_size, false)
  end
end
modified_at(update = false)
click to toggle source
# File lib/filewatch/watched_file.rb, line 115
# Returns the remembered modification time, taking it from the stat object
# when none is remembered yet or when a refresh is requested.
#
# @param update [Boolean] true to re-read the value from @stat
# @return the remembered modification time
def modified_at(update = false)
  @modified_at = @stat.modified_at if update || @modified_at.nil?
  @modified_at
end
modified_at_changed?()
click to toggle source
@return whether modified_at changed since it was last read
@see restat!
# File lib/filewatch/watched_file.rb, line 125
# @return [Boolean] true when the remembered modification time differs from
#   the one the stat object reports now
def modified_at_changed?
  remembered = modified_at
  remembered != @stat.modified_at
end
open()
click to toggle source
# File lib/filewatch/watched_file.rb, line 208
# Opens the file at @path through FileOpener and registers the handle.
#
# @return the result of file_add_opened
def open
  handle = FileOpener.open(@path)
  file_add_opened(handle)
end
position_for_new_sincedb_value()
click to toggle source
# File lib/filewatch/watched_file.rb, line 129
# Chooses the read position to record in a new sincedb value.
#
# @return [Integer] 0, or last_stat_size for a first-discovery file when
#   start_new_files_at is not :beginning
def position_for_new_sincedb_value
  # found after first discovery: always start at the beginning
  return 0 unless @initial
  # found in first discovery: the setting decides between start and end
  return 0 if @settings.start_new_files_at == :beginning

  last_stat_size
end
read_extract_lines(amount)
click to toggle source
# File lib/filewatch/watched_file.rb, line 240
# Reads a chunk, extracts lines from it, then counts the chunk's bytes as read.
#
# @param amount [Integer, nil] byte count passed to file_read
# @return the result of buffer_extract for the chunk
def read_extract_lines(amount)
  chunk = file_read(amount)
  # the byte counter is advanced only after extraction has run
  buffer_extract(chunk).tap { increment_bytes_read(chunk.bytesize) }
end
recent_state_history()
click to toggle source
# File lib/filewatch/watched_file.rb, line 397
# @return [Array] a new array: the recorded earlier states followed by the
#   current state (omitted when the state is nil)
def recent_state_history
  history = @recent_states.dup
  history.concat(Array(@state))
end
reopen()
click to toggle source
# File lib/filewatch/watched_file.rb, line 201
# Closes and opens the file again, but only when it is currently open.
#
# @return the result of open, or nil when the file was not open
def reopen
  return unless file_open?

  file_close
  open
end
reset_buffer()
click to toggle source
# File lib/filewatch/watched_file.rb, line 236
# Flushes the line buffer.
#
# @return whatever @buffer.flush returns
def reset_buffer
  @buffer.flush
end
reset_bytes_unread()
click to toggle source
# File lib/filewatch/watched_file.rb, line 386
# Zeroes the unread byte counter; called from shrink.
#
# @return [Integer] 0
def reset_bytes_unread
  @bytes_unread = 0
end
restat!()
click to toggle source
@return true if the file was modified since last stat
# File lib/filewatch/watched_file.rb, line 101
# Refreshes the stat and reports whether the file changed.
#
# If the refreshed stat no longer matches the sincedb key, the state switches
# to rotation_in_progress and the size is left alone. Otherwise the size and
# unread count are updated.
#
# @return [Boolean] true on rotation, else the result of modified_at_changed?
def restat!
  # remember the pre-restat modification time so a change can always be detected
  modified_at
  @stat.restat
  if rotation_detected?
    # switch to the new state now
    rotation_in_progress
    return true
  end
  @size = @stat.size
  update_bytes_unread
  modified_at_changed?
end
restore_previous_state()
click to toggle source
# File lib/filewatch/watched_file.rb, line 311
# Goes back to the most recently recorded state.
#
# The current state is kept when no earlier state is recorded: popping an
# empty history yields nil, and storing nil as the state would make every
# state predicate false.
#
# @return the state now in effect
def restore_previous_state
  return @state if @recent_states.empty?

  set_state @recent_states.pop
end
rotate_as_file(bytes_read = 0)
click to toggle source
# File lib/filewatch/watched_file.rb, line 76
# Rotation for the case where a sincedb record exists for the new inode but
# there is no watched file to rotate from. Probably caused by a deletion
# detected in the middle of a rename cascade; rare because of delayed_delete,
# as there would have to be a large time span between the renames.
#
# @param bytes_read [Integer] read position to start from
def rotate_as_file(bytes_read = 0)
  # not initial: the read position is supplied by the caller
  @initial = false
  @recent_states = [] # keep last 8 states, managed in set_state
  @last_open_warning_at = nil
  @bytes_unread = 0 # bytes not yet read from the open file
  @bytes_read = bytes_read # bytes read from the open file, or taken from a matched sincedb value
  set_stat(PathStatClass.new(pathname))
  reopen
  watch
end
rotate_from(other)
click to toggle source
# File lib/filewatch/watched_file.rb, line 50
# Takes over the read-tracking state of another watched file, re-stats this
# file's own path and ends in the ignored state.
#
# @param other the watched file whose state is moved here; it is reset
#   afterwards unless it is in delayed_delete
def rotate_from(other)
  set_standard_read_loop
  file_close
  @bytes_read = other.bytes_read
  @bytes_unread = other.bytes_unread
  @listener = nil
  @initial = false
  # Copy the history. When other is not reset below it keeps its array, and the
  # ignore call at the end would otherwise append to other's history as well.
  @recent_states = other.recent_states.dup
  @accessed_at = other.accessed_at
  # for a delayed_delete other it is not yet known whether a file exists at
  # other.path, so it is left as it is
  other.full_state_reset unless other.delayed_delete?
  set_stat PathStatClass.new(pathname)
  ignore
end
rotation_detected?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 96
# @return [Boolean] true when the key derived from the current stat differs
#   from the stored sincedb key
def rotation_detected?
  from_stat = stat_sincedb_key
  from_stat != sincedb_key
end
rotation_in_progress()
click to toggle source
# File lib/filewatch/watched_file.rb, line 278
# Moves this file into the :rotation_in_progress state.
#
# @return the result of set_state
def rotation_in_progress
  set_state(:rotation_in_progress)
end
rotation_in_progress?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 315
# @return [Boolean] true when the current state is :rotation_in_progress
def rotation_in_progress?
  @state == :rotation_in_progress
end
set_accessed_at()
click to toggle source
# File lib/filewatch/watched_file.rb, line 189
# Stamps @accessed_at with the current time as a Float.
#
# @return [Float] the stored timestamp
def set_accessed_at
  @accessed_at = Time.now.to_f
end
set_listener(observer)
click to toggle source
# File lib/filewatch/watched_file.rb, line 169
# Asks the observer for a listener for this file's path and keeps it.
#
# @param observer an object responding to listener_for(path)
# @return the listener that was stored
def set_listener(observer)
  @listener = observer.listener_for(@path)
end
set_maximum_read_loop()
click to toggle source
# File lib/filewatch/watched_file.rb, line 358
# Switches the read loop to FileWatch::MAX_ITERATIONS reads of
# FileWatch::FILE_READ_SIZE bytes. Used to quickly read an open file to the
# end when rotation is detected.
#
# @return the new loop maximum in bytes (count * chunk size)
def set_maximum_read_loop
  @read_chunk_size = FileWatch::FILE_READ_SIZE
  @read_loop_count = FileWatch::MAX_ITERATIONS
  @standard_loop_max_bytes = @read_loop_count * @read_chunk_size
end
set_standard_read_loop()
click to toggle source
# File lib/filewatch/watched_file.rb, line 351
# Sets the read loop from the file_chunk_count and file_chunk_size settings.
#
# @return the new loop maximum in bytes (count * chunk size)
def set_standard_read_loop
  @read_chunk_size = @settings.file_chunk_size
  @read_loop_count = @settings.file_chunk_count
  # e.g. 1 * 10 bytes -> 10, 256 * 65536 -> 16777216,
  # or 140737488355327 * 32768 -> 4611686018427355136
  @standard_loop_max_bytes = @read_loop_count * @read_chunk_size
end
set_state(value)
click to toggle source
# File lib/filewatch/watched_file.rb, line 391
# Makes value the current state, recording the state being left in
# @recent_states. The history holds at most the last 8 states.
#
# Trimming happens only when an entry was actually appended, so leaving a nil
# state never discards history, and a history that is already too long is cut
# back to 8 instead of growing further.
#
# @param value [Symbol] the new state
# @return the new state
def set_state(value)
  unless @state.nil?
    @recent_states << @state
    @recent_states.shift while @recent_states.size > 8
  end
  @state = value
end
shrunk?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 147
# @return [Boolean] true when @size is smaller than @bytes_read
def shrunk?
  @size < @bytes_read
end
sincedb_key()
click to toggle source
# File lib/filewatch/watched_file.rb, line 181
# @return the key stored in @sdb_key_v1 (assigned by set_stat from the stat's inode_struct)
def sincedb_key
  @sdb_key_v1
end
size_changed?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 155
# Called from closed and ignored. Before the last stat was taken the file
# should have been fully read, so any difference means the size changed.
#
# @return [Boolean] true when @size differs from @bytes_read
def size_changed?
  @size != @bytes_read
end
stat_sincedb_key()
click to toggle source
# File lib/filewatch/watched_file.rb, line 92
# @return the inode_struct reported by the held stat object
def stat_sincedb_key
  @stat.inode_struct
end
to_s()
click to toggle source
# File lib/filewatch/watched_file.rb, line 429
# @return [String] the same text as inspect
def to_s
  inspect
end
unset_listener()
click to toggle source
# File lib/filewatch/watched_file.rb, line 173
# Drops the stored listener.
#
# @return [nil]
def unset_listener
  @listener = nil
end
unwatch()
click to toggle source
# File lib/filewatch/watched_file.rb, line 303
# Moves this file into the :unwatched state.
#
# @return the result of set_state
def unwatch
  set_state(:unwatched)
end
unwatched?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 339
# @return [Boolean] true when the current state is :unwatched
def unwatched?
  @state == :unwatched
end
update_bytes_read(total_bytes_read)
click to toggle source
# File lib/filewatch/watched_file.rb, line 271
# Replaces @bytes_read with an absolute position and recomputes the unread count.
#
# @param total_bytes_read [Integer, nil] the new position; nil is a no-op
# @return [Integer, nil] the new @bytes_read, or nil when the argument is nil
def update_bytes_read(total_bytes_read)
  unless total_bytes_read.nil?
    @bytes_read = total_bytes_read
    update_bytes_unread
    @bytes_read
  end
end
watch()
click to toggle source
# File lib/filewatch/watched_file.rb, line 299
# Moves this file into the :watched state.
#
# @return the result of set_state
def watch
  set_state(:watched)
end
watched?()
click to toggle source
# File lib/filewatch/watched_file.rb, line 335
# @return [Boolean] true when the current state is :watched
def watched?
  @state == :watched
end
Private Instance Methods
set_stat(stat)
click to toggle source
# File lib/filewatch/watched_file.rb, line 69
# Adopts a stat object and caches its size and inode_struct.
#
# @param stat an object responding to size and inode_struct
# @return the cached inode_struct
def set_stat(stat)
  @stat = stat
  @size = stat.size
  @sdb_key_v1 = stat.inode_struct
end
update_bytes_unread()
click to toggle source
# File lib/filewatch/watched_file.rb, line 435
# Recomputes @bytes_unread as current_size minus @bytes_read, never below zero.
#
# @return [Integer] the new @bytes_unread
def update_bytes_unread
  @bytes_unread = [current_size - @bytes_read, 0].max
end