class Envoi::WatchFolderUtility::WatchFolder::Handler::Listen
Attributes
definition[RW]
ignored_files_map[RW]
known_path_map[RW]
last_poll_time[RW]
listener[RW]
lock[RW]
logger[RW]
min_stable_poll_count[RW]
min_stable_time[RW]
paths[RW]
poll_interval[RW]
quarantine_directory_path[RW]
Public Class Methods
new(args = { })
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 86
# Builds a handler from a watch folder definition.
#
# The logger is set up first so that every later step can log. The
# definition is then stored, the bookkeeping maps and the mutex are created,
# the definition is processed and finally the listener is created.
#
# @param args [Hash] options; args[:definition] is required. The same hash
#   is handed to initialize_logger.
# @raise [ArgumentError] when args[:definition] is nil or false
def initialize(args = { })
  initialize_logger(args)
  logger.debug { "Initializing Watch Folder Handler '#{self.class.name}'. Args: #{args}" }

  @definition = args[:definition]
  unless definition
    raise ArgumentError, "Definition is a required argument."
  end

  # Bookkeeping state touched by the listener callbacks and by #poll.
  @known_path_map, @ignored_files_map, @processing = {}, {}, {}
  @lock = Mutex.new

  process_definition(@definition)
  initialize_listener

  logger.debug("Watch Folder Handler '#{self.class.name}' Initialized. ")
end
Public Instance Methods
add_to_ignore(file)
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 197
# Registers +file+ in the ignored files map, keyed by its path.
#
# @param file [#path] the entry to remember as ignored
# @return the +file+ that was stored
def add_to_ignore(file)
  ignored_files_map.store(file.path, file)
end
initialize_listener(args = { })
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 155
# Creates (but does not start) the Listen listener for the configured paths.
#
# The callback receives three lists of paths (modified, added and removed)
# and handles them in the order removed, modified, added. Each batch is
# logged and then processed while holding the handler lock.
#
# @param args [Hash] accepted but not used by this method
# @return the listener returned by ::Listen.to
def initialize_listener(args = { })
  @listener = ::Listen.to(*paths, {}) do |modified, added, removed|
    unless removed.empty?
      logger.info { "#{removed.length} file(s) removed." }
      lock.synchronize do
        removed.each { |removed_path| process_deleted_path(removed_path) }
      end
    end

    unless modified.empty?
      logger.info { "#{modified.length} file(s) modified." }
      lock.synchronize { process_add_or_modified_path(modified, :modified) }
    end

    unless added.empty?
      logger.info { "#{added.length} file(s) added." }
      lock.synchronize { process_add_or_modified_path(added, :added) }
    end
  end
end
initialize_logger(args = { })
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 105
# Sets up the logger used by this handler.
#
# Uses args[:logger] when given; otherwise creates a Logger writing to
# args[:log_to] (STDOUT by default). The chosen logger is written back to
# args[:logger] so the same args hash carries it onwards.
#
# The level is only changed when args[:log_level] is supplied. Previously a
# default of Logger::DEBUG was always applied, which made the `if log_level`
# guard meaningless and silently reset the level of a caller-supplied
# logger. A freshly created Logger already defaults to DEBUG, so loggers
# created here behave as before.
#
# @param args [Hash] may contain :logger, :log_to and :log_level
# @return [Logger] the logger now stored in @logger
def initialize_logger(args = { })
  @logger = args[:logger] ||= Logger.new(args[:log_to] || STDOUT)

  log_level = args[:log_level]
  @logger.level = log_level if log_level

  @logger
end
inspect()
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 244
# Returns a short String representation of the handler: "#<ClassName>".
#
# A per-thread list of object ids (stored under InspectKey) guards against
# re-entrant inspection; when this object is already being inspected on the
# current thread, "#<ClassName ...>" is returned instead.
#
# The previous body had its string-building tail commented out, so it
# returned the guard Array rather than a String and never removed the
# object id from the guard list, making every later call on the same thread
# report "...". The commented-out loop iterated over @table, which nothing
# in the visible source assigns, so it is not restored here.
#
# @return [String]
def inspect
  str = "#<#{self.class}"
  ids = (Thread.current[InspectKey] ||= [])
  return str << ' ...>' if ids.include?(object_id)

  ids << object_id
  begin
    str << '>'
  ensure
    # Always release the guard entry, even if building the string raises.
    ids.pop
  end
end
Also aliased as: to_s
perform_initial_inventory()
click to toggle source
Listen doesn't perform an initial inventory by default, so this method picks up any files that are already present.
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 177
# Reports everything already present in the watched directories as :added.
#
# For each directory in the listener's adapter config a Listen::Record is
# built, and every entry of the record's tree is joined onto the directory
# and passed to process_add_or_modified_path.
#
# NOTE(review): this reads Listen internals through instance_variable_get
# (@backend, @adapter, @tree) — confirm against the Listen version in use.
#
# @return the value of iterating the adapter config's directories
def perform_initial_inventory
  backend = @listener.instance_variable_get(:@backend)
  adapter_config = backend.instance_variable_get(:@adapter).config

  adapter_config.directories.each do |directory|
    record = ::Listen::Record.new(directory)
    record.build

    tree = record.instance_variable_get(:@tree)
    tree.each do |relative_path, _stats|
      process_add_or_modified_path(File.join(directory, relative_path), :added)
    end
  end
