class LogStash::Outputs::S3

INFORMATION:

This plugin batches and uploads logstash events into Amazon Simple Storage Service (Amazon S3).

Requirements:

The S3 output creates temporary files in the OS's temporary directory before uploading them; you can specify a different location with the `temporary_directory` option.
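
For example (the directory path here is hypothetical):

output {
  s3 {
    # ... other required options ...
    temporary_directory => "/tmp/logstash" # hypothetical path; defaults to the OS temporary directory
  }
}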

S3 output files have the following format:

ls.s3.312bc026-2f5d-49bc-ae9f-5940cf4ad9a6.2013-04-18T10.00.tag_hello.part0.txt

|=======
| ls.s3 | indicates the logstash S3 plugin
| 312bc026-2f5d-49bc-ae9f-5940cf4ad9a6 | a new, random UUID per file
| 2013-04-18T10.00 | the file's timestamp; present whenever you specify time_file
| tag_hello | the event's tag
| part0 | the part number; if you specify size_file, additional parts are generated whenever the current file's size exceeds size_file. When a file is full it is pushed to the bucket and then deleted from the temporary directory. Empty files are never pushed; they are simply deleted.
|=======
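
As an illustration only (an assumed sketch, not the plugin's actual code), a name of this shape could be assembled like so:

require "securerandom"

# Hypothetical sketch: the plugin builds these names internally.
tag, part = "hello", 0
name = "ls.s3.#{SecureRandom.uuid}." \
       "#{Time.now.utc.strftime("%Y-%m-%dT%H.%M")}." \
       "tag_#{tag}.part#{part}.txt"
# => e.g. "ls.s3.312bc026-2f5d-49bc-ae9f-5940cf4ad9a6.2013-04-18T10.00.tag_hello.part0.txt"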

Crash Recovery:

If Logstash is interrupted, the plugin can recover: when `restore` is set to true, any content left in the temporary directory is picked up on startup and uploaded (see `restore_from_crash` below).
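
For example (`restore` is the boolean option backing `@restore` in the code below):

output {
  s3 {
    # ... other required options ...
    restore => true # re-upload leftover temporary files on startup
  }
}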

Note regarding time_file and size_file:

Both the time_file and size_file settings can trigger a log "file rotation".
A log rotation pushes the current log "part" to S3 and deletes it from local temporary storage.

If you specify BOTH size_file and time_file, a file is created for each tag (if specified).
When EITHER time_file minutes have elapsed OR the log file size exceeds size_file, a log rotation is triggered.

If you ONLY specify time_file but NOT size_file, one file for each tag (if specified) will be created.
When time_file minutes elapse, a log rotation is triggered.

If you ONLY specify size_file but NOT time_file, one file for each tag (if specified) will be created.
When the size of a log file part exceeds size_file, a log rotation is triggered.

If NEITHER size_file nor time_file is specified, ONLY one file for each tag (if specified) will be created.
WARNING: Since no log rotation is triggered, the upload to S3 will only occur when Logstash shuts down or restarts.

#### Usage

This is an example of a Logstash config:

output {
  s3 {
    access_key_id => "crazy_key"             # (required)
    secret_access_key => "monkey_access_key" # (required)
    region => "eu-west-1"                    # (optional, default = "us-east-1")
    bucket => "your_bucket"                  # (required)
    size_file => 2048                        # (optional) - bytes
    time_file => 5                           # (optional) - minutes
    codec => "plain"                         # (optional)
    canned_acl => "private"                  # (optional; one of "private", "public-read", "public-read-write", "authenticated-read", "aws-exec-read", "bucket-owner-read", "bucket-owner-full-control", "log-delivery-write"; defaults to "private")
  }
}
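
The options used by the rotation and prefix logic shown below can also be set here. For example (a sketch: `prefix` supports event `sprintf` references, and `rotation_strategy` selects among the policies listed in `rotation_strategy()`):

output {
  s3 {
    bucket => "your_bucket"
    prefix => "logs/%{+YYYY-MM-dd}" # expanded per event, then normalized (see normalize_key)
    rotation_strategy => "size_and_time"
    size_file => 2048
    time_file => 5
  }
}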

Constants

CRASH_RECOVERY_THREADPOOL
PERIODIC_CHECK_INTERVAL_IN_SECONDS
PREFIX_KEY_NORMALIZE_CHARACTER

Public Instance Methods

close()
# File lib/logstash/outputs/s3.rb, line 259
def close
  stop_periodic_check if @rotation.needs_periodic?

  @logger.debug("Uploading current workspace")

  # The plugin has stopped receiving new events, but we may still have
  # data on disk, so let's make sure it gets to S3.
  # If Logstash is interrupted, the `restore_from_crash` method (when `restore` is set to true)
  # will pick up the content in the temporary directory and upload it.
  # This will block the shutdown until all uploads are done or the user forces a quit.
  @file_repository.each_files do |file|
    upload_file(file)
  end

  @file_repository.shutdown

  @uploader.stop # wait until all the in-flight uploads are complete
  @crash_uploader.stop if @restore # we might still have recovery work to do, so wait until it is done
end
full_options()
# File lib/logstash/outputs/s3.rb, line 279
def full_options
  options = aws_options_hash || {}
  options[:signature_version] = @signature_version if @signature_version
  symbolized_settings.merge(options)
end
multi_receive_encoded(events_and_encoded)
# File lib/logstash/outputs/s3.rb, line 238
def multi_receive_encoded(events_and_encoded)
  prefix_written_to = Set.new

  events_and_encoded.each do |event, encoded|
    prefix_key = normalize_key(event.sprintf(@prefix))
    prefix_written_to << prefix_key

    begin
      @file_repository.get_file(prefix_key) { |file| file.write(encoded) }
    rescue Errno::ENOSPC => e
      # The output should stop accepting new events coming in, since it cannot do anything with them anymore.
      # Log the error and rethrow it.
      @logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
      raise e
    end
  end

  # Groups IO calls to optimize fstat checks
  rotate_if_needed(prefix_written_to)
end
normalize_key(prefix_key)
# File lib/logstash/outputs/s3.rb, line 297
def normalize_key(prefix_key)
  prefix_key.gsub(PathValidator.matches_re, PREFIX_KEY_NORMALIZE_CHARACTER)
end
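
For illustration, assuming PREFIX_KEY_NORMALIZE_CHARACTER is "_" and that characters such as "\" and "^" are in PathValidator's invalid set (both assumptions):

normalize_key("logs\\app^name") # => "logs_app_name" under the assumptions above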
register()
# File lib/logstash/outputs/s3.rb, line 196
def register
  # I've moved the validation of the items into custom classes
  # to prepare for the new config validation that will be part of the core, so the core can
  # be moved easily.
  unless @prefix.empty?
    if !PathValidator.valid?(prefix)
      raise LogStash::ConfigurationError, "Prefix must not contain: #{PathValidator::INVALID_CHARACTERS}"
    end
  end

  if !WritableDirectoryValidator.valid?(@temporary_directory)
    raise LogStash::ConfigurationError, "Logstash must have the permissions to write to the temporary directory: #{@temporary_directory}"
  end

  if @validate_credentials_on_root_bucket && !WriteBucketPermissionValidator.new(@logger).valid?(bucket_resource, upload_options)
    raise LogStash::ConfigurationError, "Logstash must have the privileges to write to root bucket `#{@bucket}`, check your credentials or your permissions."
  end

  if (@time_file.nil? && @size_file.nil?) || (@size_file == 0 && @time_file == 0)
    raise LogStash::ConfigurationError, "The S3 plugin must have at least one of time_file or size_file set to a value greater than 0"
  end

  @file_repository = FileRepository.new(@tags, @encoding, @temporary_directory)

  @rotation = rotation_strategy

  executor = Concurrent::ThreadPoolExecutor.new({ :min_threads => 1,
                                                  :max_threads => @upload_workers_count,
                                                  :max_queue => @upload_queue_size,
                                                  :fallback_policy => :caller_runs })

  @uploader = Uploader.new(bucket_resource, @logger, executor, retry_count: @retry_count, retry_delay: @retry_delay)

  # Restoring from a crash uses a separate threadpool to recover slowly;
  # new events should take priority.
  restore_from_crash if @restore

  # If we need time-based rotation, we periodically check the files
  # to take care of files that were not updated recently.
  start_periodic_check if @rotation.needs_periodic?
end
symbolize_keys(hash)
# File lib/logstash/outputs/s3.rb, line 289
def symbolize_keys(hash)
  return hash unless hash.is_a?(Hash)
  symbolized = {}
  hash.each { |key, value| symbolized[key.to_sym] = symbolize_keys(value) }
  symbolized
end
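
For example, a nested settings hash is converted recursively (the keys here are hypothetical):

symbolize_keys({ "force_path_style" => true, "proxy" => { "host" => "127.0.0.1" } })
# => { :force_path_style => true, :proxy => { :host => "127.0.0.1" } }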
symbolized_settings()
# File lib/logstash/outputs/s3.rb, line 285
def symbolized_settings
  @symbolized_settings ||= symbolize_keys(@additional_settings)
end
upload_options()
# File lib/logstash/outputs/s3.rb, line 301
def upload_options
  {
    :acl => @canned_acl,
    :server_side_encryption => @server_side_encryption ? @server_side_encryption_algorithm : nil,
    :ssekms_key_id => @server_side_encryption_algorithm == "aws:kms" ? @ssekms_key_id : nil,
    :storage_class => @storage_class,
    :content_encoding => @encoding == "gzip" ? "gzip" : nil,
    :multipart_threshold => @upload_multipart_threshold
  }
end
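
For instance, with gzip encoding enabled and server-side encryption disabled (a hypothetical configuration; the defaults shown are assumptions), the resulting hash would look roughly like:

{
  :acl => "private",
  :server_side_encryption => nil,
  :ssekms_key_id => nil,
  :storage_class => "STANDARD",             # assumed default
  :content_encoding => "gzip",
  :multipart_threshold => 15 * 1024 * 1024  # assumed default
}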

Private Instance Methods

bucket_resource()
# File lib/logstash/outputs/s3.rb, line 330
def bucket_resource
  Aws::S3::Bucket.new(@bucket, full_options)
end
clean_temporary_file(file)
# File lib/logstash/outputs/s3.rb, line 378
def clean_temporary_file(file)
  @logger.debug("Removing temporary file", :file => file.path)
  file.delete!
end
restore_from_crash()

The upload process uses a separate uploader/threadpool with fewer resources allocated to it. However, since it uses an unbounded queue for the work, it may take some time before all of the older files get processed.

# File lib/logstash/outputs/s3.rb, line 385
def restore_from_crash
  @crash_uploader = Uploader.new(bucket_resource, @logger, CRASH_RECOVERY_THREADPOOL)

  temp_folder_path = Pathname.new(@temporary_directory)
  Dir.glob(::File.join(@temporary_directory, "**/*"))
    .select { |file| ::File.file?(file) }
    .each do |file|
    temp_file = TemporaryFile.create_from_existing_file(file, temp_folder_path)
    if temp_file.size > 0
      @logger.debug("Recovering from crash and uploading", :file => temp_file.path)
      @crash_uploader.upload_async(temp_file, :on_complete => method(:clean_temporary_file), :upload_options => upload_options)
    else
      clean_temporary_file(temp_file)
    end
  end
end
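
For reference, a crash-recovery threadpool consistent with this description could be defined as follows (an assumed sketch, not necessarily the plugin's exact constant):

# Small pool with an unbounded work queue: recovery proceeds slowly in the
# background while new events keep priority (assumed definition).
CRASH_RECOVERY_THREADPOOL = Concurrent::ThreadPoolExecutor.new(
  :min_threads => 1,
  :max_threads => 2,
  :fallback_policy => :caller_runs
)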
rotate_if_needed(prefixes)
# File lib/logstash/outputs/s3.rb, line 334
def rotate_if_needed(prefixes)
  prefixes.each do |prefix|
    # Each file access is thread safe;
    # while a rotation is in progress, only
    # one thread has access to the resource.
    @file_repository.get_factory(prefix) do |factory|
      temp_file = factory.current

      if @rotation.rotate?(temp_file)
        @logger.debug("Rotate file",
                      :strategy => @rotation.class.name,
                      :key => temp_file.key,
                      :path => temp_file.path)

        upload_file(temp_file)
        factory.rotate!
      end
    end
  end
end
rotation_strategy()
# File lib/logstash/outputs/s3.rb, line 367
def rotation_strategy
  case @rotation_strategy
  when "size"
    SizeRotationPolicy.new(size_file)
  when "time"
    TimeRotationPolicy.new(time_file)
  when "size_and_time"
    SizeAndTimeRotationPolicy.new(size_file, time_file)
  end
end
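
Each policy is expected to respond to `rotate?(file)` and `needs_periodic?`, as used in `rotate_if_needed`, `close` and `start_periodic_check`. A minimal sketch of a size-based policy under those assumptions (not the plugin's actual implementation):

# Hypothetical sketch of a size-based rotation policy.
class SizeRotationPolicySketch
  attr_reader :size_file

  def initialize(size_file)
    raise LogStash::ConfigurationError, "size_file must be greater than 0" if size_file <= 0
    @size_file = size_file
  end

  # Rotate once the on-disk part reaches the configured size.
  def rotate?(file)
    file.size >= size_file
  end

  # Size can be checked on every write, so no background timer is needed.
  def needs_periodic?
    false
  end
end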
start_periodic_check()

We start a background task that checks for stale files and makes sure we rotate them to S3 if needed.

# File lib/logstash/outputs/s3.rb, line 314
def start_periodic_check
  @logger.debug("Start periodic rotation check")

  @periodic_check = Concurrent::TimerTask.new(:execution_interval => PERIODIC_CHECK_INTERVAL_IN_SECONDS) do
    @logger.debug("Periodic check for stale files")

    rotate_if_needed(@file_repository.keys)
  end

  @periodic_check.execute
end
stop_periodic_check()
# File lib/logstash/outputs/s3.rb, line 326
def stop_periodic_check
  @periodic_check.shutdown
end
upload_file(temp_file)
# File lib/logstash/outputs/s3.rb, line 355
def upload_file(temp_file)
  @logger.debug("Queue for upload", :path => temp_file.path)

  # if the queue is full the calling thread will be used to upload
  temp_file.close # make sure the content is on disk
  if temp_file.size > 0
    @uploader.upload_async(temp_file,
                           :on_complete => method(:clean_temporary_file),
                           :upload_options => upload_options )
  end
end