class Envoi::Aspera::WatchService::AWF

Attributes

client[RW]
definition[RW]
last_poll_time[RW]
poll_interval[RW]
previous_poll_time[RW]
state[RW]
watcher[RW]

Private Class Methods

new(definition, state = nil, client = nil) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 161
def initialize(definition, state = nil, client = nil)
  @definition = definition
  @state      = state || State.new
  @client     = @watcher = client || Client.new
  @poll_interval = definition.fetch('poll_interval', 15)
  process_definition(definition)
end
process_watch_folder(watch_folder, &block) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 294
def self.process_watch_folder(watch_folder, &block)
  watch_folder.poll(&block) if (Time.now - watch_folder.last_poll_time) >= watch_folder.poll_interval
end
process_watch_folder_def(watch_folder_def) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 285
def self.process_watch_folder_def(watch_folder_def)
  new(watch_folder_def)
end
process_watch_folder_defs(watch_folder_defs) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 289
def self.process_watch_folder_defs(watch_folder_defs)


end
process_watch_folders(watch_folders, &block) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 298
def self.process_watch_folders(watch_folders, &block)
  watch_folders.each { |watch_folder| process_watch_folder(watch_folder, &block) }
end
run(watch_folders, poll_interval = 15, &block) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 306
def self.run(watch_folders, poll_interval = 15, &block)
  puts 'Starting...'
  loop do
    begin
      run_once(watch_folders, &block)
      sleep 1
    rescue SystemExit, Interrupt
      break
    end
  end
  puts 'Exiting...'
end
run_once(watch_folders, &block) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 302
def self.run_once(watch_folders, &block)
  process_watch_folders(watch_folders, &block)
end

Private Instance Methods

events(from, to) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 189
def events(from, to)
  state.subscription.client.subscription_snapshot_differential(subscription_id: state.subscription['identifier'],
                                                               from:            from,
                                                               to:              to)
end
logger() click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 169
def logger
  client.logger
end
path() click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 173
def path
  definition['path']
end
poll() { |self| ... } click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 195
def poll
  subscription              = state.subscription
  subscription.resubscribe
  previous_snapshot_version = subscription.snapshots.keys.last

  # Delete other snapshots
  subscription.snapshots.delete_if { |p, e| p != previous_snapshot_version } if previous_snapshot_version

  snapshot = subscription.snapshot_create
  @previous_poll_time = @last_poll_time
  @last_poll_time = Time.now

  no_change = snapshot.version == previous_snapshot_version
  if !no_change
    events = snapshot.differential(previous_snapshot_version)
    logger.debug { "Events: #{events}" }
  end

  _known_path_map = state.known_path_map.dup

  logger.debug { "Subscription ID: #{subscription['identifier']}" }
  logger.debug { "Previous Snapshot: #{previous_snapshot_version}" }
  logger.debug { "Current  Snapshot: #{snapshot.version}" }
  logger.debug { "Known Paths: #{_known_path_map.keys}" }

  current_path_map = snapshot.entries_by_path
  current_paths    = current_path_map.keys

  new_path_map = current_path_map.dup

  deleted_path_map  = {}
  stable_path_map   = {}
  unstable_path_map = {}
  _known_path_map.delete_if do |p, e|
    deleted = !no_change && !current_paths.include?(p)
    if deleted
      logger.debug { "DELETED '#{p}'" }
      deleted_path_map[p] = e
    else
      new_path_map.delete(p) # The file is not new, so remove it from the new list

      new_entry     = current_path_map[p]
      previous_stat = e.stat.values_at('mtime', 'size')
      new_stat      = new_entry.stat.values_at('mtime', 'size')

      if no_change || new_stat === previous_stat
        logger.debug { "UNCHANGED '#{p}' '#{new_stat}' == '#{previous_stat}'" }
        stable_poll_count     = e[:stable_poll_count] || 0
        stable_poll_count     += 1
        e[:stable_poll_count] = stable_poll_count
        stable_path_map[p]    = e
      else
        logger.debug { "MODIFIED '#{p}' '#{new_stat}' != '#{previous_stat}'" }
        e[:stable_poll_count] = 0
        unstable_path_map[p]  = new_entry
      end
    end
    deleted
  end

  new_path_map.each do |p, e|
    logger.debug { "CREATED '#{p}' #{e.stat.values_at('mtime', 'size')}" }
    e[:stable_poll_count] = 0
    unstable_path_map[p]  = e.dup
  end

  _known_path_map.merge! stable_path_map
  _known_path_map.merge! unstable_path_map unless no_change
  state.known_path_map = _known_path_map
  state.details = {
    previous_snapshot_version: previous_snapshot_version,
    maps: {
      new:      new_path_map,
      deleted:  deleted_path_map,
      stable:   stable_path_map,
      unstable: unstable_path_map
    }
  }

  logger.debug { "Known Paths: #{_known_path_map.keys}" }
  logger.debug { "New Paths: #{new_path_map.map { |p, e| "#{p} #{e.stat}" }}" }

  logger.debug { "Deleted Paths: #{deleted_path_map.keys}" }
  logger.debug { "Unstable Paths: #{unstable_path_map.keys}" }
  logger.debug { "Stable Paths: #{stable_path_map.map { |p, e| [p, e[:stable_poll_count]] }}" }

  yield self if block_given?

end
process_definition(watch_folder_def) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 177
def process_definition(watch_folder_def)
  state.subscription = subscription_get_or_create(watch_folder_def)
end
process_stable_entry(entry) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 185
def process_stable_entry(entry)
  logger.debug { "Stable Entry Detected: #{entry.stat}" }
end
subscription_get_or_create(watch_folder_def) click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 181
def subscription_get_or_create(watch_folder_def)
  Subscription.get_or_create(client, watch_folder_def)
end