class Franz::Watch

Watch works in tandem with Discover to maintain a list of known files and their status. Events are generated when a file is created, destroyed, or modified (including appended, truncated, and replaced).

Attributes

deletions[R]
discoveries[R]
stats[R]
watch_events[R]
watch_interval[R]

Public Class Methods

new(opts={}) click to toggle source

Start a new Watch thread in the background.

@param [Hash] opts options for the watch @option opts [InputConfig] :input_config shared Franz configuration @option opts [Queue] :discoveries ([]) “input” queue of discovered paths @option opts [Queue] :deletions ([]) “output” queue of deleted paths @option opts [Queue] :watch_events ([]) “output” queue of file events @option opts [Fixnum] :watch_interval (1) seconds between watch rounds @option opts [Hash<Path,State>] :stats ([]) internal “stats” state @option opts [Logger] :logger (Logger.new(STDOUT)) logger to use

# File lib/franz/watch.rb, line 25
def initialize opts={}
  @ic = opts[:input_config] || raise('No input_config specified')

  @discoveries  = opts[:discoveries]  || []
  @deletions    = opts[:deletions]    || []
  @watch_events = opts[:watch_events] || []

  @play_catchup   = opts[:play_catchup?].nil? ? true : opts[:play_catchup?]
  @watch_interval = opts[:watch_interval] || 10
  @stats          = opts[:stats]          || Hash.new
  @logger         = opts[:logger]         || Logger.new(STDOUT)

  @statz = opts[:statz] || Franz::Stats.new
  @statz.create :num_watched

  # Make sure we're up-to-date by rewinding our old stats to our cursors
  if @play_catchup
    log.debug event: 'play catchup'
    stats.keys.each do |path|
      if @ic.type path
        stats[path][:size] = opts[:cursors][path] || 0
      else # we shouldn't be watching this file anymore
        stats.delete path
      end
    end
  end

  @stop = false
  @thread = Thread.new do
    until @stop
      until discoveries.empty?
        @stats[discoveries.shift] = nil
      end

      watch.each do |deleted|
        @stats.delete deleted
        deletions.push deleted
      end

      sleep watch_interval
    end
  end

  log.debug \
    event: 'watch started',
    discoveries: discoveries,
    deletions: deletions,
    watch_events: watch_events,
    watch_interval: watch_interval
end

Public Instance Methods

state() click to toggle source

Return the internal “stats” state

# File lib/franz/watch.rb, line 88
def state
  return @stats.dup
end
stop() click to toggle source

Stop the Watch thread. Effectively only once.

@return [Hash] internal “stats” state

# File lib/franz/watch.rb, line 79
def stop
  return state if @stop
  @stop = true
  @thread.kill
  log.debug event: 'watch stopped'
  return state
end

Private Instance Methods

enqueue(name, path, size=nil) click to toggle source
# File lib/franz/watch.rb, line 97
def enqueue name, path, size=nil
  log.debug \
    event: 'enqueue',
    name: name,
    file: path,
    size: size
  watch_events.push name: name, path: path, size: size
end
file_appended?(old_stat, new_stat) click to toggle source

Detect whether the file was appended.

@param old_stat [Stat] stat before some change @param new_stat [Stat] stat after some change

# File lib/franz/watch.rb, line 202
def file_appended? old_stat, new_stat
  return false if new_stat.nil?
  return new_stat[:size] > 0 if old_stat.nil?
  return new_stat[:size] > old_stat[:size]
end
file_created?(old_stat, new_stat) click to toggle source

Detect whether the file was created.

@param old_stat [Stat] stat before some change @param new_stat [Stat] stat after some change

# File lib/franz/watch.rb, line 166
def file_created? old_stat, new_stat
  return !new_stat.nil? && old_stat.nil?
end
file_deleted?(old_stat, new_stat) click to toggle source

Detect whether the file was deleted.

@param old_stat [Stat] stat before some change @param new_stat [Stat] stat after some change

# File lib/franz/watch.rb, line 174
def file_deleted? old_stat, new_stat
  return new_stat.nil? && !old_stat.nil?
end
file_replaced?(old_stat, new_stat) click to toggle source

Detect whether the file was replaced (e.g. inode changed).

@param old_stat [Stat] stat before some change @param new_stat [Stat] stat after some change

# File lib/franz/watch.rb, line 182
def file_replaced? old_stat, new_stat
  return false if new_stat.nil?
  return false if old_stat.nil?
  return inode_for(new_stat) != inode_for(old_stat)
end
file_truncated?(old_stat, new_stat) click to toggle source

Detect whether the file was truncated (e.g. rotated).

@param old_stat [Stat] stat before some change @param new_stat [Stat] stat after some change

# File lib/franz/watch.rb, line 192
def file_truncated? old_stat, new_stat
  return false if new_stat.nil?
  return false if old_stat.nil?
  return new_stat[:size] < old_stat[:size]
end
inode_for(stat) click to toggle source

Grab only the inode from a stat (or nil if the stat is nil).

@param stat [Stat] stat to inspect

# File lib/franz/watch.rb, line 157
def inode_for stat
  return nil if stat.nil?
  return stat[:inode].to_a
end
log() click to toggle source
# File lib/franz/watch.rb, line 95
def log ; @logger end
stat_for(path) click to toggle source

Perform a file stat and return a simplified version.

@param path [String] file path to examine

# File lib/franz/watch.rb, line 137
def stat_for path
  return begin
    stat = File::Stat.new(path)
    {
      inode: {
        ino: stat.ino,
        maj: stat.dev_major,
        min: stat.dev_minor
      },
      size: stat.size,
      mtime: stat.mtime
    }
  rescue Errno::ENOENT
    nil
  end
end
watch() click to toggle source
# File lib/franz/watch.rb, line 106
def watch
  log.debug event: 'watch'
  @statz.set :num_watched, stats.keys.length
  deleted = []

  stats.keys.each do |path|
    old_stat = stats[path]
    stat = stat_for path
    stats[path] = stat

    if file_deleted? old_stat, stat
      enqueue :deleted, path
      deleted << path
    end

    if file_replaced? old_stat, stat
      enqueue :replaced, path, stat[:size]
    elsif file_appended? old_stat, stat
      enqueue :appended, path, stat[:size]
    elsif file_truncated? old_stat, stat
      enqueue :truncated, path, stat[:size]
    end
  end
  return deleted
end