class Envoi::Mam::Cantemo::Agent::WatchFolderManager
Constants
- DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT
- LWF
AWF = Envoi::Aspera::WatchService::WatchFolder # Aspera Watch Folder
Attributes
Public Class Methods
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 39
#
# Sets up logging, loads the configuration, optionally creates the default
# Cantemo agent and builds the watch folders from their definitions.
#
# NOTE: +args+ is modified in place (:default_preserve_file_path and :logger
# are written here) and is then merged into the arguments given to the agent.
#
# @param [Hash] args
# @option args [Boolean] :default_preserve_file_path (false)
# @option args [Boolean] :threaded (the configured 'threaded' value, else true)
def initialize(args = {})
  initialize_logger(args)
  logger.info { 'Initializing Agent Watch Folder Manager.' }

  args[:default_preserve_file_path] = args.fetch(:default_preserve_file_path, false)

  @config = Envoi::Mam::Cantemo::Agent.load_config_from_file(args)
  initialize_logger_from_config
  args[:logger] = @logger

  @ignored_file_paths_by_watch_folder = Hash.new { |hash, key| hash[key] = [] }
  @ignored_file_paths_lock = Mutex.new

  # An explicit argument wins over the configuration (symbol key first, then string key).
  threaded_from_config = config.fetch(:threaded, config.fetch('threaded', true))
  @threaded = args.fetch(:threaded, threaded_from_config)

  @default_maximum_active_processors = DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT
  @processors_by_watch_folder = Hash.new { |hash, key| hash[key] = {} }

  @watch_folder_defs = config[:watch_folders] || config['watch_folders']
  @default_agent_class = Envoi::Mam::Cantemo::Agent

  cantemo_config = config[:cantemo] || config['cantemo']
  if cantemo_config
    logger.debug { 'Initializing Default Cantemo Portal Agent.' }
    @default_agent = @default_agent_class.new(args.merge(config: cantemo_config))
    logger.debug { 'Default Cantemo Portal Agent Initialized.' }

    @default_storages = @default_agent.agent_config_storages

    # Fall back to definitions nested under the cantemo section.
    @watch_folder_defs ||= cantemo_config[:watch_folders] || cantemo_config['watch_folders']
  else
    @default_agent = nil
  end

  process_watch_folder_defs
end
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 246
#
# Convenience entry point: builds a manager from +args+, calls #run on it and
# returns the manager once #run has returned.
#
# @param [Hash] args Passed straight to .new
# @return [Object] the manager instance that was run
def self.run(args)
  new(args).tap(&:run)
end
Public Instance Methods
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 78
#
# (Re)creates @logger as a MultiLogger that always writes to STDOUT and,
# when available, to one additional logger.
#
# NOTE: +args+ is modified in place: :logger receives the additional logger
# (or nil) and :log_level is defaulted to Logger::INFO.
#
# @param [Hash] args
# @option args [Logger] :logger An existing logger to add as a target
# @option args [String] :log_to Used to build a Logger when :logger is not given
# @option args [String] :log_age ('daily') Second argument to Logger.new for :log_to
# @option args [Integer] :log_level (Logger::INFO)
# @return [MultiLogger] the new logger
def initialize_logger(args = {})
  @logger = MultiLogger.new(Logger.new(STDOUT))

  unless args[:logger]
    destination = args[:log_to]
    rotation = args.fetch(:log_age, 'daily')
    args[:logger] = destination ? Logger.new(destination, rotation) : nil
  end
  additional_target = args[:logger]
  @logger.targets << additional_target if additional_target

  args[:log_level] ||= Logger::INFO
  level = args[:log_level]
  @logger.level = level if level

  @logger
end
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 94
#
# Collects the log_to, log_level and log_age settings from the configuration
# (symbol key first, then string key) and, if any of them is set, rebuilds the
# logger with them via #initialize_logger. Does nothing when none is set.
#
# @param [Hash] _config (@config)
def initialize_logger_from_config(_config = @config)
  logger_args = %i[log_to log_level log_age].each_with_object({}) do |key, collected|
    value = _config[key] || _config[key.to_s]
    collected[key] = value if value
  end
  return if logger_args.empty?

  initialize_logger(logger_args)
end
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 216
#
# Pauses every watch folder that responds to #pause.
# Does nothing unless @should_run is set.
def pause
  return unless @should_run

  logger.info { 'Pausing...' }
  watch_folders.each do |watch_folder|
    next unless watch_folder.respond_to?(:pause)

    watch_folder.pause
  end
end
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 149
#
# Performs one polling pass. Watch folders whose poll interval has not elapsed
# are skipped; the rest are polled (when they respond to #poll). Every watch
# folder that then reports stable files has those files processed.
def poll
  ready_watch_folders = watch_folders.select do |watch_folder|
    # Only watch folders exposing an interval check can be skipped.
    next false if watch_folder.respond_to?(:poll_interval_elapsed?) && !watch_folder.poll_interval_elapsed?

    if watch_folder.respond_to?(:poll)
      logger.debug { "Polling Watch Folder: #{watch_folder.name}" }
      watch_folder.poll
    end

    !watch_folder.stable_files.empty?
  end

  # @TODO create processor/worker pool to limit total file processing across all watch folders
  ready_watch_folders.each do |watch_folder|
    # The count is taken before this watch folder processes its files.
    processor_counts = watch_folders.map { |wf| wf.processors.length }
    total_active_processors = processor_counts.reduce(0, :+)

    watch_folder.process_stable_files
    logger.debug { "Total active processors #{total_active_processors}" }
  end
