class Envoi::Mam::Cantemo::Agent::WATCH_FOLDER_MANAGER_CLASS

Constants

DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT
LWF

AWF = Envoi::Aspera::WatchService::WatchFolder # Aspera Watch Folder

Attributes

config[RW]
default_agent[RW]
default_agent_class[RW]
logger[RW]
watch_folder_defs[RW]
watch_folders[RW]

Public Class Methods

new(args = {}) click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 39
def initialize(args = {})
  initialize_logger(args)

  logger.info { 'Initializing Agent Watch Folder Manager.' }
  args[:default_preserve_file_path] = args.fetch(:default_preserve_file_path, false)

  @config = Envoi::Mam::Cantemo::Agent.load_config_from_file(args)
  initialize_logger_from_config
  args[:logger] = @logger

  @ignored_file_paths_by_watch_folder = Hash.new { |h, k| h[k] = [] }
  @ignored_file_paths_lock            = Mutex.new

  @threaded = args.fetch(:threaded, config.fetch(:threaded, config.fetch('threaded', true)))

  @default_maximum_active_processors = DEFAULT_WATCH_FOLDER_PROCESSOR_LIMIT
  @processors_by_watch_folder = Hash.new { |h, k| h[k] = {} }

  @watch_folder_defs = config[:watch_folders] || config['watch_folders']

  @default_agent_class = Envoi::Mam::Cantemo::Agent

  cantemo_config = config[:cantemo] || config['cantemo']
  if cantemo_config
    logger.debug { 'Initializing Default Cantemo Portal Agent.' }
    @default_agent = @default_agent_class.new(args.merge({ config: cantemo_config }))
    logger.debug { 'Default Cantemo Portal Agent Initialized.' }
    @default_storages = @default_agent.agent_config_storages

    if !@watch_folder_defs
      @watch_folder_defs = cantemo_config[:watch_folders] || cantemo_config['watch_folders']
    end
  else
    @default_agent = nil
  end

  process_watch_folder_defs
end
run(args) click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 246
def self.run(args)
  w = self.new(args)
  w.run
  w
end

Public Instance Methods

initialize_logger(args = {}) click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 78
def initialize_logger(args = {})
  @logger = MultiLogger.new(Logger.new(STDOUT))

  _logger = args[:logger] ||= begin
    log_to = args[:log_to]
    log_age = args.fetch(:log_age, 'daily')

    log_to ? Logger.new(log_to, log_age) : nil
  end
  @logger.targets << _logger if _logger

  log_level     = args[:log_level] ||= Logger::INFO
  @logger.level = log_level if log_level
  @logger
end
initialize_logger_from_config(_config = @config) click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 94
def initialize_logger_from_config(_config = @config)
  logger_args = { }
  [ :log_to, :log_level, :log_age ].each { |k| v = _config[k] || _config[k.to_s]; logger_args[k] = v if v }
  initialize_logger(logger_args) unless logger_args.empty?
end
pause() click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 216
def pause
  if @should_run
    logger.info { 'Pausing...' }
    watch_folders.each { |wf| wf.pause if wf.respond_to?(:pause) }
  end
end
poll() click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 149
def poll
  watch_folders_with_stable_files = [ ]
  watch_folders.each do |watch_folder|
    if watch_folder.respond_to?(:poll_interval_elapsed?)
      next unless watch_folder.poll_interval_elapsed?
    end

    if watch_folder.respond_to?(:poll)
      logger.debug { "Polling Watch Folder: #{watch_folder.name}" }
      watch_folder.poll
    end

    watch_folders_with_stable_files << watch_folder unless watch_folder.stable_files.empty?
  end

  # @TODO create processor/worker pool to limit total file processing across all watch folders
  watch_folders_with_stable_files.each do |watch_folder|
    #process_watch_folder_stable_files(watch_folder)

    total_active_processors = watch_folders.inject(0) { |sum, wf| sum + wf.processors.length }
    watch_folder.process_stable_files
    logger.debug { "Total active processors #{total_active_processors}" }
  end

  # Thread.list.each {|t| t.join unless t == Thread.current }
  # @processors_by_watch_folder.each { |wf, ap| ap.each { |f, t| t.join } } if @threaded
end
process_watch_folder_def(watch_folder_def) click to toggle source

