class RFlow::Components::File::DirectoryWatcher

Component that periodically polls a directory for files matching a glob. For each file it finds, it sends along +RFlow::Message+s of type {RFlow::Message::Data::File} and RFlow::Message::Data::Raw, and then (optionally) deletes the file.

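Accepts config parameters:

- directory_path - directory to watch; must exist and be readable
- file_name_glob - glob pattern, relative to directory_path, selecting the files to import
- poll_interval - interval between polls, converted to an integer and passed to EventMachine::PeriodicTimer
- files_per_poll - maximum number of files to import per poll, converted to an integer
- remove_files - whether to delete each file after it has been sent; accepts true/false, "true"/"false" or "1"/"0"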

Constants

DEFAULT_CONFIG

Default config.

Attributes

config[RW]

@!visibility private

directory_path[RW]

@!visibility private

file_name_glob[RW]

@!visibility private

poll_interval[RW]

@!visibility private

remove_files[RW]

@!visibility private

Public Instance Methods

configure!(config) click to toggle source

RFlow-called method at startup. @return [void]

# File lib/rflow/components/file/directory_watcher.rb, line 45
# Apply the component configuration. RFlow calls this at startup.
#
# The supplied settings are merged over DEFAULT_CONFIG, and the individual
# values are coerced and stored: the directory path is expanded to an
# absolute path, the poll interval and files-per-poll limit are converted
# with +to_i+, and the remove flag goes through +to_boolean+.
#
# @param config [Hash] settings keyed by string ('directory_path',
#   'file_name_glob', 'poll_interval', 'files_per_poll', 'remove_files')
# @raise [ArgumentError] if the directory does not exist or is not readable
#   (+to_boolean+ also raises it for an unrecognised 'remove_files' value)
# @return [void]
def configure!(config)
  settings = DEFAULT_CONFIG.merge(config)
  @config = settings

  @directory_path = ::File.expand_path(settings['directory_path'])
  @file_name_glob = settings['file_name_glob']
  @poll_interval = settings['poll_interval'].to_i
  @files_per_poll = settings['files_per_poll'].to_i
  @remove_files = to_boolean(settings['remove_files'])

  # Fail at startup rather than on the first poll.
  raise ArgumentError, "Invalid directory '#{@directory_path}'" unless ::File.directory?(@directory_path)
  raise ArgumentError, "Unable to read from directory '#{@directory_path}'" unless ::File.readable?(@directory_path)

  # TODO: more error checking of input config
end
run!() click to toggle source

RFlow-called method at startup. @return [void]

# File lib/rflow/components/file/directory_watcher.rb, line 66
# Start polling the watched directory. RFlow calls this at startup.
#
# Creates an EventMachine periodic timer using +poll_interval+. On every
# tick it globs the configured directory, orders the matches by modification
# time (oldest first), keeps only this worker's share when the shard has
# more than one worker, and imports at most +@files_per_poll+ of them.
# Each imported file is read in binary mode and sent as a File message on
# +file_port+ and as a Raw message on +raw_port+; it is then deleted when
# +@remove_files+ is set. Files that cannot be read (or cannot be removed
# when removal is requested) are logged and skipped.
#
# @return [void]
def run!
  # TODO: optimize sending of messages based on what is connected
  EventMachine::PeriodicTimer.new(poll_interval) do
    glob_pattern = ::File.join(@directory_path, @file_name_glob)
    RFlow.logger.debug { "#{name}: Polling for files in #{glob_pattern}" }

    # Oldest modification time first, so the earliest-modified file is handled first.
    oldest_first = Dir.glob(glob_pattern).sort_by { |candidate| ::File.mtime(candidate) }

    # With several workers in the shard, each one takes only the paths whose
    # string checksum maps onto its own index, spreading the load.
    own_share = oldest_first.select do |candidate|
      shard.count == 1 || (candidate.sum % shard.count) + 1 == worker.index
    end

    own_share.first(@files_per_poll).each do |path|
      RFlow.logger.debug { "#{name}: Importing #{path}" }

      if !::File.readable?(path)
        RFlow.logger.warn "#{name}: Unable to read file #{path}, skipping it"
        next
      elsif @remove_files && !::File.writable?(path)
        # Checked before sending so a file that cannot be removed is not sent.
        RFlow.logger.warn "#{name}: Unable to remove file #{path}, skipping it"
        next
      end

      ::File.open(path, 'r:BINARY') do |file|
        content = file.read

        RFlow.logger.debug { "#{name}: Read #{content.bytesize} bytes of #{file.size} in #{file.path}, md5 #{Digest::MD5.hexdigest(content)}" }

        file_message = RFlow::Message.new('RFlow::Message::Data::File')
        file_message.data.path = ::File.expand_path(file.path)
        file_message.data.size = file.size
        file_message.data.content = content
        file_message.data.creation_timestamp = file.ctime
        file_message.data.modification_timestamp = file.mtime
        file_message.data.access_timestamp = file.atime
        file_port.send_message(file_message)

        raw_message = RFlow::Message.new('RFlow::Message::Data::Raw')
        raw_message.data.raw = content
        raw_port.send_message(raw_message)
      end

      next unless @remove_files

      RFlow.logger.debug { "#{name}: Removing #{path}" }
      ::File.delete path
    end
  end
end
to_boolean(string) click to toggle source

@!visibility private

# File lib/rflow/components/file/directory_watcher.rb, line 113
# Coerce a configuration value into a strict boolean.
#
# Recognised forms are the booleans +true+/+false+, the strings '1'/'0',
# and the words 'true'/'false' in any letter case. The words must match the
# entire string: values such as 'falsey' or a multi-line string containing
# 'true' on one line are rejected rather than silently coerced.
#
# @!visibility private
# @param string [String, Boolean] value to coerce
# @return [Boolean]
# @raise [ArgumentError] if the value is not one of the recognised forms
def to_boolean(string)
  case string
  # \A and \z anchor to the whole string; ^ and $ would only anchor to a line.
  when true, '1', /\Atrue\z/i then true
  when false, '0', /\Afalse\z/i then false
  else raise ArgumentError, "'#{string}' cannot be coerced to a boolean value"
  end
end