class FileWatch::WatchedFile

Constants

IO_BASED_STAT
PATH_BASED_STAT

Attributes

accessed_at[R]
buffer[R]
bytes_read[R]
bytes_unread[R]
file[R]
filename[R]
last_open_warning_at[RW]
listener[R]
loop_count_mode[R]
loop_count_type[R]
path[R]
pathname[R]
read_chunk_size[R]
read_loop_count[R]
recent_states[R]
stat[R]
state[R]

Public Class Methods

new(pathname, stat, settings) click to toggle source

This class represents a file that has been discovered; a path-based stat is taken at discovery.

# File lib/filewatch/watched_file.rb, line 16
# Builds the watched-file record for a discovered path.
#
# @param pathname [String, Pathname] location of the discovered file
# @param stat stat object handed on to full_state_reset
# @param settings settings object; this class reads delimiter, close_older,
#   ignore_older, start_new_files_at, file_chunk_count and file_chunk_size from it
def initialize(pathname, stat, settings)
  @settings = settings
  @pathname = Pathname.new(pathname) # given arg pathname might be a string or a Pathname object
  @path = @pathname.to_path.freeze # frozen String form of the path
  @filename = @pathname.basename.to_s
  full_state_reset(stat) # zeroes the byte counters, stores the stat and starts a fresh state history
  watch
  set_standard_read_loop
  set_accessed_at
end

Public Instance Methods

activate() click to toggle source
# File lib/filewatch/watched_file.rb, line 282
# Moves this file into the :active state.
def activate
  set_state(:active)
end
active?() click to toggle source
# File lib/filewatch/watched_file.rb, line 319
# @return [Boolean] true when the current state is :active
def active?
  @state.equal?(:active)
end
all_read?() click to toggle source
# File lib/filewatch/watched_file.rb, line 161
# @return [Boolean] true when the read position has reached (or passed) the
#   last recorded size
def all_read?
  @size <= @bytes_read
end
buffer_extract(data) click to toggle source
# File lib/filewatch/watched_file.rb, line 247
# Passes a chunk of data to the tokenizer held in @buffer and wraps the outcome.
#
# @param data [String] the chunk that was just read
# @return [BufferExtractResult] the extracted lines; when no line could be
#   extracted the result also carries a warning message and a hash of
#   diagnostic details, otherwise an empty string and an empty hash
def buffer_extract(data)
  lines = @buffer.extract(data)
  # usual case: at least one line came out, so there is nothing to warn about
  return BufferExtractResult.new(lines, "", {}) unless lines.empty?

  warning = "buffer_extract: a delimiter can't be found in current chunk" \
    ", maybe there are no more delimiters or the delimiter is incorrect" \
    " or the text before the delimiter, a 'line', is very large" \
    ", if this message is logged often try increasing the `file_chunk_size` setting."
  additional = {
    "delimiter" => @settings.delimiter,
    "read_position" => @bytes_read,
    "bytes_read_count" => data.bytesize,
    "last_known_file_size" => last_stat_size,
    "file_path" => @path
  }
  BufferExtractResult.new(lines, warning, additional)
end
close() click to toggle source
# File lib/filewatch/watched_file.rb, line 295
# Moves this file into the :closed state (state change only; the file handle
# is not touched here).
def close
  set_state(:closed)
end
closed?() click to toggle source
# File lib/filewatch/watched_file.rb, line 331
# @return [Boolean] true when the current state is :closed
def closed?
  @state.equal?(:closed)
end
compressed?() click to toggle source
# File lib/filewatch/watched_file.rb, line 197
# @return [Boolean] true when the path ends in '.gz' or '.gzip' (case sensitive)
def compressed?
  ['.gz', '.gzip'].any? { |suffix| @path.end_with?(suffix) }
end
current_size() click to toggle source
# File lib/filewatch/watched_file.rb, line 143
# @return the size recorded in @size, which set_stat and restat! copy from the
#   stat object
def current_size
  @size
end
delay_delete() click to toggle source
# File lib/filewatch/watched_file.rb, line 307
# Moves this file into the :delayed_delete state.
def delay_delete
  set_state(:delayed_delete)
end
delayed_delete?() click to toggle source
# File lib/filewatch/watched_file.rb, line 323
# @return [Boolean] true when the current state is :delayed_delete
def delayed_delete?
  @state.equal?(:delayed_delete)
