class Envoi::WatchFolderUtility::WatchFolder
Constants
- DEFAULT_POLL_INTERVAL
- DEFAULT_PROCESSOR_COUNT_LIMIT
Attributes
Public Class Methods
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 23
#
# Builds a watch folder from +args+: sets up the logger, normalizes a shallow
# copy of the definition, creates the file handler and resolves the agents.
#
# @param [Hash] args
# @option args [Object] :logger logger used by this watch folder
# @option args [Hash] :definition raw watch folder definition (shallow-copied via #dup)
# @option args [Boolean] :threaded (true) process stable files in background threads
# @option args [Object] :default_agent agent used when the definition declares none
# @option args [Class] :default_agent_class (default_agent.class) class used to build agents
def initialize(args = { })
  initialize_logger(args)
  @definition = args[:definition].dup
  # Log this instance's id; interpolating Object.__id__ would print the id of
  # the Object class, which is identical for every watch folder.
  logger.debug { "Initializing Watch Folder. #{object_id}" }

  @ignored_file_paths = [ ]
  @ignored_file_paths_lock = Mutex.new

  @threaded = args.fetch(:threaded, true)

  @default_maximum_active_processors = DEFAULT_PROCESSOR_COUNT_LIMIT
  @processors = { }

  @default_handler_class = Envoi::WatchFolderUtility::WatchFolder::Handler::Listen
  process_watch_folder_def
  initialize_handler

  @default_agent = args[:default_agent]
  @default_agent_class = args[:default_agent_class] || @default_agent.class

  @last_poll_time = nil

  process_agent_defs
  logger.debug { "Watch Folder Initialized. #{object_id}" }
end
Public Instance Methods
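Records the file's path in the local ignore cache and, when the file supports it, calls its +ignore+ method. @param [Object] file a watched file; must respond to +path+ and may respond to +ignore+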
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 96
#
# Adds +file+'s path to the watch folder's ignore cache (guarded by
# @ignored_file_paths_lock) and asks the file to ignore itself if it can.
#
# @param [Object] file watched file responding to +path+ (optionally +ignore+)
# @return [Object, nil] the result of +file.ignore+, or nil if unsupported
def add_file_to_ignored_file_paths(file)
  logger.debug { "Adding File to Ignore Cache: '#{file.path}'" }
  @ignored_file_paths_lock.synchronize do
    ignored_path = file.path
    @ignored_file_paths.push(ignored_path)
    file.respond_to?(:ignore) ? file.ignore : nil
  end
end
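Returns the first pattern that matches the file's path, or nil when none match. Regexp patterns are tested with Regexp#match; string patterns with File.fnmatch.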
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 105
#
# Finds the first pattern matching +file+'s path. Regexp patterns are tested
# with Regexp#match, anything else is treated as a File.fnmatch glob.
#
# @param [Array<Regexp, String>, Regexp, String, nil] patterns patterns to test;
#   nil (e.g. a definition whose excludes were never initialized) or a single
#   pattern are accepted and normalized with Kernel#Array
# @param [Object] file object responding to +path+
# @return [Regexp, String, nil] the first matching pattern, or nil
def find_in_patterns(patterns, file)
  path = file.path
  Array(patterns).find do |pattern|
    matched = pattern.is_a?(Regexp) ? pattern.match(path) : File.fnmatch(pattern, path)
    logger.debug { "#{pattern} #{matched ? 'matched' : "didn't match"} #{path}" }
    matched
  end
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 113
#
# Paths that should currently be skipped: the keys of the handler's
# ignored_files_map plus every path recorded locally in @ignored_file_paths
# (which add_file_to_ignored_file_paths appends to). Without the local cache,
# files lacking an +ignore+ hook were never treated as ignored.
#
# @return [Array<String>] de-duplicated ignored paths
def ignored_file_paths
  # Copy the local cache under its lock so a concurrent append cannot race the read.
  local_paths =
    if @ignored_file_paths_lock
      @ignored_file_paths_lock.synchronize { Array(@ignored_file_paths).dup }
    else
      Array(@ignored_file_paths)
    end
  handler.ignored_files_map.keys | local_paths
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 75
#
# Lazily builds the agent from @default_agent_class. Once @agent is set,
# later calls return it unchanged and ignore +args+.
#
# @param [Hash] args agent configuration passed as +config:+
# @return [Object] the memoized agent
def initialize_agent(args = {})
  return @agent if @agent

  logger.debug { "Initializing Agent. #{@default_agent_class} #{args}" }
  new_agent = @default_agent_class.new(
    config: args,
    logger: logger,
    default_preserve_file_path: false
  )
  logger.debug { "Agent Instance created." }
  @agent = new_agent
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 84
#
# Instantiates @default_handler_class with a copy of the logger and the given
# watch folder definition, storing the result in @handler.
#
# @param [Hash] watch_folder_def definition handed to the handler
def initialize_handler(watch_folder_def = @definition)
  handler_class = @default_handler_class
  handler_args = { logger: logger.dup, definition: watch_folder_def }

  logger.debug { "Creating Watch Folder Handler Instance. #{handler_class.name}" }
  @handler = handler_class.new(handler_args)
  logger.debug { "Watch Folder Handler Instance Created. #{handler_class.name}" }
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 53
#
# Stores the logger supplied in +args+. No fallback logger is created, so
# callers are expected to pass one.
#
# @param [Hash] args
# @option args [Object] :logger logger to use
# @return [Object, nil] the stored logger
def initialize_logger(args = {})
  supplied_logger = args[:logger]
  @logger = supplied_logger
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 138
#
# Display name of the watch folder. It is memoized and taken from the
# definition's 'name', then 'paths', then 'path'. Otherwise it falls back to
# this instance's object_id, which is unique per watch folder; Object.__id__
# would be the same for every instance.
#
# @return [Object] the name
def name
  @name ||= definition['name'] || definition['paths'] || definition['path'] || object_id
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 117
#
# Shifts @last_poll_time into @previous_poll_time, stamps the current time,
# and forwards to the handler's +poll+ when it has one.
#
# @return [Object, nil] the handler's poll result, or nil if unsupported
def poll
  now = Time.now
  @previous_poll_time, @last_poll_time = @last_poll_time, now
  return unless handler.respond_to?(:poll)

  handler.poll
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 124
#
# True when no poll has happened yet, or when at least +poll_interval+
# seconds of wall-clock time have passed since the last one.
#
# @return [Boolean]
def poll_interval_elapsed?
  previous = last_poll_time
  return true unless previous

  Time.now - previous >= poll_interval
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 128
#
# Prepares one agent definition and hands it to initialize_agent. When a
# default agent exists, the definition's 'storages' is replaced by the default
# agent's storages merged with the definition's own (the definition wins). An
# absent 'storages' counts as empty, so the merge always happens.
# NOTE(review): initialize_agent memoizes @agent, so only the first definition
# processed actually builds an agent.
#
# @param [Hash] agent_def agent definition (mutated)
# @return [Object] whatever initialize_agent returns
def process_agent_def(agent_def)
  if @default_agent
    overrides = agent_def['storages'] || {}
    agent_def['storages'] = default_agent.agent_config_storages.merge(overrides)
  end
  initialize_agent(agent_def)
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 142
#
# Feeds every agent definition in definition['agents'] through
# process_agent_def. A Hash maps agent names to definitions; the key becomes
# the definition's 'name' unless one is already set. Any other type is ignored.
#
# @return [Object] @agent, defaulting to @default_agent when unset
def process_agent_defs
  agent_defs = definition['agents']

  case agent_defs
  when Hash
    agent_defs.each do |agent_name, agent_def|
      agent_def['name'] ||= agent_name
      process_agent_def(agent_def)
    end
  when Array
    agent_defs.each { |agent_def| process_agent_def(agent_def) }
  end

  @agent ||= @default_agent
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 357
#
# Uploads one stable file through the agent and then disposes of it:
# - success: moved to completed_directory_path when configured and present,
#   ignored when configured but missing, otherwise deleted;
# - failure: moved to quarantine_directory_path when it exists, otherwise added
#   to the ignore list.
# file.processing is true for the duration; file.processed is set once the
# upload has been handled; an exception is stored on file.exception and re-raised.
#
# @param [Object] file watched file (path, name, processing=, processed=, exception=)
# @return [Hash] the agent's response; true/false/nil responses are normalized
#   to { success: Boolean }. Without a storage id, returns
#   { success: false, message: 'Missing storage ID.' } without uploading.
def process_stable_file(file)
  file.processing = true
  file_name = file.name || file.path
  logger.debug { "Processing File '#{file_name}'" }

  storage_id = definition['upload_to_storage_id']
  unless storage_id
    logger.warn { "Skipping processing of file because of missing storage ID." }
    return { success: false, message: 'Missing storage ID.' }
  end

  quarantine_directory_path = definition['quarantine_directory_path']
  completed_directory_path = definition['completed_directory_path']
  watch_folder_upload_args = definition['upload_args']

  full_file_path = file.path
  upload_args = { file_path: full_file_path, storage_id: storage_id }
  upload_args.merge!(watch_folder_upload_args) if watch_folder_upload_args.is_a?(Hash)

  logger.debug { "Executing Upload. #{upload_args}" }
  response = agent.upload(upload_args)
  # Agents may answer with a bare boolean or with nothing at all. Normalize
  # these so a nil answer counts as a failure instead of raising on nil[:success].
  response = { success: response == true } if response == true || response == false || response.nil?

  if response[:success]
    if completed_directory_path
      if Dir.exist?(completed_directory_path)
        logger.debug { "Moving '#{full_file_path}' to completed directory path '#{completed_directory_path}'" }
        FileUtils.mv full_file_path, completed_directory_path
      else
        logger.warn { "Completed directory path not found: '#{completed_directory_path}'" }
        add_file_to_ignored_file_paths(file)
      end
    else
      FileUtils.rm full_file_path
    end
  elsif quarantine_directory_path && Dir.exist?(quarantine_directory_path)
    logger.warn { "Moving '#{full_file_path}' to quarantine directory path '#{quarantine_directory_path}'" }
    FileUtils.mv full_file_path, quarantine_directory_path
  else
    logger.warn { "Adding '#{full_file_path}' to the temporary ignore list." }
    add_file_to_ignored_file_paths(file)
  end

  file.processed = true
  response
rescue => e
  file.exception = e
  raise e
ensure
  file.processing = false
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 419
#
# Walks the handler's stable files and processes each eligible one. A file is
# skipped when it is ignored, already processing or processed, fails the
# include patterns, or hits an exclude pattern; filtered files are added to the
# ignore list. When @threaded, each file is processed in its own thread, with at
# most definition['maximum_active_processors'] threads alive at once (a
# false/nil limit means unlimited). Iteration stops for this pass once the
# limit is reached.
def process_stable_files
  maximum_active_processors = definition['maximum_active_processors']
  includes = definition['includes']
  excludes = definition['excludes']

  stable_files.each do |file|
    file.watch_folder ||= self
    next if file.respond_to?(:ignore?) ? file.ignore? : ignored_file_paths.include?(file.path)
    next if file.processing || file.processed

    if includes && !includes.empty? && !find_in_patterns(includes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    # excludes is nil when the definition failed to initialize; skip the check then.
    if excludes && !excludes.empty? && find_in_patterns(excludes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    unless @threaded
      process_stable_file(file)
      next
    end

    # Keep only threads that are still working so the count reflects active processors.
    processors.keep_if { |_tracked_file, thread| thread.alive? }
    if maximum_active_processors && processors.length >= maximum_active_processors
      logger.debug { "Maximum number of active processors reached for watch folder. #{name}" }
      break
    end

    # Mark the file before spawning so the next pass cannot pick it up again.
    # Always track the thread; previously it was only recorded if the new thread
    # had already set file.processing, which usually lost the race.
    file.processing = true
    processors[file] = Thread.new(file) do |threaded_file|
      begin
        process_stable_file(threaded_file)
      rescue => e
        logger.error { "Exception '#{e.message}' in thread for `#{name}` `#{threaded_file.path}`. " }
        raise e
      ensure
        # Best effort: never let the reset itself mask the original outcome.
        (threaded_file.processing = false) rescue nil
      end
    end
  end
end
@param [Hash] watch_folder_def @option watch_folder_def [String] path @option watch_folder_def [String] upload_to_storage_id @option watch_folder_def [String] name (path) @option watch_folder_def [Array<String>] paths ([path]) @option watch_folder_def [String] exclude ('**/.*') @option watch_folder_def [Array<String>] excludes ([exclude]) @option watch_folder_def [String] include @option watch_folder_def [Array<String>] includes ([include]) @option watch_folder_def [String] quarantine_directory_path @option watch_folder_def [String] completed_directory_path @option watch_folder_def [Hash] import_args ({}) @option watch_folder_def [Hash] import_options ({}) @option watch_folder_def [Integer|False] maximum_active_processors (@default_maximum_active_processors) @option watch_folder_def [Hash] logging
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 176
#
# Normalizes a raw watch folder definition in place. It resolves paths,
# include/exclude patterns, the quarantine/completed directories, the target
# storage id, the upload/import arguments, the processor limit, the agent
# definitions and the poll interval. Legacy single-value keys ('path',
# 'include', 'exclude', 'quarantine_path', 'completed_path', 'storage_id',
# 'agent', ...) are folded into their canonical keys and then removed.
#
# @param [Hash] watch_folder_def the definition to normalize (mutated)
# @return [false] when no watchable path could be determined
# @return [Object] otherwise the poll interval (also stored in @poll_interval)
def process_watch_folder_def(watch_folder_def = @definition)
  logger.debug { "Initializing Watch Folder #{watch_folder_def.inspect}" }

  logger.debug { "Initializing parameter 'paths'." }
  name = watch_folder_def['name']
  path = watch_folder_def['path']
  paths = watch_folder_def['paths'] ||= []
  paths = [ paths ] if paths.is_a?(String)
  paths.concat [*path] if path
  paths.map! { |p| File.expand_path(p) }

  # Fall back to the name as a directory path. Skip this without a name, since
  # File.expand_path(nil) raises TypeError. Append with << because
  # Array#concat rejects a bare String.
  if paths.empty? && name
    name_as_path = File.expand_path(name.to_s)
    paths << name_as_path if Dir.exist?(name_as_path)
  end
  paths.uniq!
  watch_folder_def['paths'] = paths
  watch_folder_def.delete('path')
  if paths.empty?
    logger.error { "Failed to initialize watch folder. No path found in watch folder definition." }
    return false
  end
  logger.debug { "Parameter 'paths' initialized." }

  logger.debug { "Initializing parameter 'includes'." }
  single_include = watch_folder_def['include']
  # Array() accepts a single pattern given directly under 'includes'.
  includes = Array(watch_folder_def['includes'])
  includes.concat [*single_include] if single_include
  includes.uniq!
  includes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['includes'] = includes
  watch_folder_def.delete('include')
  logger.debug { "Parameter `includes` initialized." }

  logger.debug { "Initializing parameter 'excludes'." }
  # Hidden files are excluded unless a different single 'exclude' is given.
  single_exclude = watch_folder_def['exclude'] || '**/.*'
  excludes = Array(watch_folder_def['excludes'])
  excludes.concat [*single_exclude]
  excludes.uniq!
  excludes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['excludes'] = excludes
  watch_folder_def.delete('exclude')
  logger.debug { "Parameter `excludes` initialized." }

  logger.debug { "Initializing parameter `quarantine directory path`." }
  quarantine_directory_path = watch_folder_def['quarantine_directory_path'] || watch_folder_def['quarantine_path']
  if quarantine_directory_path
    quarantine_directory_path = File.expand_path(quarantine_directory_path)
    watch_folder_def['quarantine_directory_path'] = quarantine_directory_path
    unless Dir.exist?(quarantine_directory_path)
      logger.warn { "Quarantine directory path '#{quarantine_directory_path}' does not exist. Files will be ignored instead." }
    end
  end
  watch_folder_def.delete('quarantine_path')
  logger.debug { "Parameter `quarantine directory path` initialized." }

  logger.debug { "Initializing parameter 'completed directory path'." }
  completed_directory_path = watch_folder_def['completed_directory_path'] || watch_folder_def['completed_path']
  if completed_directory_path
    completed_directory_path = File.expand_path(completed_directory_path)
    watch_folder_def['completed_directory_path'] = completed_directory_path
    unless Dir.exist?(completed_directory_path)
      logger.warn { "Completed directory path '#{completed_directory_path}' does not exist. File will be ignored instead." }
    end
  end
  watch_folder_def.delete('completed_path')
  logger.debug { "Parameter 'completed directory path' initialized." }

  logger.debug { "Initializing parameter `upload to storage id`." }
  storage_id = watch_folder_def['upload_to_storage_id'] || watch_folder_def['storage_id']
  watch_folder_def['upload_to_storage_id'] ||= storage_id
  watch_folder_def.delete('storage_id')
  unless storage_id
    logger.warn { "No `upload to storage id` specified. Uploading will be skipped for this watch folder." }
  end
  logger.debug { "Parameter 'upload to storage id' initialized." }

  logger.debug { "Initializing parameter 'upload/import arguments'." }
  upload_args = watch_folder_def['upload_args']
  item_add_args = watch_folder_def['item_add_args'] || { }
  item_add_options = watch_folder_def['item_add_options'] || { }
  import_args = watch_folder_def['import_args'] || { }
  import_options = watch_folder_def['import_options'] || { }

  import_args = Hash[import_args.map { |k, v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]
  import_options = Hash[import_options.map { |k, v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]

  # Allow adding to collection to be overridden
  add_item_to_collection = item_add_args.fetch(:add_item_to_collection, item_add_options.fetch(:add_item_to_collection, watch_folder_def['add_item_to_collection']))
  if add_item_to_collection.nil? || add_item_to_collection
    _add_item_to_collection = false
    collection_id = watch_folder_def['collection_id']
    if collection_id
      item_add_args[:collection_id] ||= collection_id
      _add_item_to_collection = true
      watch_folder_def.delete('collection_id')
    else
      collection_name = watch_folder_def['collection_name']
      if collection_name
        item_add_args[:collection_name] ||= collection_name
        _add_item_to_collection = true
        watch_folder_def.delete('collection_name')
      else
        file_path_collection_name_position = watch_folder_def['file_path_collection_name_position']
        if file_path_collection_name_position
          item_add_args[:file_path_collection_name_position] = file_path_collection_name_position
          _add_item_to_collection = true
          watch_folder_def.delete('file_path_collection_name_position')
        end
      end
    end
    import_options[:add_item_to_collection] ||= _add_item_to_collection
  end

  metadata = watch_folder_def['metadata']
  if metadata
    item_add_args[:metadata] = metadata
    watch_folder_def.delete('metadata')
  end

  field_group = watch_folder_def['field_group']
  if field_group
    item_add_args[:field_group] = field_group
    watch_folder_def.delete('field_group')
  end

  ingest_group = watch_folder_def['ingest_group']
  if ingest_group
    job_metadata = (import_args[:jobmetadata] ||= '')
    job_metadata += ',' unless job_metadata.empty?
    job_metadata += "portal_groups:StringArray=#{ingest_group}"
    import_args[:jobmetadata] = job_metadata
    watch_folder_def.delete('ingest_group')
  end

  item_add_args = symbolize_keys(item_add_args)
  (item_add_args[:import_args] ||= {}).merge! import_args if import_args.is_a?(Hash)
  (item_add_args[:import_options] ||= {}).merge! import_options if import_options.is_a?(Hash)

  upload_args = symbolize_keys(upload_args)
  ((upload_args ||= {})[:item_add_args] ||= {}).merge! item_add_args if item_add_args.is_a?(Hash)
  ((upload_args ||= {})[:item_add_options] ||= {}).merge! symbolize_keys(item_add_options) if item_add_options.is_a?(Hash)
  watch_folder_def.delete('import_args')
  watch_folder_def.delete('import_options')
  watch_folder_def['upload_args'] = upload_args
  logger.debug { "Parameter 'upload/import arguments' initialized. #{upload_args}" }

  if watch_folder_def['maximum_active_processors'].nil?
    watch_folder_def['maximum_active_processors'] = @default_maximum_active_processors
  end

  logger.debug { "Initializing parameter 'agents'." }
  agent = watch_folder_def['agent']
  agents = watch_folder_def['agents'] ||= [ ]
  agents = agents.map { |k, v| v['name'] ||= k; v } if agents.is_a?(Hash)
  if agent
    agent = agent.map { |k, v| v['name'] ||= k; v } if agent.is_a?(Hash) && agent.keys.length == 1
    # A multi-key Hash is a single agent definition. Splatting it would shred
    # it into [key, value] pairs, so wrap it instead.
    agents.concat(agent.is_a?(Hash) ? [agent] : [*agent])
    watch_folder_def.delete('agent')
  end
  # Store the normalized Array; the Hash form above produced a new object.
  watch_folder_def['agents'] = agents
  logger.debug { "Parameter 'agent' initialized." }

  @poll_interval = watch_folder_def['poll_interval'] ||= DEFAULT_POLL_INTERVAL
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 469
#
# Starts the handler when it supports +run+; otherwise does nothing.
#
# @return [Object, nil] the handler's run result, or nil if unsupported
def run
  return unless handler.respond_to?(:run)

  handler.run
end
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 473
#
# Files the handler currently reports as stable.
#
# @return [Object] whatever the handler's +stable_files+ returns
def stable_files
  current_handler = handler
  current_handler.stable_files
end
Converts hash keys to symbols; keys that do not respond to +to_sym+ are kept as-is.
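@param [Hash, Array, Object] value the value to convert; arrays are converted element by element and any other value is returned unchanged @param [Boolean] recursive when true (the default), nested hash values are converted as well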
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 481
#
# Returns a copy of +value+ with Hash keys converted to symbols where possible.
# Arrays are mapped element-wise with the same +recursive+ flag. With
# +recursive+ false, a Hash's values are left untouched. Any other value is
# returned as-is.
#
# @param [Hash, Array, Object] value
# @param [Boolean] recursive
# @return [Hash, Array, Object]
def symbolize_keys(value, recursive = true)
  if value.is_a?(Hash)
    value.each_with_object({}) do |(key, nested), converted|
      new_key = key.respond_to?(:to_sym) ? key.to_sym : key
      converted[new_key] = recursive ? symbolize_keys(nested, true) : nested
    end
  elsif value.is_a?(Array)
    value.map { |element| symbolize_keys(element, recursive) }
  else
    value
  end
end