end
poll(options = { })
click to toggle source
This is really just for debugging, as polling isn't used to determine file stability.
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 202
# Walks every known path and refreshes its poll statistics.
#
# The previous poll time is remembered and the current time recorded. Then,
# while holding the lock, each entry that is not ignored and still exists
# has its last_poll_time set and, unless disabled, its stable poll count
# incremented. Entries that no longer exist are only logged.
#
# @param options [Hash] :should_increment_counter (default true) controls
#   whether the stable poll count is incremented
# @return the result of the synchronized iteration over @known_path_map
def poll(options = { })
  increment = options.fetch(:should_increment_counter, true)

  @previous_poll_time = @last_poll_time
  @last_poll_time = Time.now

  lock.synchronize do
    @known_path_map.each do |_path, entry|
      next if entry.ignore?

      if entry.exist?
        logger.debug { "Incrementing Stable Stats: #{entry.summary}" }
        entry.last_poll_time = Time.now
        entry[:stable_poll_count] += 1 if increment
      else
        logger.debug { "Skipping Missing File. #{entry.summary}" }
      end
    end
  end
end
process_add_or_modified_path(path, event_type)
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 144
# Records an added or modified path in the known path map.
#
# An Array of paths is handled by processing each element in turn. For a
# single path, the existing entry is reused or a new DiscoveredPath is
# created; its event type and timestamp are updated and its stable poll
# count is reset to zero.
#
# @param path [String, Array] a path, or a list of paths
# @param event_type [Symbol] the event being recorded (callers in this file
#   pass :added or :modified)
# @return the entry for a single path, or the given Array for a list
def process_add_or_modified_path(path, event_type)
  if path.is_a?(Array)
    return path.each { |single_path| process_add_or_modified_path(single_path, event_type) }
  end

  logger.debug { "PATH #{event_type} '#{path}'" }

  file = known_path_map[path]
  unless file
    file = DiscoveredPath.new({ path: path, processed: false, handler: self })
    known_path_map[path] = file
  end

  file.event_type = event_type
  file.event_timestamp = Time.now
  file.stable_poll_count = 0
  file
end
process_definition(watch_folder_def)
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 115
# Reads the handler settings out of a watch folder definition.
#
# Keys read (all strings): 'paths', 'path', 'poll_interval' (default 15),
# 'minimum_stable_poll_count' (default 3), 'minimum_stable_time' and
# 'processing_limit' (default 10).
#
# Two defects of the earlier version are fixed here:
# * 'minimum_stable_time' never received its computed default, because the
#   guard required @min_stable_time to be nil and truthy at the same time.
#   It now defaults to poll_interval * minimum_stable_poll_count.
# * The 'paths' array of the definition was appended to in place when
#   'path' was also given. A copy is used now, so the caller's definition
#   is left untouched.
#
# @param watch_folder_def [Hash] the watch folder definition
# @return the processing limit that was assigned
def process_definition(watch_folder_def)
  # Array() also accepts nil or a single path; dup keeps the caller's array intact.
  @paths = Array(watch_folder_def['paths']).dup
  path = watch_folder_def['path']
  @paths << path if path && !@paths.include?(path)

  @poll_interval = watch_folder_def['poll_interval'] || 15
  @min_stable_poll_count = watch_folder_def['minimum_stable_poll_count'] || 3

  @min_stable_time = watch_folder_def['minimum_stable_time']
  @min_stable_time = @poll_interval * @min_stable_poll_count if @min_stable_time.nil?

  @processing_limit = watch_folder_def['processing_limit'] || 10
end
process_deleted_path(path)
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 131
# Forgets a deleted path and marks its entry, if any, as deleted.
#
# An Array of paths is handled by processing each element in turn. For a
# single path, it is removed from both the known path map and the ignored
# files map. If the path was known, its entry is stamped with the :deleted
# event, its stable poll count is reset and it is flagged as deleted.
#
# @param path [String, Array] a path, or a list of paths
# @return true when a known entry was marked, nil when the path was not
#   known, or the given Array for a list
def process_deleted_path(path)
  if path.is_a?(Array)
    return path.each { |single_path| process_deleted_path(single_path) }
  end

  logger.debug { "PATH #{:deleted} '#{path}'" }

  file = known_path_map.delete(path)
  ignored_files_map.delete(path)

  if file
    file.event_type = :deleted
    file.event_timestamp = Time.now
    file.stable_poll_count = 0
    file.deleted = true
  end
end
run(options = { })
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 232
# Starts watching: logs the watched paths, inventories what is already
# present, then starts the listener.
#
# @param options [Hash] accepted but not used by this method
# @return the result of starting the listener
def run(options = { })
  logger.info { "Starting handler for #{paths}" }

  # Inventory first, so existing files are known before the listener starts.
  perform_initial_inventory

  listener.start
end
run_once(options = { })
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 228
# Performs a single pass: only the initial inventory is taken and the
# listener is not started.
#
# @param options [Hash] accepted but not used by this method
# @return the result of perform_initial_inventory
def run_once(options = { })
  perform_initial_inventory
end
stable_files()
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 224
# Lists the known entries that currently report themselves as stable.
#
# @return [Array] the values of the known path map whose stable? is truthy
def stable_files
  @known_path_map.values.select(&:stable?)
end
stop()
click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder/handler/listen.rb, line 238
# Stops the listener, when there is one and it can be stopped.
#
# @return the result of stopping the listener, or nil when nothing was done
def stop
  return unless listener

  listener.stop if listener.respond_to?(:stop)
end