class Envoi::Aspera::WatchService::WatchFolder
Attributes
client[RW]
definition[RW]
last_poll_time[RW]
poll_interval[RW]
previous_poll_time[RW]
state[RW]
watcher[RW]
Private Class Methods
new(definition, state = nil, client = nil)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 161
# Builds a watch folder from its definition and resolves its subscription.
#
# @param definition [#fetch] watch folder definition; 'poll_interval' is read
#   from it and defaults to 15 when the key is absent
# @param state [Object, nil] state holder; a fresh State is used when falsy
# @param client [Object, nil] client; a fresh Client is used when falsy
def initialize(definition, state = nil, client = nil)
  @definition = definition
  @state = state || State.new

  # The client and the watcher are the very same object.
  resolved_client = client || Client.new
  @watcher = resolved_client
  @client = resolved_client

  @poll_interval = definition.fetch('poll_interval', 15)
  process_definition(definition)
end
process_watch_folder(watch_folder, &block)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 294
# Polls +watch_folder+ when it is due, i.e. when it has never been polled or
# when at least +poll_interval+ seconds have elapsed since its last poll.
#
# @param watch_folder [#last_poll_time, #poll_interval, #poll] folder to check
# @param block forwarded to +watch_folder.poll+
# @return the result of +watch_folder.poll+, or nil when the folder is not due
def self.process_watch_folder(watch_folder, &block)
  last_poll_time = watch_folder.last_poll_time

  # A folder that has never been polled has no last_poll_time; subtracting nil
  # from a Time raises TypeError, so treat "never polled" as due right away.
  due = last_poll_time.nil? || (Time.now - last_poll_time) >= watch_folder.poll_interval

  watch_folder.poll(&block) if due
end
process_watch_folder_def(watch_folder_def)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 285
# Instantiates a watch folder for a single definition.
#
# @param watch_folder_def [Object] definition handed straight to +new+
# @return the object returned by +new+
def self.process_watch_folder_def(watch_folder_def)
  new(watch_folder_def)
end
process_watch_folder_defs(watch_folder_defs)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 289
# Builds one watch folder per definition by delegating each definition to
# +process_watch_folder_def+.
#
# @param watch_folder_defs [Enumerable, nil] watch folder definitions; nil is
#   treated as an empty list
# @return [Array] one result of +process_watch_folder_def+ per definition, in
#   the order given
def self.process_watch_folder_defs(watch_folder_defs)
  (watch_folder_defs || []).map do |watch_folder_def|
    process_watch_folder_def(watch_folder_def)
  end
end
process_watch_folders(watch_folders, &block)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 298
# Runs +process_watch_folder+ on every folder, passing the block along.
#
# @param watch_folders [#each] folders to process
# @param block forwarded to +process_watch_folder+ for each folder
# @return whatever +watch_folders.each+ returns
def self.process_watch_folders(watch_folders, &block)
  watch_folders.each do |folder|
    process_watch_folder(folder, &block)
  end
end
run(watch_folders, poll_interval = 15, &block)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 306
# Processes the watch folders over and over, pausing one second between
# passes, until a SystemExit or Interrupt is raised.
#
# @param watch_folders [#each] folders handed to +run_once+ on every pass
# @param poll_interval [Integer] accepted but not read by this method
# @param block forwarded to +run_once+
# @return [nil]
def self.run(watch_folders, poll_interval = 15, &block)
  puts 'Starting...'

  begin
    loop do
      run_once(watch_folders, &block)
      sleep 1
    end
  rescue SystemExit, Interrupt
    # Leaving the loop is the whole point; fall through to the exit message.
  end

  puts 'Exiting...'
end
run_once(watch_folders, &block)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 302
# Performs a single pass over the watch folders.
#
# @param watch_folders [#each] folders handed to +process_watch_folders+
# @param block forwarded to +process_watch_folders+
# @return the result of +process_watch_folders+
def self.run_once(watch_folders, &block)
  process_watch_folders(watch_folders, &block)
end
Private Instance Methods
events(from, to)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 189
# Asks the subscription's client for the snapshot differential between +from+
# and +to+ for this folder's subscription.
#
# @param from passed through as the +from:+ keyword
# @param to passed through as the +to:+ keyword
# @return the result of +subscription_snapshot_differential+
def events(from, to)
  subscription = state.subscription
  subscription.client.subscription_snapshot_differential(
    subscription_id: subscription['identifier'],
    from: from,
    to: to
  )
end
logger()
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 169
# Logger used by this watch folder; it is the client's logger.
#
# @return the object returned by +client.logger+
def logger
  client.logger
end
path()
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 173
# Value stored under the 'path' key of the definition.
#
# @return whatever +definition['path']+ yields
def path
  definition['path']