end
details() click to toggle source
# File lib/filewatch/watched_file.rb, line 418
# Verbose one-line description of this watched file: state, state history,
# byte counters, sizes, open flag, initial flag and sincedb key.
#
# @return [String]
def details
  fields = [
    "@filename='#{@filename}'",
    "@state=#{@state.inspect}",
    "@recent_states=#{@recent_states.inspect}",
    "@bytes_read=#{@bytes_read}",
    "@bytes_unread=#{@bytes_unread}",
    "current_size=#{current_size}",
    "last_stat_size=#{last_stat_size}",
    "file_open?=#{file_open?}",
    "@initial=#{@initial}"
  ]
  "<FileWatch::WatchedFile: #{fields.join(', ')}, sincedb_key='#{sincedb_key}'>"
end
expiry_close_enabled?() click to toggle source
# File lib/filewatch/watched_file.rb, line 343
# @return [Boolean] true when the close_older setting is not nil
def expiry_close_enabled?
  return false if @settings.close_older.nil?

  true
end
expiry_ignore_enabled?() click to toggle source
# File lib/filewatch/watched_file.rb, line 347
# @return [Boolean] true when the ignore_older setting is not nil
def expiry_ignore_enabled?
  return false if @settings.ignore_older.nil?

  true
end
file_add_opened(rubyfile) click to toggle source
# File lib/filewatch/watched_file.rb, line 212
# Stores an already opened file handle and creates the line tokenizer the
# first time a handle is stored.
#
# @param rubyfile the opened file object to keep in @file
# @return the new BufferedTokenizer when one was created, otherwise nil
def file_add_opened(rubyfile)
  @file = rubyfile
  # an existing tokenizer is kept as is
  return unless @buffer.nil?

  @buffer = BufferedTokenizer.new(@settings.delimiter)
end
file_at_path_found_again() click to toggle source
# File lib/filewatch/watched_file.rb, line 165
# Called when a file is seen at this path again; puts the file back into the
# most recent entry of its state history via restore_previous_state.
def file_at_path_found_again
  restore_previous_state
end
file_can_close?() click to toggle source
# File lib/filewatch/watched_file.rb, line 413
# @return [Boolean] true when close_older is configured and more than that
#   many seconds have passed since @accessed_at
def file_can_close?
  if expiry_close_enabled?
    idle_seconds = Time.now.to_f - @accessed_at
    idle_seconds > @settings.close_older
  else
    false
  end
end
file_closable?() click to toggle source
# File lib/filewatch/watched_file.rb, line 401
# @return [Boolean] true when the close_older expiry has passed and every
#   byte up to the last recorded size has been read
def file_closable?
  return false unless file_can_close?

  all_read?
end
file_close() click to toggle source
# File lib/filewatch/watched_file.rb, line 217
# Closes the held file handle and drops the reference to it. Does nothing
# when there is no handle or the handle is already closed.
def file_close
  handle = @file
  return if handle.nil? || handle.closed?

  handle.close
  @file = nil
end
file_ignorable?() click to toggle source
# File lib/filewatch/watched_file.rb, line 405
# @return [Boolean] true when ignore_older is configured and the remembered
#   modification time is more than that many seconds in the past
def file_ignorable?
  if expiry_ignore_enabled?
    # (Time.now - stat.mtime) in jruby does int and float conversions before
    # the subtraction and returns a float, so work with floats from the start
    age_seconds = Time.now.to_f - modified_at
    age_seconds > @settings.ignore_older
  else
    false
  end
end
file_open?() click to toggle source
# File lib/filewatch/watched_file.rb, line 232
# @return [Boolean] true when a file handle is held and it is not closed
def file_open?
  !(@file.nil? || @file.closed?)
end
file_read(amount = nil) click to toggle source
# File lib/filewatch/watched_file.rb, line 227
# Reads from the held file handle with sysread and refreshes the access time.
#
# @param amount [Integer, nil] number of bytes to request; falls back to
#   @read_chunk_size when not given
# @return the data returned by sysread
def file_read(amount = nil)
  set_accessed_at
  byte_count = amount || @read_chunk_size
  @file.sysread(byte_count)
end
file_seek(amount, whence = IO::SEEK_SET) click to toggle source
# File lib/filewatch/watched_file.rb, line 223
# Repositions the held file handle via sysseek.
#
# @param amount offset passed straight to sysseek
# @param whence one of the IO::SEEK_* constants; defaults to IO::SEEK_SET
# @return the result of @file.sysseek
# NOTE(review): there is no nil check here, so this presumably expects an open
# handle in @file - confirm against callers
def file_seek(amount, whence = IO::SEEK_SET)
  @file.sysseek(amount, whence)
