class Envoi::WatchFolderUtility::WatchFolder

Constants

DEFAULT_POLL_INTERVAL
DEFAULT_PROCESSOR_COUNT_LIMIT

Attributes

agent[RW]
definition[RW]
handler[RW]
last_poll_time[RW]
logger[RW]
min_stable_poll_count[RW]
min_stable_time[RW]
poll_interval[RW]
processors[RW]

Public Class Methods

new(args = { }) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 23
# Builds a watch folder from a definition hash and wires up its logger,
# handler and agent.
#
# @param [Hash] args
# @option args [Object] :logger Logger used for all output (stored by #initialize_logger)
# @option args [Hash] :definition Watch folder definition; a shallow copy is kept and normalized
# @option args [Boolean] :threaded (true) Whether stable files are processed in threads
# @option args [Object] :default_agent Agent used when the definition does not yield one
# @option args [Class] :default_agent_class (class of :default_agent) Class instantiated for agent definitions
def initialize(args = { })
  initialize_logger(args)
  @definition = args[:definition].dup

  # Log this instance's id; `Object.__id__` would be the id of the Object
  # class itself and therefore identical for every watch folder.
  logger.debug { "Initializing Watch Folder. #{__id__}" }

  @ignored_file_paths = [ ]
  @ignored_file_paths_lock = Mutex.new

  @threaded = args.fetch(:threaded, true)

  @default_maximum_active_processors = DEFAULT_PROCESSOR_COUNT_LIMIT
  @processors = { }

  @default_handler_class = Envoi::WatchFolderUtility::WatchFolder::Handler::Listen

  # The definition must be normalized before the handler is created because
  # the handler receives the definition.
  process_watch_folder_def
  initialize_handler

  @default_agent = args[:default_agent]
  @default_agent_class = args[:default_agent_class] || @default_agent.class

  @last_poll_time = nil

  process_agent_defs
  logger.debug { "Watch Folder Initialized. #{__id__}" }
end

Public Instance Methods

add_file_to_ignored_file_paths(file) click to toggle source

@param [Object] file

# File lib/envoi/watch_folder_utility/watch_folder.rb, line 96
# Records a file's path in the ignore cache and, when the file object
# supports it, flags the file itself as ignored.
#
# The path is stored only once: the same file can be handed to this method
# on every poll, and appending it each time would grow the list without bound.
#
# @param [Object] file Object responding to `path` (and optionally `ignore`)
def add_file_to_ignored_file_paths(file)
  file_path = file.path
  logger.debug { "Adding File to Ignore Cache: '#{file_path}'" }
  @ignored_file_paths_lock.synchronize do
    @ignored_file_paths << file_path unless @ignored_file_paths.include?(file_path)
    file.ignore if file.respond_to?(:ignore)
  end
end
find_in_patterns(patterns, file) click to toggle source

Used to compare file to patterns

# File lib/envoi/watch_folder_utility/watch_folder.rb, line 105
# Finds the first pattern that matches the file's path.
#
# Regexp patterns are matched with Regexp#match; every other pattern is
# treated as a glob and matched with File.fnmatch.
#
# @param [Array<String, Regexp>, String, Regexp, nil] patterns A nil value is
#   treated as an empty list and a single pattern as a one-element list
# @param [Object] file Object responding to `path`
# @return [String, Regexp, nil] The first matching pattern, or nil when none match
def find_in_patterns(patterns, file)
  file_path = file.path
  Array(patterns).find do |pattern|
    matched = pattern.is_a?(Regexp) ? pattern.match(file_path) : File.fnmatch(pattern, file_path)
    logger.debug { "#{pattern} #{matched ? 'matched' : "didn't match"} #{file_path}" }
    matched
  end
end
ignored_file_paths() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 113
# Lists the paths the handler currently treats as ignored.
#
# @return [Array] Keys of the handler's `ignored_files_map`
def ignored_file_paths
  ignored_map = handler.ignored_files_map
  ignored_map.keys
end
initialize_agent(args = {}) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 75
# Returns the memoized agent, creating it from the default agent class on
# first use. Once an agent exists, later calls return it unchanged and
# ignore their arguments.
#
# @param [Hash] args Agent configuration, passed to the agent class as `config`
# @return [Object] The agent instance
def initialize_agent(args = {})
  return @agent if @agent

  logger.debug { "Initializing Agent. #{@default_agent_class} #{args}" }
  created_agent = @default_agent_class.new(config: args, logger: logger, default_preserve_file_path: false)
  logger.debug { "Agent Instance created." }
  @agent = created_agent
end
initialize_handler(watch_folder_def = @definition) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 84
# Instantiates the default handler class for a watch folder definition and
# stores it in @handler. The handler receives a copy of the logger and the
# definition itself.
#
# @param [Hash] watch_folder_def Definition handed to the handler
def initialize_handler(watch_folder_def = @definition)
  handler_class = @default_handler_class
  handler_args  = { logger: logger.dup, definition: watch_folder_def }

  logger.debug { "Creating Watch Folder Handler Instance. #{handler_class.name}" }
  @handler = handler_class.new(handler_args)
  logger.debug { "Watch Folder Handler Instance Created. #{handler_class.name}" }
end
initialize_logger(args = {}) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 53
# Stores the logger supplied in the arguments.
#
# @param [Hash] args
# @option args [Object] :logger Logger to use; nil when the key is absent
# @return [Object, nil] The stored logger
def initialize_logger(args = {})
  # Disabled fallback that built a logger when none was supplied; kept for reference.
  # @logger   = args[:logger] ||= begin
  #   _log_to = MultiIO.new(STDOUT)
  #   log_to = args[:log_to]
  #   log_age = args[:log_age] || 'daily'
  #   _log_to.add_target(File.open(log_to, 'a')) if log_to
  #   _logger = Logger.new(_log_to, log_age)
  # end
  #
  # log_level = args[:log_level] ||= Logger::INFO
  # if log_level
  #   if log_level.is_a?(String)
  #     log_level.downcase!
  #     _log_level = %w(fatal error warn info debug).find { |v| v == log_level }
  #     log_level = _log_level ? Logger::Severity.const_get(_log_level.to_sym.upcase) : Logger::INFO
  #   end
  #   @logger.level = log_level
  # end
  @logger = args[:logger]
end
name() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 138
# Returns a memoized identifier for this watch folder: the definition's
# 'name', else its 'paths', else its 'path', else this instance's object id.
#
# The last fallback uses this instance's id; `Object.__id__` would be the id
# of the Object class and so the same for every watch folder.
#
# @return [Object] The first truthy candidate
def name
  @name ||= definition['name'] || definition['paths'] || definition['path'] || __id__
end
poll() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 117
# Records the poll timestamps and delegates to the handler's `poll` when the
# handler provides one.
#
# @return [Object, nil] The handler's poll result, or nil when it has no `poll`
def poll
  # Shift the previous timestamp back before stamping the current poll.
  @previous_poll_time, @last_poll_time = @last_poll_time, Time.now

  watcher = handler
  watcher.poll if watcher.respond_to?(:poll)
end
poll_interval_elapsed?() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 124
# Tells whether the watch folder is due for another poll.
#
# @return [Boolean] true when it has never been polled, or when at least
#   `poll_interval` seconds have passed since `last_poll_time`
def poll_interval_elapsed?
  return true unless last_poll_time

  seconds_since_poll = Time.now - last_poll_time
  seconds_since_poll >= poll_interval
end
process_agent_def(agent_def) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 128
# Prepares a single agent definition and initializes the agent from it.
#
# When a default agent is configured, its `agent_config_storages` are used as
# defaults: they are merged underneath the definition's own 'storages', so
# entries in the definition win on key conflicts.
#
# @param [Hash] agent_def Agent definition; its 'storages' entry may be replaced
# @return [Object] Whatever #initialize_agent returns
def process_agent_def(agent_def)
  if @default_agent
    own_storages = agent_def['storages'] || {}
    # Read the instance variable directly; it is the same value the guard
    # above checks.
    agent_def['storages'] = @default_agent.agent_config_storages.merge(own_storages)
  end
  initialize_agent(agent_def)
end
process_agent_defs() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 142
# Processes every agent definition under the definition's 'agents' key, then
# falls back to the default agent when no agent has been set.
#
# 'agents' may be a Hash keyed by agent name (the key is used as the
# definition's 'name' when it has none) or an Array of definitions. Any other
# value is skipped.
#
# @return [Object, nil] The resulting agent
def process_agent_defs
  agent_defs = definition['agents']

  case agent_defs
  when Hash
    agent_defs.each do |agent_name, agent_def|
      agent_def['name'] ||= agent_name
      process_agent_def(agent_def)
    end
  when Array
    agent_defs.each { |agent_def| process_agent_def(agent_def) }
  end

  @agent ||= @default_agent
end
process_stable_file(file) click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 357
# Uploads a stable file through the agent and then disposes of the local copy.
#
# On success the file is moved to the completed directory when one is
# configured and exists (ignored when it is configured but missing), or
# deleted when none is configured. On failure the file is moved to the
# quarantine directory when one is configured and exists, otherwise it is
# added to the ignore list.
#
# A boolean or nil upload response is normalized to `{ success: <boolean> }`;
# nil counts as a failure so it follows the quarantine/ignore path.
#
# @param [Object] file File object responding to `name`, `path`, `processing=`,
#   `processed=` and `exception=`
# @return [Hash] The upload response, or a failure hash when no storage ID is configured
# @raise [StandardError] Re-raises any error after storing it in `file.exception`
def process_stable_file(file)
  file.processing = true
  file_name       = file.name || file.path
  logger.debug { "Processing File '#{file_name}'" }

  storage_id = definition['upload_to_storage_id']

  unless storage_id
    logger.warn { "Skipping processing of file because of missing storage ID." }
    return { success: false, message: 'Missing storage ID.' }
  end

  quarantine_directory_path = definition['quarantine_directory_path']
  completed_directory_path  = definition['completed_directory_path']
  watch_folder_upload_args  = definition['upload_args']

  full_file_path = file.path

  upload_args = {
      file_path: full_file_path,
      storage_id: storage_id
  }
  upload_args.merge!(watch_folder_upload_args) if watch_folder_upload_args.is_a?(Hash)

  logger.debug { "Executing Upload. #{upload_args}" }
  _response = agent.upload(upload_args)
  if _response.nil? || _response == true || _response == false
    _response = { success: _response ? true : false }
  end

  if _response[:success]
    if completed_directory_path
      if Dir.exist?(completed_directory_path)
        logger.debug { "Moving '#{full_file_path}' to completed directory path '#{completed_directory_path}'" }
        FileUtils.mv full_file_path, completed_directory_path
      else
        # The file stays in place, so ignore it to avoid uploading it again.
        logger.warn { "Completed directory path not found: '#{completed_directory_path}'" }
        add_file_to_ignored_file_paths(file)
      end
    else
      FileUtils.rm full_file_path
    end
  else
    if quarantine_directory_path && Dir.exist?(quarantine_directory_path)
      logger.warn { "Moving '#{full_file_path}' to quarantine directory path '#{quarantine_directory_path}'" }
      FileUtils.mv full_file_path, quarantine_directory_path
    else
      logger.warn { "Adding '#{full_file_path}' to the temporary ignore list." }
      add_file_to_ignored_file_paths(file)
    end
  end

  file.processed = true

  _response
rescue => e
  file.exception = e
  raise
ensure
  file.processing = false
end
process_stable_files() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 419
# Walks the handler's stable files and processes each one that is neither
# ignored, already processing/processed, nor filtered out by the definition's
# 'includes'/'excludes' patterns. Filtered files are added to the ignore list.
#
# In threaded mode each file is processed in its own thread. When
# 'maximum_active_processors' is set and that many files are still
# processing, the walk stops until the next call. A nil or false limit means
# no limit.
def process_stable_files
  maximum_active_processors = definition['maximum_active_processors']

  includes = definition['includes']
  excludes = definition['excludes']

  stable_files.each do |file|
    file.watch_folder ||= self
    next if file.respond_to?(:ignore?) ? file.ignore? : ignored_file_paths.include?(file.path)
    next if file.processing || file.processed

    if includes && !includes.empty? && !find_in_patterns(includes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    if excludes && !excludes.empty? && find_in_patterns(excludes, file)
      add_file_to_ignored_file_paths(file)
      next
    end

    unless @threaded
      process_stable_file(file)
      next
    end

    # Drop bookkeeping for files whose processing has finished.
    processors.keep_if { |tracked_file, _thread| tracked_file.processing }
    if maximum_active_processors && processors.length >= maximum_active_processors
      logger.debug { "Maximum number of active processors reached for watch folder. #{name}" }
      break
    end

    # Flag the file before the thread starts so it is always counted against
    # the limit; checking the flag after spawning would race with the thread.
    file.processing = true
    processors[file] = Thread.new(file) do |stable_file|
      begin
        process_stable_file(stable_file)
      rescue => e
        logger.error { "Exception '#{e.message}' in thread for `#{name}` `#{stable_file.path}`. " }
        raise
      ensure
        stable_file.processing = false rescue nil
      end
    end
  end
end
process_watch_folder_def(watch_folder_def = @definition) click to toggle source

@param [Hash] watch_folder_def
@option watch_folder_def [String] path
@option watch_folder_def [String] upload_to_storage_id
@option watch_folder_def [String] name (path)
@option watch_folder_def [Array<String>] paths ([path])
@option watch_folder_def [String] exclude ('**/.*')
@option watch_folder_def [Array<String>] excludes ([exclude])
@option watch_folder_def [String] include
@option watch_folder_def [Array<String>] includes ([include])
@option watch_folder_def [String] quarantine_directory_path
@option watch_folder_def [String] completed_directory_path
@option watch_folder_def [Hash] import_args ({})
@option watch_folder_def [Hash] import_options ({})
@option watch_folder_def [Integer|False] maximum_active_processors (@default_maximum_active_processors)
@option watch_folder_def [Hash] logging

# File lib/envoi/watch_folder_utility/watch_folder.rb, line 176
# Normalizes a watch folder definition in place: singular keys ('path',
# 'include', 'exclude', 'agent', …) are folded into their plural or canonical
# counterparts and removed, directory paths are expanded, upload/import
# arguments are consolidated under 'upload_args', and defaults are filled in.
#
# @param [Hash] watch_folder_def String-keyed definition; mutated in place
# @return [Object, false] false when no watch path could be determined
#   (normalization stops there); otherwise the effective poll interval, which
#   is also stored in @poll_interval
def process_watch_folder_def(watch_folder_def = @definition)
  logger.debug { "Initializing Watch Folder #{watch_folder_def.inspect}" }

  logger.debug { "Initializing parameter 'paths'." }
  name = watch_folder_def['name']

  path = watch_folder_def['path']

  paths = watch_folder_def['paths'] ||= []
  paths = [ paths ] if paths.is_a?(String)
  paths.concat [*path] if path
  paths.map! { |p| File.expand_path(p) }
  if paths.empty? && name.is_a?(String)
    # With no explicit path, fall back to the name when it is an existing directory.
    name_as_path = File.expand_path(name)
    paths << name_as_path if Dir.exist?(name_as_path)
  end
  paths.uniq!
  watch_folder_def['paths'] = paths
  # watch_folder_def['path'] ||= paths.first if paths.length == 1
  watch_folder_def.delete('path')

  if paths.empty?
    logger.error { "Failed to initialize watch folder. No path found in watch folder definition." }
    return false
  end
  logger.debug { "Parameter 'paths' initialized." }

  logger.debug { "Initializing parameter 'includes'." }
  include  = watch_folder_def['include']
  includes = (watch_folder_def['includes'] ||= [])
  includes = [ includes ] unless includes.is_a?(Array) # accept a single pattern
  includes.concat [*include] if include
  includes.uniq!
  includes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['includes'] = includes
  watch_folder_def.delete('include')
  logger.debug { "Parameter `includes` initialized." }

  logger.debug { "Initializing parameter 'excludes'." }
  exclude  = watch_folder_def['exclude']
  exclude  ||= '**/.*'
  excludes = (watch_folder_def['excludes'] ||= [])
  excludes = [ excludes ] unless excludes.is_a?(Array) # accept a single pattern
  excludes.concat [*exclude] if exclude
  excludes.uniq!
  excludes.map! { |e| Regexp.try_convert(e) || e }
  watch_folder_def['excludes'] = excludes
  watch_folder_def.delete('exclude')
  logger.debug { "Parameter `excludes` initialized." }

  logger.debug { "Initializing parameter `quarantine directory path`." }
  quarantine_directory_path = watch_folder_def['quarantine_directory_path'] || watch_folder_def['quarantine_path']
  if quarantine_directory_path
    quarantine_directory_path                     = File.expand_path(quarantine_directory_path)
    watch_folder_def['quarantine_directory_path'] = quarantine_directory_path

    unless Dir.exist?(quarantine_directory_path)
      logger.warn { "Quarantine directory path '#{quarantine_directory_path}' does not exist. Files will be ignored instead." }
    end
  end
  watch_folder_def.delete('quarantine_path')
  logger.debug { "Parameter `quarantine directory path` initialized." }

  logger.debug { "Initializing parameter 'completed directory path'." }
  completed_directory_path = watch_folder_def['completed_directory_path'] || watch_folder_def['completed_path']
  if completed_directory_path
    completed_directory_path                     = File.expand_path(completed_directory_path)
    watch_folder_def['completed_directory_path'] = completed_directory_path

    unless Dir.exist?(completed_directory_path)
      logger.warn { "Completed directory path '#{completed_directory_path}' does not exist. File will be ignored instead." }
    end
  end
  watch_folder_def.delete('completed_path')
  logger.debug { "Parameter 'completed directory path' initialized." }

  logger.debug { "Initializing parameter `upload to storage id`." }
  storage_id = watch_folder_def['upload_to_storage_id'] || watch_folder_def['storage_id']
  watch_folder_def['upload_to_storage_id'] ||= storage_id
  watch_folder_def.delete('storage_id')
  unless storage_id
    logger.warn { "No `upload to storage id` specified. Uploading will be skipped for this watch folder." }
  end
  logger.debug { "Parameter 'upload to storage id' initialized." }

  logger.debug { "Initializing parameter 'upload/import arguments'." }
  upload_args               = watch_folder_def['upload_args']
  item_add_args             = watch_folder_def['item_add_args'] || { }
  item_add_options          = watch_folder_def['item_add_options'] || { }
  import_args               = watch_folder_def['import_args'] || { }
  import_options            = watch_folder_def['import_options'] || { }

  # Import argument/option keys are normalized to lower-case symbols.
  import_args = Hash[import_args.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]
  import_options = Hash[import_options.map { |k,v| [ k.respond_to?(:to_sym) ? k.to_sym.downcase : k, v ] }]

  # Allow adding to collection to be overridden
  add_item_to_collection = item_add_args.fetch(:add_item_to_collection,
                                               item_add_options.fetch(:add_item_to_collection,
                                                                  watch_folder_def['add_item_to_collection']))
  if add_item_to_collection.nil? || add_item_to_collection
    # The collection is taken from the first of: collection_id,
    # collection_name, file_path_collection_name_position.
    _add_item_to_collection = false
    collection_id = watch_folder_def['collection_id']
    if collection_id
      item_add_args[:collection_id] ||= collection_id
      _add_item_to_collection = true
      watch_folder_def.delete('collection_id')
    else
      collection_name = watch_folder_def['collection_name']
      if collection_name
        item_add_args[:collection_name] ||= collection_name
        _add_item_to_collection = true
        watch_folder_def.delete('collection_name')
      else
        file_path_collection_name_position = watch_folder_def['file_path_collection_name_position']
        if file_path_collection_name_position
          item_add_args[:file_path_collection_name_position] = file_path_collection_name_position
          _add_item_to_collection = true
          watch_folder_def.delete('file_path_collection_name_position')
        end
      end
    end
    import_options[:add_item_to_collection] ||= _add_item_to_collection
  end

  metadata = watch_folder_def['metadata']
  if metadata
    item_add_args[:metadata] = metadata
    watch_folder_def.delete('metadata')
  end

  field_group = watch_folder_def['field_group']
  if field_group
    item_add_args[:field_group] = field_group
    watch_folder_def.delete('field_group')
  end

  ingest_group = watch_folder_def['ingest_group']
  if ingest_group
    # Appended to any existing job metadata as a comma separated entry.
    job_metadata = (import_args[:jobmetadata] ||= '')
    job_metadata += ',' unless job_metadata.empty?
    job_metadata += "portal_groups:StringArray=#{ingest_group}"
    import_args[:jobmetadata] = job_metadata
    watch_folder_def.delete('ingest_group')
  end

  item_add_args = symbolize_keys(item_add_args)
  (item_add_args[:import_args] ||= {}).merge! import_args if import_args.is_a?(Hash)
  (item_add_args[:import_options] ||= {}).merge! import_options if import_options.is_a?(Hash)

  upload_args = symbolize_keys(upload_args)
  ((upload_args ||= {})[:item_add_args] ||= {}).merge! item_add_args if item_add_args.is_a?(Hash)
  ((upload_args ||= {})[:item_add_options] ||= {}).merge! symbolize_keys(item_add_options) if item_add_options.is_a?(Hash)

  watch_folder_def.delete('import_args')
  watch_folder_def.delete('import_options')

  watch_folder_def['upload_args'] = upload_args
  logger.debug { "Parameter 'upload/import arguments' initialized. #{upload_args}" }

  maximum_active_processors = watch_folder_def['maximum_active_processors']
  if maximum_active_processors.nil?
    maximum_active_processors = @default_maximum_active_processors
    watch_folder_def['maximum_active_processors'] = maximum_active_processors
  end

  logger.debug { "Initializing parameter 'agents'." }
  agent = watch_folder_def['agent']
  agents = watch_folder_def['agents'] ||= [ ]
  if agents.is_a?(Hash)
    agents = agents.map { |k,v| v['name'] ||= k;  v }
  end
  if agent
    # A one-entry hash whose value is itself a hash is read as { name => definition }.
    if agent.is_a?(Hash) && agent.length == 1 && agent.values.first.is_a?(Hash)
      agent = agent.map { |k,v| v['name'] ||= k; v }
    end
    # Wrap a single definition explicitly; splatting a Hash would break it
    # into key/value pairs.
    agents.concat(agent.is_a?(Array) ? agent : [ agent ])
    # Store the combined list so the singular agent is kept even when
    # 'agents' was supplied as a Hash.
    watch_folder_def['agents'] = agents
    watch_folder_def.delete('agent')
  end
  logger.debug { "Parameter 'agent' initialized." }

  @poll_interval = watch_folder_def['poll_interval'] ||= DEFAULT_POLL_INTERVAL
end
run() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 469
# Starts the handler when it provides a `run` method.
#
# @return [Object, nil] The handler's run result, or nil when it has no `run`
def run
  watcher = handler
  return unless watcher.respond_to?(:run)

  watcher.run
end
stable_files() click to toggle source
# File lib/envoi/watch_folder_utility/watch_folder.rb, line 473
# Returns the handler's stable files.
#
# @return [Object] Whatever the handler's `stable_files` returns
def stable_files
  watcher = handler
  watcher.stable_files
end
symbolize_keys(value, recursive = true) click to toggle source

Converts hash keys to symbols

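@param [Hash, Array] value The hash (or array of hashes) whose keys are converted; any other value is returned unchanged
@param [Boolean] recursive Whether to recurse into hash values that are themselves hashes or arrays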

# File lib/envoi/watch_folder_utility/watch_folder.rb, line 481
# Converts hash keys to symbols.
#
# Keys that do not respond to `to_sym` are kept as they are. Array elements
# are always visited; hash values are only visited when `recursive` is true.
# Values that are neither Hash nor Array are returned unchanged.
#
# @param [Hash, Array, Object] value
# @param [Boolean] recursive Whether hash values are converted as well
# @return [Hash, Array, Object] A new Hash/Array, or `value` itself for other types
def symbolize_keys(value, recursive = true)
  if value.is_a?(Hash)
    value.each_with_object({}) do |(key, entry), converted|
      new_key = key.respond_to?(:to_sym) ? key.to_sym : key
      converted[new_key] = recursive ? symbolize_keys(entry, true) : entry
    end
  elsif value.is_a?(Array)
    value.map { |element| symbolize_keys(element, recursive) }
  else
    value
  end
end