class RFlow::Components::File::DirectoryWatcher
Component that watches a directory for new files. When it finds any, it sends along +RFlow::Message+s of type {RFlow::Message::Data::File} and {RFlow::Message::Data::Raw}, and (optionally) deletes the files afterwards.
Accepts config parameters:
- directory_path - the directory path to monitor
- file_name_glob - glob of filenames to monitor within directory_path
- poll_interval - how often, in seconds, to poll
- files_per_poll - maximum number of files to be brought in per poll
- remove_files - true to remove the files after they've been brought in
Constants
- DEFAULT_CONFIG
Default config.
Attributes
config[RW]
@!visibility private
directory_path[RW]
@!visibility private
file_name_glob[RW]
@!visibility private
poll_interval[RW]
@!visibility private
remove_files[RW]
@!visibility private
Public Instance Methods
configure!(config)
click to toggle source
RFlow-called method at startup. @return [void]
# File lib/rflow/components/file/directory_watcher.rb, line 45
# Layers the supplied settings over DEFAULT_CONFIG, caches each value in an
# instance variable, and checks that the watched directory is usable.
#
# Numeric settings go through +to_i+, so non-numeric strings become 0.
#
# @param config [Hash] string-keyed overrides for DEFAULT_CONFIG
# @raise [ArgumentError] if the expanded directory path is not a directory
#   or is not readable
# @return [void]
def configure!(config)
  @config = DEFAULT_CONFIG.merge(config)

  @directory_path = ::File.expand_path(@config['directory_path'])
  @file_name_glob = @config['file_name_glob']
  @poll_interval  = @config['poll_interval'].to_i
  @files_per_poll = @config['files_per_poll'].to_i
  @remove_files   = to_boolean(@config['remove_files'])

  # Existence is checked before readability so the more specific message wins.
  raise ArgumentError, "Invalid directory '#{@directory_path}'" unless ::File.directory?(@directory_path)
  raise ArgumentError, "Unable to read from directory '#{@directory_path}'" unless ::File.readable?(@directory_path)

  # TODO: more error checking of input config
end
run!()
click to toggle source
RFlow-called method at startup. @return [void]
# File lib/rflow/components/file/directory_watcher.rb, line 66
# Starts an EventMachine periodic timer firing every +poll_interval+.
#
# On each tick the directory glob is expanded, the matches are ordered by
# modification time, filtered down to the files belonging to this worker,
# and at most @files_per_poll of them are processed. Each processed file is
# read in binary mode and emitted as a File message on +file_port+ followed
# by a Raw message on +raw_port+; it is then deleted when @remove_files is
# set. Unreadable files, and (when removal is enabled) non-writable files,
# are logged and skipped.
#
# @return [void] (the created timer happens to be the last expression)
def run!
  # TODO: optimize sending of messages based on what is connected
  EventMachine::PeriodicTimer.new(poll_interval) do
    RFlow.logger.debug { "#{name}: Polling for files in #{::File.join(@directory_path, @file_name_glob)}" }

    matches = Dir.glob(::File.join(@directory_path, @file_name_glob))

    # sort by last modified to process the earliest modified file first
    oldest_first = matches.sort_by { |candidate| test(?M, candidate) }

    # for multiple copies, share the load equally: the path's checksum picks
    # which worker index owns the file
    assigned = oldest_first.select do |candidate|
      shard.count == 1 || ((candidate.sum % shard.count) + 1 == worker.index)
    end

    assigned.first(@files_per_poll).each do |path|
      RFlow.logger.debug { "#{name}: Importing #{path}" }

      unless ::File.readable?(path)
        RFlow.logger.warn "#{name}: Unable to read file #{path}, skipping it"
        next
      end

      # Skip before sending anything if the file could not be removed later.
      if @remove_files && !::File.writable?(path)
        RFlow.logger.warn "#{name}: Unable to remove file #{path}, skipping it"
        next
      end

      ::File.open(path, 'r:BINARY') do |handle|
        payload = handle.read
        RFlow.logger.debug { "#{name}: Read #{payload.bytesize} bytes of #{handle.size} in #{handle.path}, md5 #{Digest::MD5.hexdigest(payload)}" }

        file_message = RFlow::Message.new('RFlow::Message::Data::File')
        file_message.data.path = ::File.expand_path(handle.path)
        file_message.data.size = handle.size
        file_message.data.content = payload
        file_message.data.creation_timestamp = handle.ctime
        file_message.data.modification_timestamp = handle.mtime
        file_message.data.access_timestamp = handle.atime
        file_port.send_message(file_message)

        raw_message = RFlow::Message.new('RFlow::Message::Data::Raw')
        raw_message.data.raw = payload
        raw_port.send_message(raw_message)
      end

      next unless @remove_files

      RFlow.logger.debug { "#{name}: Removing #{path}" }
      ::File.delete path
    end
  end
end
to_boolean(string)
click to toggle source
@!visibility private
# File lib/rflow/components/file/directory_watcher.rb, line 113
# Coerces a config value into a strict +true+ or +false+.
#
# Accepted truthy forms: +true+, '1', and the word "true" in any letter case.
# Accepted falsy forms: +false+, '0', and the word "false" in any letter case.
#
# The word patterns are anchored to the whole string with \A and \z. The
# per-line anchors ^ and $ would let multi-line values such as "no\ntrue"
# through, and an unanchored tail would let "falsey" count as false.
# A value with a trailing newline (e.g. "true\n") is therefore rejected.
#
# @!visibility private
# @param string [String, Boolean, Object] the raw config value
# @return [Boolean]
# @raise [ArgumentError] if the value is not one of the accepted forms
def to_boolean(string)
  case string
  when /\Atrue\z/i, '1', true then true
  when /\Afalse\z/i, '0', false then false
  else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value"
  end
end