end
full_state_reset(this_stat = nil) click to toggle source
# File lib/filewatch/watched_file.rb, line 27
# Puts this watched file back into a freshly discovered condition: byte
# counters zeroed, file handle closed, stat replaced, listener and state
# history cleared.
#
# @param this_stat stat object to adopt; when nil a new PathStatClass is built
#   from pathname, and if that raises Errno::ENOENT the file is moved to
#   delayed_delete and nothing else is reset
def full_state_reset(this_stat = nil)
  if this_stat.nil?
    begin
      this_stat = PathStatClass.new(pathname)
    rescue Errno::ENOENT
      delay_delete
      return
    end
  end
  @bytes_read = 0 # tracks bytes read from the open file or initialized from a matched sincedb_value off disk.
  @bytes_unread = 0 # tracks bytes not yet read from the open file. So we can warn on shrink when unread bytes are seen.
  file_close
  set_stat(this_stat)
  @listener = nil
  @last_open_warning_at = nil
  # initial as true means we have not associated this watched_file with a previous sincedb value yet.
  # and we should read from the beginning if necessary
  @initial = true
  @recent_states = [] # keep last 8 states, managed in set_state
  # go (back) to watched only when currently active or when no state has been set yet
  watch if active? || @state.nil?
end
grown?() click to toggle source
# File lib/filewatch/watched_file.rb, line 151
# @return [Boolean] true when the last recorded size is beyond the read position
def grown?
  @bytes_read < @size
end
has_listener?() click to toggle source
# File lib/filewatch/watched_file.rb, line 177
# @return [Boolean] true when a listener is currently set
def has_listener?
  @listener.nil? ? false : true
end
ignore() click to toggle source
# File lib/filewatch/watched_file.rb, line 286
# Moves this file into the :ignored state.
def ignore
  set_state(:ignored)
end
ignore_as_unread() click to toggle source
# File lib/filewatch/watched_file.rb, line 290
# Moves this file into the :ignored state and sets the read position to the
# last recorded size, so all_read? is true and grown? is false afterwards.
def ignore_as_unread
  ignore
  @bytes_read = @size
end
ignored?() click to toggle source
# File lib/filewatch/watched_file.rb, line 327
# @return [Boolean] true when the current state is :ignored
def ignored?
  @state.equal?(:ignored)
end
increment_bytes_read(delta) click to toggle source
# File lib/filewatch/watched_file.rb, line 264
# Advances the read position by +delta+ bytes and recomputes the unread count.
#
# @param delta [Integer, nil] number of bytes just read; nil is a no-op
# @return [Integer, nil] the new read position, or nil when delta was nil
def increment_bytes_read(delta)
  unless delta.nil?
    @bytes_read = @bytes_read + delta
    update_bytes_unread
    @bytes_read
  end
end
initial?() click to toggle source
# File lib/filewatch/watched_file.rb, line 193
# @return the @initial flag: set to true by full_state_reset and to false by
#   initial_completed, rotate_from and rotate_as_file
def initial?
  @initial
end
initial_completed() click to toggle source
# File lib/filewatch/watched_file.rb, line 185
# Clears the @initial flag, so initial? returns false from now on.
def initial_completed
  @initial = false
end
inspect() click to toggle source
# File lib/filewatch/watched_file.rb, line 425
def inspect
  "<FileWatch::WatchedFile: @filename='#{@filename}', @state=#{@state.inspect}, current_size=#{current_size}, sincedb_key='#{sincedb_key}'>"
end
last_stat_size() click to toggle source
# File lib/filewatch/watched_file.rb, line 139
# @return the size reported by the held stat object (read from @stat on each
#   call, unlike current_size which returns the copy kept in @size)
def last_stat_size
  @stat.size
