class FileWatch::SincedbCollection

this KV collection has a watched_file storage_key (an InodeStruct) as the key and a SincedbValue as the value. the SincedbValues are built by reading the sincedb file.

Attributes

path[R]
serializer[W]

Public Class Methods

new(settings) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 14
def initialize(settings)
  @settings = settings
  @sincedb_last_write = 0
  @sincedb = {}
  @serializer = SincedbRecordSerializer.new(@settings.sincedb_expiry_duration)
  @path = Pathname.new(@settings.sincedb_path)
  @write_method = LogStash::Environment.windows? || @path.chardev? || @path.blockdev? ? method(:non_atomic_write) : method(:atomic_write)
  @full_path = @path.to_path
  FileUtils.touch(@full_path)
  @write_requested = false
end

Public Instance Methods

associate(watched_file) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 63
def associate(watched_file)
  logger.trace? && logger.trace("associate: finding", :path => watched_file.path, :inode => watched_file.sincedb_key.inode)
  sincedb_value = find(watched_file)
  if sincedb_value.nil?
    # sincedb has no record of this inode
    # and due to the window handling of many files
    # this file may not be opened in this session.
    # a new value will be added when the file is opened
    logger.trace("associate: unmatched", :filename => watched_file.filename)
    return true
  end
  logger.trace? && logger.trace("associate: found sincedb record", :filename => watched_file.filename,
                                :sincedb_key => watched_file.sincedb_key, :sincedb_value => sincedb_value)
  if sincedb_value.watched_file.nil? # not associated
    if sincedb_value.path_in_sincedb.nil?
      handle_association(sincedb_value, watched_file)
      logger.trace? && logger.trace("associate: inode matched but no path in sincedb", :filename => watched_file.filename)
      return true
    end
    if sincedb_value.path_in_sincedb == watched_file.path
      # the path on disk is the same as discovered path and the inode is the same.
      handle_association(sincedb_value, watched_file)
      logger.trace? && logger.trace("associate: inode and path matched", :filename => watched_file.filename)
      return true
    end
    # the path on disk is different from discovered unassociated path but they have the same key (inode)
    # treat as a new file, a new value will be added when the file is opened
    sincedb_value.clear_watched_file
    delete(watched_file.sincedb_key)
    logger.trace? && logger.trace("associate: matched but allocated to another", :filename => watched_file.filename)
    return true
  end
  if sincedb_value.watched_file.equal?(watched_file) # pointer equals
    logger.trace? && logger.trace("associate: already associated", :filename => watched_file.filename)
    return true
  end
  # sincedb_value.watched_file is not this discovered watched_file but they have the same key (inode)
  # this means that the filename path was changed during this session.
  # renamed file can be discovered...
  #   before the original is detected as deleted: state is `active`
  #   after the original is detected as deleted but before it is actually deleted: state is `delayed_delete`
  #   after the original is deleted
  # are not yet in the delete phase, let this play out
  existing_watched_file = sincedb_value.watched_file
  logger.trace? && logger.trace("associate: found sincedb_value has a watched_file - this is a rename",
                                :this_watched_file => watched_file.details, :existing_watched_file => existing_watched_file.details)
  watched_file.rotation_in_progress
  true
end
clear() click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 167
def clear
  @sincedb.clear