end
@param [Hash] watch_folder_def
@option watch_folder_def [String] path
@option watch_folder_def [String] upload_to_storage_id
@option watch_folder_def [String] name (path)
@option watch_folder_def [Array<String>] paths ([path])
@option watch_folder_def [String] exclude ('*/.')
@option watch_folder_def [Array<String>] excludes ([exclude])
@option watch_folder_def [String] include
@option watch_folder_def [Array<String>] includes ([include])
@option watch_folder_def [String] quarantine_directory_path
@option watch_folder_def [String] completed_directory_path
@option watch_folder_def [Integer|False] maximum_active_processors (@default_maximum_active_processors)
@option watch_folder_def [Hash] logging
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 113
#
# Builds a single watch folder from its definition. The watch folder receives
# the manager's logger, the default agent, the default agent class and the
# untouched definition.
#
# @param [Hash] watch_folder_def The watch folder definition
# @return [Envoi::WatchFolderUtility::WatchFolder]
def process_watch_folder_def(watch_folder_def)
  # TODO: per-definition logging ('logging' => { 'log_to', 'log_level' }) is not
  # applied yet; every watch folder shares the manager's logger.
  watch_folder_args = {
    logger: logger,
    default_agent: default_agent,
    default_agent_class: default_agent_class,
    definition: watch_folder_def
  }

  Envoi::WatchFolderUtility::WatchFolder.new(watch_folder_args)
end
Iterates through watch_folder_defs
and populates @watch_folders with watch folders initialized from the watch folder definitions
Supports both array and hash formats; the hash format will use the key as the name if the definition doesn't already have a name set.
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 133
#
# Populates @watch_folders from watch_folder_defs.
#
# An Array is treated as a list of definitions. A Hash is treated as
# key => definition pairs, where the key is stored as the definition's 'name'
# unless one is already set. Falsy results are dropped from @watch_folders.
#
# @raise [RuntimeError] when watch_folder_defs is neither an Array nor a Hash
def process_watch_folder_defs
  logger.debug { 'Processing watch folder definitions.' }

  definitions = watch_folder_defs
  @watch_folders =
    case definitions
    when Array
      definitions.map { |definition| process_watch_folder_def(definition) }
    when Hash
      definitions.map do |key, definition|
        definition['name'] ||= key
        process_watch_folder_def(definition)
      end
    else
      raise "Unhandled format: #{definitions.class.name}"
    end

  @watch_folders.select! { |watch_folder| watch_folder }

  logger.debug { 'Processing of watch folder definitions completed.' }
end
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 223
#
# Resumes every watch folder that responds to #resume.
# Does nothing unless @should_run is set.
def resume
  return unless @should_run

  logger.info { 'Resuming...' }
  watch_folders.each do |watch_folder|
    next unless watch_folder.respond_to?(:resume)

    watch_folder.resume
  end
end
The main execution method
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 182
#
# Starts every watch folder that responds to #run, then polls once per second
# until @should_run is cleared or an Interrupt/SystemExit is received.
# Other errors are logged and re-raised. #stop is always called on the way out.
def run
  @should_run = true

  # AWF.run_once(watch_folders) { |wf| pp wf }
  # AWF.run(watch_folders) { |wf| process_watch_folder(wf) }

  logger.info { 'Running...' }
  watch_folders.each do |watch_folder|
    if watch_folder.respond_to?(:run)
      watch_folder.run
    else
      logger.debug { "Skip run for #{watch_folder}" }
    end
  end
  logger.debug { 'Initial Run Complete.' }

  while @should_run
    begin
      poll
      sleep 1
    rescue Interrupt, SystemExit => signal
      logger.debug { "Received Signal: #{signal.class.name}" }
      break
    end
  end
rescue => error
  logger.debug { "EXCEPTION #{error.message} \n#{error.backtrace.join("\n")}\n" }
  logger.error { "An error occurred. #{error.message}" }
  raise
ensure
  stop
  logger.info { 'Exiting...' }
end
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 177
#
# Placeholder for a single-pass run. It currently performs no work and
# returns nil; the earlier AWF-based call is kept below, disabled.
def run_once
  # AWF.run_once(watch_folders) { |wf| process_watch_folder(wf) }
  nil
end
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 208
#
# Clears @should_run and stops every watch folder that responds to #stop.
# Does nothing when @should_run is not set, so repeated calls are harmless.
def stop
  return unless @should_run

  @should_run = false
  logger.info { 'Stopping...' }
  watch_folders.each do |watch_folder|
    next unless watch_folder.respond_to?(:stop)

    watch_folder.stop
  end
end
Converts hash keys to symbols
@param [Hash] value hash @param [Boolean] recursive Will recurse into any values that are hashes or arrays
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 234
#
# Returns a copy of +value+ with hash keys converted to symbols. Keys that do
# not respond to #to_sym are kept as they are. Arrays are mapped element by
# element; any other value is returned unchanged.
#
# @param [Object] value A Hash, an Array, or any other value
# @param [Boolean] recursive When true, hash values are converted as well
# @return [Object] the converted structure
def symbolize_keys(value, recursive = true)
  case value
  when Hash
    value.each_with_object({}) do |(key, item), symbolized|
      symbol_key = key.respond_to?(:to_sym) ? key.to_sym : key
      symbolized[symbol_key] = recursive ? symbolize_keys(item, true) : item
    end
  when Array
    value.map { |element| symbolize_keys(element, recursive) }
  else
    value
  end
end