end
loop_control_adjusted_for_stat_size() click to toggle source
# File lib/filewatch/watched_file.rb, line 365
# Works out how the next read loop should run, given how many bytes remain
# between the read position and the current size.
#
# @return [LoopControlResult] built from (iteration count, bytes per read,
#   more flag):
#   * nothing left             -> (0, 0, false)
#   * less than one chunk left -> (1, remaining, false)
#   * less than count * chunk  -> (remaining / chunk, chunk, true)
#   * otherwise                -> (configured count, chunk, false)
def loop_control_adjusted_for_stat_size
  remaining = current_size - @bytes_read
  return LoopControlResult.new(0, 0, false) if remaining < 1
  return LoopControlResult.new(1, remaining, false) if remaining < @read_chunk_size

  if remaining < @standard_loop_max_bytes
    # the configured loop would overrun and hit EOF, so run only the whole
    # chunks that fit and flag that more is left,
    # e.g. remaining 100 with 4 * 30 -> 120 gives 3 * 30 -> 90 this time.
    # a 2GB file in read mode gets one loop of 64666 x 32768 (2119006656 / 32768)
    # and a second loop with 1 x 31168
    LoopControlResult.new(remaining / @read_chunk_size, @read_chunk_size, true)
  else
    # remaining is at least a full loop: use the configured values, no more flag
    LoopControlResult.new(@read_loop_count, @read_chunk_size, false)
  end
end
modified_at(update = false) click to toggle source
# File lib/filewatch/watched_file.rb, line 115
# Remembered modification time of the file.
#
# @param update [Boolean] when true, re-read the value from the stat object
# @return the remembered value; it is fetched from @stat.modified_at the
#   first time and whenever +update+ is true
def modified_at(update = false)
  return @modified_at unless update || @modified_at.nil?

  @modified_at = @stat.modified_at
end
modified_at_changed?() click to toggle source

@return whether modified_at has changed since it was last read. @see restat!

# File lib/filewatch/watched_file.rb, line 125
# @return [Boolean] true when the stat object now reports a modification time
#   different from the remembered one
def modified_at_changed?
  remembered = modified_at
  reported = @stat.modified_at
  remembered != reported
end
open() click to toggle source
# File lib/filewatch/watched_file.rb, line 208
# Opens the file at @path through FileOpener and stores the handle with
# file_add_opened.
def open
  handle = FileOpener.open(@path)
  file_add_opened(handle)
end
position_for_new_sincedb_value() click to toggle source
# File lib/filewatch/watched_file.rb, line 129
# Start position to record when a new sincedb value is created for this file.
#
# @return [Integer] 0, except for a file still flagged as initial while
#   start_new_files_at is not :beginning, which starts at the last stat size
def position_for_new_sincedb_value
  # always start at the beginning if found after first discovery
  return 0 unless @initial
  # found in first discovery and configured to read from the beginning
  return 0 if @settings.start_new_files_at == :beginning

  last_stat_size
end
read_extract_lines(amount) click to toggle source
# File lib/filewatch/watched_file.rb, line 240
# Reads one chunk, extracts lines from it and then advances the read position
# by the size of the chunk.
#
# @param amount number of bytes to request from file_read
# @return the result of buffer_extract for the chunk
def read_extract_lines(amount)
  chunk = file_read(amount)
  # the position is advanced only after the extraction has run
  buffer_extract(chunk).tap { increment_bytes_read(chunk.bytesize) }
end
recent_state_history() click to toggle source
# File lib/filewatch/watched_file.rb, line 397
# @return [Array] a new array holding the recorded previous states followed
#   by the current state (when one is set)
def recent_state_history
  [*@recent_states, *Array(@state)]
end
reopen() click to toggle source
# File lib/filewatch/watched_file.rb, line 201
# Closes and re-opens the file, but only when a handle is currently open.
#
# @return the result of open, or nil when no handle was open
def reopen
  return unless file_open?

  file_close
  open
end
reset_buffer() click to toggle source
# File lib/filewatch/watched_file.rb, line 236
# Calls flush on the line tokenizer held in @buffer.
#
# @return the result of the tokenizer's flush, or nil when no tokenizer has
#   been created yet
def reset_buffer
  # @buffer is only created in file_add_opened, so it is still nil for a file
  # that has never been opened; there is nothing to flush in that case
  return if @buffer.nil?

  @buffer.flush
end
reset_bytes_unread() click to toggle source
# File lib/filewatch/watched_file.rb, line 386
# Zeroes the unread-bytes counter.
def reset_bytes_unread
  # called from shrink
  @bytes_unread = 0
end
restat!() click to toggle source

@return true if the file was modified since last stat

# File lib/filewatch/watched_file.rb, line 101
# Refreshes the stat object and reacts to what it now reports.
#
# @return true when a rotation was detected (the state is switched to
#   rotation_in_progress), otherwise whether the modification time changed
def restat!
  modified_at # remember the pre-restat value so a change can always be detected
  @stat.restat
  unless rotation_detected?
    @size = @stat.size
    update_bytes_unread
    return modified_at_changed?
  end
  # switch to new state now
  rotation_in_progress
  true
