module RFlow::Components::File::OutputToDisk

To be included into any component that plans to output files to disk. Provides default configuration and a {write_to_file} method.

Best not to depend on the filename other than the prefix and suffix portions as it is implementation-dependent.

Mixed-in component will support the following config parameters:

Constants

DEFAULT_CONFIG

Default config.

Attributes

config[RW]

@!visibility private

directory_path[RW]

@!visibility private

file_name_prefix[RW]

@!visibility private

file_name_suffix[RW]

@!visibility private

Public Instance Methods

configure!(config) click to toggle source

RFlow-called method at startup. @return [void]

# File lib/rflow/components/file/output_to_disk.rb, line 29
def configure!(config)
  @config = DEFAULT_CONFIG.merge config
  @directory_path  = ::File.expand_path(@config['directory_path'])
  @file_name_prefix = @config['file_name_prefix']
  @file_name_suffix = @config['file_name_suffix']

  unless ::File.directory?(@directory_path)
    raise ArgumentError, "Invalid directory '#{@directory_path}'"
  end

  unless ::File.writable?(@directory_path)
    raise ArgumentError, "Unable to read from directory '#{@directory_path}'"
  end

  # TODO: more error checking of input config
end
write_to_file(properties) { |file| ... } click to toggle source

Write out a file to disk based on message properties. Filename is implementation-dependent but will certainly contain the data UUID, priority, filename prefix, suffix, and a timestamp (properties data_uuid, priority plus config variables).

Opens the file and +yield+s it back to the caller for actual writing. Caller should return the number of bytes written.

@return [String] the final output file path

# File lib/rflow/components/file/output_to_disk.rb, line 54
def write_to_file(properties)
  properties ||= {}
  begin
    final_output_file_name = output_file_name(properties)

    temp_output_file_path = ::File.join(directory_path, ".#{final_output_file_name}")
    final_output_file_path = ::File.join(directory_path, "#{final_output_file_name}")

    RFlow.logger.debug { "#{self.class}: Outputting message to #{final_output_file_path} (via #{temp_output_file_path})" }

    ::File.open(temp_output_file_path, ::File::CREAT|::File::EXCL|::File::RDWR, 0644, :external_encoding => 'BINARY') do |file|
      file.flock(::File::LOCK_EX)
      bytes_written = yield file

      file.flush
      raise IOError, "file size of '#{::File.size(temp_output_file_path)}' does not match expected size of '#{bytes_written}'" unless ::File.size(temp_output_file_path) == bytes_written
    end
    ::File.rename(temp_output_file_path, final_output_file_path)
    final_output_file_path
  rescue StandardError, Errno::EEXIST => e
    RFlow.logger.error { "#{self.class} encountered #{e.message} when creating #{temp_output_file_path}" }
    begin
      ::File.delete(temp_output_file_path)
    rescue => f
      RFlow.logger.debug {"#{self.class} encountered #{f.message} on cleanup of #{temp_output_file_path}" }
    end
    raise e
  end
end

Private Instance Methods

current_timestamp() click to toggle source
# File lib/rflow/components/file/output_to_disk.rb, line 91
def current_timestamp
  time = Time.now
  time.utc.strftime("%Y%m%d_%H%M%S.") + "%06d" % time.utc.usec
end
output_file_name(properties) click to toggle source
# File lib/rflow/components/file/output_to_disk.rb, line 85
def output_file_name(properties)
  uuid = properties['data_uuid'] || UUIDTools::UUID.random_create.to_s
  priority = properties['priority'] || 0
  "#{file_name_prefix}.#{'%03d' % priority}.#{current_timestamp}.#{uuid}#{file_name_suffix}"
end