end
poll() { |self| ... }
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 195
# Takes a new snapshot of the subscription and classifies every path against
# the previously known paths:
#
# * new      - present in the snapshot but not previously known
# * deleted  - previously known but absent from a changed snapshot
# * stable   - known, with 'mtime' and 'size' unchanged (or the snapshot
#              version is the same as the previous one); the entry's
#              :stable_poll_count is incremented
# * unstable - newly created, or known with a different 'mtime'/'size'; the
#              retained entry's :stable_poll_count is reset to 0
#
# The results are written to +state.known_path_map+ and +state.details+, and
# the poll timestamps are updated.
#
# @yield [self] once classification is finished, when a block is given
# @return the block's result, or nil when no block is given
def poll
  subscription = state.subscription
  subscription.resubscribe

  previous_snapshot_version = subscription.snapshots.keys.last
  # Delete other snapshots
  if previous_snapshot_version
    subscription.snapshots.delete_if { |version, _snapshot| version != previous_snapshot_version }
  end

  snapshot = subscription.snapshot_create
  @previous_poll_time = @last_poll_time
  @last_poll_time = Time.now

  no_change = snapshot.version == previous_snapshot_version
  unless no_change
    # Named so it does not shadow the #events method.
    snapshot_events = snapshot.differential(previous_snapshot_version)
    logger.debug { "Events: #{snapshot_events}" }
  end

  known_path_map = state.known_path_map.dup
  logger.debug { "Subscription ID: #{subscription['identifier']}" }
  logger.debug { "Previous Snapshot: #{previous_snapshot_version}" }
  logger.debug { "Current Snapshot: #{snapshot.version}" }
  logger.debug { "Known Paths: #{known_path_map.keys}" }

  current_path_map = snapshot.entries_by_path
  new_path_map = current_path_map.dup
  deleted_path_map = {}
  stable_path_map = {}
  unstable_path_map = {}

  known_path_map.delete_if do |p, e|
    # Key lookup instead of scanning a keys array for every known path.
    deleted = !no_change && !current_path_map.key?(p)
    if deleted
      logger.debug { "DELETED '#{p}'" }
      deleted_path_map[p] = e
    else
      new_path_map.delete(p) # The file is not new, so remove it from the new list
      new_entry = current_path_map[p]
      previous_stat = e.stat.values_at('mtime', 'size')
      new_stat = new_entry.stat.values_at('mtime', 'size')
      if no_change || new_stat == previous_stat
        logger.debug { "UNCHANGED '#{p}' '#{new_stat}' == '#{previous_stat}'" }
        e[:stable_poll_count] = (e[:stable_poll_count] || 0) + 1
        stable_path_map[p] = e
      else
        logger.debug { "MODIFIED '#{p}' '#{new_stat}' != '#{previous_stat}'" }
        # Reset the counter on the entry that is kept (it replaces the old one
        # in the known map below), mirroring what is done for created paths.
        new_entry[:stable_poll_count] = 0
        unstable_path_map[p] = new_entry
      end
    end
    deleted
  end

  new_path_map.each do |p, e|
    logger.debug { "CREATED '#{p}' #{e.stat.values_at('mtime', 'size')}" }
    e[:stable_poll_count] = 0
    unstable_path_map[p] = e.dup
  end

  known_path_map.merge! stable_path_map
  known_path_map.merge! unstable_path_map unless no_change
  state.known_path_map = known_path_map

  state.details = {
    previous_snapshot_version: previous_snapshot_version,
    maps: {
      new: new_path_map,
      deleted: deleted_path_map,
      stable: stable_path_map,
      unstable: unstable_path_map
    }
  }

  logger.debug { "Known Paths: #{known_path_map.keys}" }
  logger.debug { "New Paths: #{new_path_map.map { |p, e| "#{p} #{e.stat}" }}" }
  logger.debug { "Deleted Paths: #{deleted_path_map.keys}" }
  logger.debug { "Unstable Paths: #{unstable_path_map.keys}" }
  logger.debug { "Stable Paths: #{stable_path_map.map { |p, e| [p, e[:stable_poll_count]] }}" }

  yield self if block_given?
end
process_definition(watch_folder_def)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 177
# Resolves the subscription for the given definition and stores it on the
# state.
#
# @param watch_folder_def [Object] definition passed to
#   +subscription_get_or_create+
# @return the subscription that was stored
def process_definition(watch_folder_def)
  subscription = subscription_get_or_create(watch_folder_def)
  state.subscription = subscription
end
process_stable_entry(entry)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 185
# Writes a debug log line containing the stat of a stable entry.
#
# @param entry [#stat] entry whose stat is logged
# @return the result of +logger.debug+
def process_stable_entry(entry)
  logger.debug do
    "Stable Entry Detected: #{entry.stat}"
  end
end
subscription_get_or_create(watch_folder_def)
click to toggle source
# File lib/envoi/aspera/watch_service/watch_folder.rb, line 181
# Delegates to +Subscription.get_or_create+ with this folder's client.
#
# @param watch_folder_def [Object] definition handed to
#   +Subscription.get_or_create+
# @return the result of +Subscription.get_or_create+
def subscription_get_or_create(watch_folder_def)
  Subscription.get_or_create(client, watch_folder_def)
end