end
clear_watched_file(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 159
def clear_watched_file(key)
  @sincedb[key].clear_watched_file
end
delete(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 130
def delete(key)
  @sincedb.delete(key)
end
find(watched_file) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 113
def find(watched_file)
  get(watched_file.sincedb_key)
end
flush_at_interval() click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 180
def flush_at_interval
  now = Time.now
  delta = now.to_i - @sincedb_last_write
  if delta >= @settings.sincedb_write_interval
    logger.debug("writing sincedb (delta since last write = #{delta})")
    sincedb_write(now)
  end
end
get(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 121
def get(key)
  @sincedb[key]
end
increment(key, amount) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 142
def increment(key, amount)
  @sincedb[key].increment_position(amount)
end
keys() click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 171
def keys
  @sincedb.keys
end
last_read(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 134
def last_read(key)
  @sincedb[key].position
end
member?(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 117
def member?(key)
  @sincedb.member?(key)
end
open() click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 46
def open
  @time_sdb_opened = Time.now.to_f
  begin
    path.open do |file|
      logger.debug("open: reading from #{path}")
      @serializer.deserialize(file) do |key, value|
        logger.trace? && logger.trace("open: importing #{key.inspect} => #{value.inspect}")
        set_key_value(key, value)
      end
    end
    logger.trace("open: count of keys read: #{@sincedb.keys.size}")
  rescue => e
    #No existing sincedb to load
    logger.debug("open: error opening #{path}", :exception => e.class, :message => e.message)
  end
end
reading_completed(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 163
def reading_completed(key)
  @sincedb[key].reading_completed
end
request_disk_flush() click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 30
def request_disk_flush
  @write_requested = true
  flush_at_interval
end
rewind(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 138
def rewind(key)
  @sincedb[key].update_position(0)
end
set(key, value) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 125
def set(key, value)
  @sincedb[key] = value
  value
end
set_watched_file(key, watched_file) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 146
def set_watched_file(key, watched_file)
  @sincedb[key].set_watched_file(watched_file)
end
store_last_read(key, pos) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 155
def store_last_read(key, pos)
  @sincedb[key].update_position(pos)
end
watched_file_deleted(watched_file) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 150
def watched_file_deleted(watched_file)
  value = @sincedb[watched_file.sincedb_key]
  value.unset_watched_file if value
end
watched_file_unset?(key) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 175
def watched_file_unset?(key)
  return false unless member?(key)
  get(key).watched_file.nil?
end
write(reason=nil) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 41
def write(reason=nil)
  logger.trace("caller requested sincedb write (#{reason})")
  sincedb_write
end
write_if_requested() click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 35
def write_if_requested
  if write_requested?
    flush_at_interval
  end
end
write_requested?() click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 26
def write_requested?
  @write_requested
end

Private Instance Methods

atomic_write(time) click to toggle source

@return expired keys

# File lib/filewatch/sincedb_collection.rb, line 227
def atomic_write(time)
  logger.trace? && logger.trace("non_atomic_write: ", :time => time)
  begin
    FileHelper.write_atomically(@full_path) do |io|
      @serializer.serialize(@sincedb, io, time.to_f)
    end
  rescue Errno::EPERM, Errno::EACCES => e
    logger.warn("sincedb_write: unable to write atomically due to permissions error, falling back to non-atomic write: #{path} error:", :exception => e.class, :message => e.message)
    @write_method = method(:non_atomic_write)
    non_atomic_write(time)
  rescue => e
    logger.warn("sincedb_write: unable to write atomically, attempting non-atomic write: #{path} error:", :exception => e.class, :message => e.message)
    non_atomic_write(time)
  end
end
handle_association(sincedb_value, watched_file) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 191
def handle_association(sincedb_value, watched_file)
  watched_file.update_bytes_read(sincedb_value.position)
  sincedb_value.set_watched_file(watched_file)
  watched_file.initial_completed
  if watched_file.all_read?
    watched_file.ignore
    logger.trace? && logger.trace("handle_association fully read, ignoring", :watched_file => watched_file.details, :sincedb_value => sincedb_value)
  end
end
non_atomic_write(time) click to toggle source

@return expired keys

# File lib/filewatch/sincedb_collection.rb, line 244
def non_atomic_write(time)
  logger.trace? && logger.trace("non_atomic_write: ", :time => time)
  File.open(@full_path, "w+") do |io|
    @serializer.serialize(@sincedb, io, time.to_f)
  end
end
set_key_value(key, value) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 201
def set_key_value(key, value)
  if @time_sdb_opened < value.last_changed_at_expires(@settings.sincedb_expiry_duration)
    set(key, value)
  else
    logger.debug("set_key_value: record has expired, skipping: #{key.inspect} => #{value.inspect}")
  end
end
sincedb_write(time = Time.now) click to toggle source
# File lib/filewatch/sincedb_collection.rb, line 209
def sincedb_write(time = Time.now)
  logger.trace? && logger.trace("sincedb_write: #{path} (time = #{time})")
  begin
    expired_keys = @write_method.call(time)
    expired_keys.each do |key|
      @sincedb[key].unset_watched_file
      delete(key)
      logger.trace? && logger.trace("sincedb_write: cleaned", :key => key)
    end
    @sincedb_last_write = time.to_i
    @write_requested = false
  rescue Errno::EACCES => e
    # no file handles free perhaps - maybe it will work next time
    logger.debug("sincedb_write: #{path} error:", :exception => e.class, :message => e.message)
  end
end