class Envoi::WatchFolderUtility::WatchFolder::Handler::Listen

Attributes

definition[RW]
ignored_files_map[RW]
known_path_map[RW]
last_poll_time[RW]
listener[RW]
lock[RW]
logger[RW]
min_stable_poll_count[RW]
min_stable_time[RW]
paths[RW]
poll_interval[RW]
quarantine_directory_path[RW]

Public Class Methods

new(args = { }) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 86
def initialize(args = { })
  initialize_logger(args)
  logger.debug { "Initializing Watch Folder Handler '#{self.class.name}'. Args: #{args}" }

  @definition = args[:definition]
  raise ArgumentError, "Definition is a required argument." unless definition

  @known_path_map = { }
  @ignored_files_map = { }

  @lock = Mutex.new

  @processing = { }

  process_definition(@definition)
  initialize_listener
  logger.debug ( "Watch Folder Handler '#{self.class.name}' Initialized. ")
end

Public Instance Methods

add_to_ignore(file) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 197
def add_to_ignore(file)
  ignored_files_map[file.path] = file
end
initialize_listener(args = { }) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 155
def initialize_listener(args = { })
  @listener = ::Listen.to(*paths, {}) do |m, a, r|
    if !r.empty?
      logger.info { "#{r.length} file(s) removed." }
      lock.synchronize { r.each { |f| process_deleted_path(f) } }
    end

    if !m.empty?
      logger.info { "#{m.length} file(s) modified." }
      lock.synchronize { process_add_or_modified_path(m, :modified) }
    end

    if !a.empty?
      logger.info { "#{a.length} file(s) added." }
      lock.synchronize { process_add_or_modified_path(a, :added) }
    end

  end
end
initialize_logger(args = { }) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 105
def initialize_logger(args = { })
  @logger = args[:logger] ||= Logger.new(args[:log_to] || STDOUT)
  log_level = args[:log_level] || Logger::DEBUG
  if log_level
    @logger.level = log_level
    args[:logger] = @logger
  end
  @logger
end
inspect() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 244
def inspect
  str = "#<#{self.class}"

  ids = (Thread.current[InspectKey] ||= [])
  if ids.include?(object_id)
    return str << ' ...>'
  end

  ids << object_id
  # begin
  #   first = true
  #   for k,v in @table
  #     str << "," unless first
  #     first = false
  #     str << " #{k}=#{v.inspect}"
  #   end
  #   return str << '>'
  # ensure
  #   ids.pop
  # end
end
Also aliased as: to_s
perform_initial_inventory() click to toggle source

Listen doesn't perform an initial inventory by default so this method will pick up any files that are already present

# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 177
def perform_initial_inventory
  _config = @listener.instance_variable_get(:@backend).instance_variable_get(:@adapter).config
  # change_config = ::Listen::Change::Config.new(_config.queue, _config.silencer)

  _snapshots ||= {}
  _config.directories.each do |dir|
    record = ::Listen::Record.new(dir)
    record.build
    record.instance_variable_get(:@tree).each do |rel_path, fstats|
      _path = File.join(dir, rel_path)
      process_add_or_modified_path(_path, :added)
    end
    # snapshot = ::Listen::Change.new(change_config, record)
    # _config.optimize_changes(snapshot)
    # _snapshots[dir] = snapshot
  end

  # pp _snapshots
end
poll(options = { }) click to toggle source

Really this is just for debugging as polling isn't used to determine file stability

# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 202
def poll(options = { })
  # return if @min_stable_time
  should_increment_counter = options.fetch(:should_increment_counter, true)

  @previous_poll_time = @last_poll_time
  @last_poll_time = Time.now

  lock.synchronize do
    @known_path_map.each do |fp, f|
      next if f.ignore?
      unless f.exist?
        logger.debug { "Skipping Missing File. #{f.summary}" }
        next
      end

      logger.debug { "Incrementing Stable Stats: #{f.summary}" }
      f.last_poll_time = Time.now
      f[:stable_poll_count] += 1 if should_increment_counter
    end
  end
end
process_add_or_modified_path(path, event_type) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 144
def process_add_or_modified_path(path, event_type)
  return path.each { |path| process_add_or_modified_path(path, event_type) } if path.is_a?(Array)
  logger.debug { "PATH #{event_type} '#{path}'" }
  # lock.synchronize { known_path_map[path] = OpenStruct.new({ type: event_type, path: path, stable_poll_count: 0, timestamp: Time.now}) }
  file = known_path_map[path] ||= DiscoveredPath.new({ path: path, processed: false, handler: self })
  file.event_type = event_type
  file.event_timestamp = Time.now
  file.stable_poll_count = 0
  file
end
process_definition(watch_folder_def) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 115
def process_definition(watch_folder_def)
  # @paths = [ File.expand_path('~/watch_folders/aspera') ]
  @paths = watch_folder_def['paths'] || []
  path = watch_folder_def['path']
  @paths << path if path && !@paths.include?(path)

  # @quarantine_directory_path = definition['quarantine_directory_path'] || definition['quarantine_path']
  @poll_interval = watch_folder_def['poll_interval'] || 15
  @min_stable_poll_count = watch_folder_def['minimum_stable_poll_count'] || 3
  @min_stable_time = watch_folder_def['minimum_stable_time']
  if @min_stable_time.nil? && @min_stable_poll_count && @min_stable_time
    @min_stable_time = @poll_interval * @min_stable_poll_count
  end
  @processing_limit = watch_folder_def['processing_limit'] || 10
end
process_deleted_path(path) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 131
def process_deleted_path(path)
  return path.each { |path| process_deleted_path(path) } if path.is_a?(Array)
  logger.debug { "PATH #{:deleted} '#{path}'" }
  file = known_path_map[path]
  known_path_map.delete(path)
  ignored_files_map.delete(path)
  return unless file
  file.event_type = :deleted
  file.event_timestamp = Time.now
  file.stable_poll_count = 0
  file.deleted = true
end
run(options = { }) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 232
def run(options = { })
  logger.info { "Starting handler for #{paths}" }
  perform_initial_inventory
  listener.start
end
run_once(options = { }) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 228
def run_once(options = { })
  perform_initial_inventory
end
stable_files() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 224
def stable_files
  @known_path_map.select { |p,f| f.stable? }.values
end
stop() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 238
def stop
  listener.stop if listener && listener.respond_to?(:stop)
end
to_s()
Alias for: inspect