@param [Hash] watch_folder_def @option watch_folder_def [String] path @option watch_folder_def [String] upload_to_storage_id @option watch_folder_def [String] name (path) @option watch_folder_def [Array<String>] paths ([path]) @option watch_folder_def [String] exclude ('*/.') @option watch_folder_def [Array<string>] excludes ([exclude]) @option watch_folder_def [String] include @option watch_folder_def [Array<String>] includes ([include]) @option watch_folder_def [String] quarantine_directory_path @option watch_folder_def [String] completed_directory_path @option watch_folder_def [Integer|False] maximum_active_processors (@default_maximum_active_processors) @option watch_folder_def [Hash] logging

# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 113
def process_watch_folder_def(watch_folder_def)
  args_out              = {}
  logging               = watch_folder_def['logging'] || watch_folder_def
  log_to                = logging['log_to']
  log_level             = logging['log_level']
  # args_out[:log_to]     = log_to if log_to && !log_to.empty?
  # args_out[:log_level]  = log_level if log_level && !log_level.empty?
  args_out[:logger]     = logger # unless log_to
  args_out[:default_agent] = default_agent
  args_out[:default_agent_class] = default_agent_class
  args_out[:definition] = watch_folder_def

  Envoi::WatchFolderUtility::WatchFolder.new(args_out)
end
process_watch_folder_defs() click to toggle source

Iterates through watch_folder_defs and populates @watch_folders with watch folders initialized from the watch folder definitions

Supports both array and hash formats, the hash format will use the key as the mame if the definition doesn't already have a name set

# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 133
def process_watch_folder_defs
  logger.debug { 'Processing watch folder definitions.' }
  if watch_folder_defs.is_a?(Array)
    @watch_folders = watch_folder_defs.map { |watch_folder_def| process_watch_folder_def(watch_folder_def) }
  elsif watch_folder_defs.is_a?(Hash)
    @watch_folders = watch_folder_defs.map do |name, watch_folder_def|
      watch_folder_def['name'] ||= name
      process_watch_folder_def(watch_folder_def)
    end
  else
    raise "Unhandled format: #{watch_folder_defs.class.name}"
  end
  @watch_folders.keep_if { |wf| wf }
  logger.debug { 'Processing of watch folder definitions completed.' }
end
resume() click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 223
def resume
  if @should_run
    logger.info { 'Resuming...' }
    watch_folders.each { |wf| wf.resume if wf.respond_to?(:resume) }
  end
end
run() click to toggle source

The main execution method

# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 182
def run
  @should_run = true

  # AWF.run_once(watch_folders) { |wf| pp wf }
  # AWF.run(watch_folders) { |wf| process_watch_folder(wf) }
  logger.info { 'Running...' }
  watch_folders.map { |wf| wf.respond_to?(:run) ? wf.run : logger.debug { "Skip run for #{wf}" } }
  logger.debug { 'Initial Run Complete.' }
  while(@should_run) do
    begin
      poll
      sleep 1
    rescue Interrupt, SystemExit => e
      logger.debug { "Received Signal: #{e.class.name}" }
      break
    end
  end
rescue => e
  logger.debug { "EXCEPTION #{e.message} \n#{e.backtrace.join("\n")}\n" }
  logger.error { "An error occurred. #{e.message}" }
  raise e
ensure
  stop
  logger.info { 'Exiting...' }
end
run_once() click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 177
def run_once
  # AWF.run_once(watch_folders) { |wf| process_watch_folder(wf) }
end
stop() click to toggle source
# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 208
def stop
  if @should_run
    @should_run = false
    logger.info { 'Stopping...' }
    watch_folders.each { |wf| wf.stop if wf.respond_to?(:stop) }
  end
end
symbolize_keys(value, recursive = true) click to toggle source

Converts hash keys to symbols

@param [Hash] value hash @param [Boolean] recursive Will recurse into any values that are hashes or arrays

# File lib/envoi/mam/cantemo/agent/watch_folder_manager.rb, line 234
def symbolize_keys (value, recursive = true)
  case value
  when Hash
    Hash[value.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym : k, recursive ? symbolize_keys(v, true) : v ] }]
  when Array
    value.map { |v| symbolize_keys(v, recursive) }
  else
    value
  end
  # symbolize_keys
end