end
restore_previous_state() click to toggle source
# File lib/filewatch/watched_file.rb, line 311
# Takes the most recent entry off the state history and makes it the current
# state again.
def restore_previous_state
  previous = @recent_states.pop
  set_state(previous)
end
rotate_as_file(bytes_read = 0) click to toggle source
# File lib/filewatch/watched_file.rb, line 76
# Re-initialises this watched file in place as the target of a rotation:
# counters and state history are reset, a new path based stat is taken, an
# open handle is reopened and the state becomes watched.
#
# @param bytes_read [Integer] read position to start from (defaults to 0)
def rotate_as_file(bytes_read = 0)
  # rotation, when a sincedb record exists for new inode, but no watched file to rotate from
  # probably caused by a deletion detected in the middle of the rename cascade
  # RARE due to delayed_delete - there would have to be a large time span between the renames.
  @bytes_read = bytes_read # tracks bytes read from the open file or initialized from a matched sincedb_value off disk.
  @bytes_unread = 0 # tracks bytes not yet read from the open file. So we can warn on shrink when unread bytes are seen.
  @last_open_warning_at = nil
  # initial as false: this watched_file is no longer flagged as being in its
  # first-discovery condition (see initial? and position_for_new_sincedb_value)
  @initial = false
  @recent_states = [] # keep last 8 states, managed in set_state
  set_stat(PathStatClass.new(pathname)) # take a new path based stat
  reopen # only has an effect when a file handle is currently open
  watch
end
rotate_from(other) click to toggle source
# File lib/filewatch/watched_file.rb, line 50
# Takes over the read progress of +other+: byte counters, state history and
# access time are copied to this watched file, a new path based stat is taken
# and this file ends up in the ignored state.
#
# @param other [FileWatch::WatchedFile] the watched file whose state moves
#   here; it is reset with full_state_reset unless it is in delayed_delete
def rotate_from(other)
  # move all state from other to this one
  set_standard_read_loop
  file_close
  @bytes_read = other.bytes_read
  @bytes_unread = other.bytes_unread
  @listener = nil
  @initial = false
  # take a copy instead of sharing the array: a delayed_delete other is not
  # reset below and keeps using its own history, so the `ignore` at the end
  # of this method must not push this file's state into it
  @recent_states = other.recent_states.dup
  @accessed_at = other.accessed_at
  # when other is delayed_delete we don't know if a file exists at the
  # other.path yet, so it is not reset
  other.full_state_reset unless other.delayed_delete?
  set_stat PathStatClass.new(pathname)
  ignore
end
rotation_detected?() click to toggle source
# File lib/filewatch/watched_file.rb, line 96
# @return [Boolean] true when the key the stat object reports now differs
#   from the key stored for this watched file
def rotation_detected?
  reported_key = stat_sincedb_key
  stored_key = sincedb_key
  reported_key != stored_key
end
rotation_in_progress() click to toggle source
# File lib/filewatch/watched_file.rb, line 278
# Moves this file into the :rotation_in_progress state.
def rotation_in_progress
  set_state(:rotation_in_progress)
end
rotation_in_progress?() click to toggle source
# File lib/filewatch/watched_file.rb, line 315
# @return [Boolean] true when the current state is :rotation_in_progress
def rotation_in_progress?
  @state.equal?(:rotation_in_progress)
end
set_accessed_at() click to toggle source
# File lib/filewatch/watched_file.rb, line 189
# Records the current wall-clock time, as float seconds, as the last access time.
#
# @return [Float] the recorded time
def set_accessed_at
  now = Time.now
  @accessed_at = now.to_f
end
set_listener(observer) click to toggle source
# File lib/filewatch/watched_file.rb, line 169
# Asks the observer for the listener belonging to this file's path and keeps it.
#
# @param observer object responding to listener_for(path)
# @return the listener that was stored
def set_listener(observer)
  @listener = observer.listener_for(@path)
end
set_maximum_read_loop() click to toggle source
# File lib/filewatch/watched_file.rb, line 358
# Switches the read loop to FileWatch::MAX_ITERATIONS iterations of
# FileWatch::FILE_READ_SIZE bytes.
# used to quickly fully read an open file when rotation is detected
#
# @return the resulting bytes-per-loop ceiling (count * chunk size)
def set_maximum_read_loop
  iterations = FileWatch::MAX_ITERATIONS
  chunk_size = FileWatch::FILE_READ_SIZE
  @read_loop_count = iterations
  @read_chunk_size = chunk_size
  @standard_loop_max_bytes = iterations * chunk_size
end
set_standard_read_loop() click to toggle source
# File lib/filewatch/watched_file.rb, line 351
# Sets the read loop from the file_chunk_count and file_chunk_size settings.
#
# @return the resulting bytes-per-loop ceiling (count * chunk size)
def set_standard_read_loop
  chunk_count = @settings.file_chunk_count
  chunk_size = @settings.file_chunk_size
  @read_loop_count = chunk_count
  @read_chunk_size = chunk_size
  # e.g. 1 * 10 bytes -> 10 or 256 * 65536 -> 16777216 or 140737488355327 * 32768 -> 4611686018427355136
  @standard_loop_max_bytes = chunk_count * chunk_size
end
set_state(value) click to toggle source
# File lib/filewatch/watched_file.rb, line 391
# Switches to a new state and records the state being left in the history,
# which holds at most 8 entries.
#
# @param value [Symbol] the state to switch to
# @return the new state
def set_state(value)
  history = @recent_states
  history.shift if history.size == 8 # drop the oldest entry to make room
  history.push(@state) unless @state.nil?
  @state = value
end
shrunk?() click to toggle source
# File lib/filewatch/watched_file.rb, line 147
# @return [Boolean] true when the last recorded size is smaller than the
#   read position
def shrunk?
  @bytes_read > @size
end
sincedb_key() click to toggle source
# File lib/filewatch/watched_file.rb, line 181
# @return the key stored by set_stat (the stat object's inode_struct at the
#   time set_stat was called)
def sincedb_key
  @sdb_key_v1
end
size_changed?() click to toggle source
# File lib/filewatch/watched_file.rb, line 155
# @return [Boolean] true when the last recorded size differs from the read
#   position
def size_changed?
  # called from closed and ignored
  # before the last stat was taken file should be fully read.
  !(@size == @bytes_read)
end
stat_sincedb_key() click to toggle source
# File lib/filewatch/watched_file.rb, line 92
# @return the inode_struct the held stat object reports now (compare with
#   sincedb_key, which returns the value stored by set_stat)
def stat_sincedb_key
  @stat.inode_struct
end
to_s() click to toggle source
# File lib/filewatch/watched_file.rb, line 429
# @return [String] the same text as inspect
def to_s
  inspect
end
unset_listener() click to toggle source
# File lib/filewatch/watched_file.rb, line 173
# Drops the current listener, so has_listener? returns false afterwards.
def unset_listener
  @listener = nil
end
unwatch() click to toggle source
# File lib/filewatch/watched_file.rb, line 303
# Moves this file into the :unwatched state.
def unwatch
  set_state(:unwatched)
end
unwatched?() click to toggle source
# File lib/filewatch/watched_file.rb, line 339
# @return [Boolean] true when the current state is :unwatched
def unwatched?
  @state.equal?(:unwatched)
end
update_bytes_read(total_bytes_read) click to toggle source
# File lib/filewatch/watched_file.rb, line 271
# Sets the read position to an absolute value and recomputes the unread count.
#
# @param total_bytes_read [Integer, nil] the new read position; nil is a no-op
# @return [Integer, nil] the new read position, or nil when given nil
def update_bytes_read(total_bytes_read)
  unless total_bytes_read.nil?
    @bytes_read = total_bytes_read
    update_bytes_unread
    @bytes_read
  end
end
watch() click to toggle source
# File lib/filewatch/watched_file.rb, line 299
# Moves this file into the :watched state.
def watch
  set_state(:watched)
end
watched?() click to toggle source
# File lib/filewatch/watched_file.rb, line 335
# @return [Boolean] true when the current state is :watched
def watched?
  @state.equal?(:watched)
end

Private Instance Methods

set_stat(stat) click to toggle source
# File lib/filewatch/watched_file.rb, line 69
# Adopts a stat object and caches its size and inode_struct.
#
# @param stat object responding to size and inode_struct
# @return the cached inode_struct
def set_stat(stat)
  @stat = stat
  @size = stat.size
  @sdb_key_v1 = stat.inode_struct
end
update_bytes_unread() click to toggle source
# File lib/filewatch/watched_file.rb, line 435
# Recomputes the unread count as current size minus read position, floored
# at zero.
#
# @return [Integer] the new unread count
def update_bytes_unread
  @bytes_unread = [current_size - @bytes_read, 0].max
end