class Backup::CloudIO::S3

Constants

MAX_FILE_SIZE (the 5 GiB limit for a single PUT Object request)
MAX_MULTIPART_SIZE (the 5 TiB limit for a Multipart Upload)

Attributes

access_key_id[R]
bucket[R]
chunk_size[R]
encryption[R]
fog_options[R]
region[R]
secret_access_key[R]
storage_class[R]
use_iam_profile[R]

Public Class Methods

new(options = {})
Calls superclass method Backup::CloudIO::Base::new
# File lib/backup/cloud_io/s3.rb, line 19
def initialize(options = {})
  super

  @access_key_id      = options[:access_key_id]
  @secret_access_key  = options[:secret_access_key]
  @use_iam_profile    = options[:use_iam_profile]
  @region             = options[:region]
  @bucket             = options[:bucket]
  @chunk_size         = options[:chunk_size]
  @encryption         = options[:encryption]
  @storage_class      = options[:storage_class]
  @fog_options        = options[:fog_options]
end
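
A construction sketch with assumed values (the credentials, region, and bucket below are placeholders; within Backup this object is normally built by the S3 Storage or Syncer rather than instantiated directly):

  cloud_io = Backup::CloudIO::S3.new(
    access_key_id:     "my_access_key_id",      # placeholder
    secret_access_key: "my_secret_access_key",  # placeholder
    region:            "us-east-1",
    bucket:            "my.backup.bucket",
    chunk_size:        5,          # in MiB; 0 disables multipart uploads
    encryption:        :aes256,    # upcased into x-amz-server-side-encryption
    storage_class:     :standard,
    fog_options:       {}          # extra options merged into Fog::Storage.new
  )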

Public Instance Methods

delete(objects_or_keys)

Delete object(s) from the bucket.

  • Called by the Storage (with objects) and the Syncer (with keys)

  • Deletes 1000 objects per request.

  • Missing objects will be ignored.

# File lib/backup/cloud_io/s3.rb, line 100
def delete(objects_or_keys)
  keys = Array(objects_or_keys).dup
  # objects_or_keys may be CloudIO::S3::Object instances or plain String keys
  keys.map!(&:key) if keys.first.is_a?(Object)

  opts = { quiet: true } # only report Errors in DeleteResult
  until keys.empty?
    keys_partial = keys.slice!(0, 1000)
    with_retries("DELETE Multiple Objects") do
      resp = connection.delete_multiple_objects(bucket, keys_partial, opts.dup)
      unless resp.body["DeleteResult"].empty?
        errors = resp.body["DeleteResult"].map do |result|
          error = result["Error"]
          "Failed to delete: #{error["Key"]}\n" \
            "Reason: #{error["Code"]}: #{error["Message"]}"
        end.join("\n")
        raise Error, "The server returned the following:\n#{errors}"
      end
    end
  end
end
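
A usage sketch (assuming cloud_io is an instance as constructed above; the key names are placeholders):

  # Plain keys, as the Syncer passes them:
  cloud_io.delete([
    "backups/my_backup/2024.05.01.01.00.00/archive.tar-aa",
    "backups/my_backup/2024.05.01.01.00.00/archive.tar-ab"
  ])

  # Or Object instances, as the Storage passes them:
  cloud_io.delete(cloud_io.objects("backups/my_backup/2024.05.01.01.00.00"))
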
head_object(object)

Used by Object to fetch metadata if needed.

# File lib/backup/cloud_io/s3.rb, line 87
def head_object(object)
  resp = nil
  with_retries("HEAD '#{bucket}/#{object.key}'") do
    resp = connection.head_object(bucket, object.key)
  end
  resp
end
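
A usage sketch (obj is assumed to be one of the Object instances returned by #objects; the headers shown are standard S3 response headers):

  resp = cloud_io.head_object(obj)
  resp.headers["ETag"]                          # e.g. "\"d41d8cd9...\""
  resp.headers["x-amz-server-side-encryption"]  # e.g. "AES256", when encryption was used
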
objects(prefix)

Returns all objects in the bucket with the given prefix.

  • get_bucket returns a max of 1000 objects per request.

  • Returns objects in alphabetical order.

  • If marker is given, only objects after the marker are in the response.

# File lib/backup/cloud_io/s3.rb, line 67
def objects(prefix)
  objects = []
  resp = nil
  prefix = prefix.chomp("/")
  opts = { "prefix" => prefix + "/" }

  while resp.nil? || resp.body["IsTruncated"]
    opts["marker"] = objects.last.key unless objects.empty?
    with_retries("GET '#{bucket}/#{prefix}/*'") do
      resp = connection.get_bucket(bucket, opts)
    end
    resp.body["Contents"].each do |obj_data|
      objects << Object.new(self, obj_data)
    end
  end

  objects
end
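
A usage sketch (the prefix is a placeholder):

  cloud_io.objects("backups/my_backup").each do |object|
    puts object.key   # keys are returned in alphabetical order
  end
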
upload(src, dest)

The Syncer may call this method in multiple threads. However, objects is always called prior to multithreading.

# File lib/backup/cloud_io/s3.rb, line 35
def upload(src, dest)
  file_size = File.size(src)
  chunk_bytes = chunk_size * 1024**2
  if chunk_bytes > 0 && file_size > chunk_bytes
    raise FileSizeError, <<-EOS if file_size > MAX_MULTIPART_SIZE
      File Too Large
      File: #{src}
      Size: #{file_size}
      Max Multipart Upload Size is #{MAX_MULTIPART_SIZE} (5 TiB)
    EOS

    chunk_bytes = adjusted_chunk_bytes(chunk_bytes, file_size)
    upload_id = initiate_multipart(dest)
    parts = upload_parts(src, dest, upload_id, chunk_bytes, file_size)
    complete_multipart(dest, upload_id, parts)
  else
    raise FileSizeError, <<-EOS if file_size > MAX_FILE_SIZE
      File Too Large
      File: #{src}
      Size: #{file_size}
      Max File Size is #{MAX_FILE_SIZE} (5 GiB)
    EOS

    put_object(src, dest)
  end
end
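
A usage sketch illustrating which path a file takes (paths are placeholders):

  # dest is the key within the bucket.
  cloud_io.upload("/local/path/archive.tar", "backups/my_backup/archive.tar")

  # With chunk_size = 5, files larger than 5 MiB (5 * 1024**2 bytes) take the
  # multipart path: initiate_multipart, upload_parts, complete_multipart.
  # Smaller files, and all files when chunk_size is 0, are sent with a single
  # put_object call, subject to the 5 GiB MAX_FILE_SIZE limit.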

Private Instance Methods

adjusted_chunk_bytes(chunk_bytes, file_size)
# File lib/backup/cloud_io/s3.rb, line 215
def adjusted_chunk_bytes(chunk_bytes, file_size)
  return chunk_bytes if file_size / chunk_bytes.to_f <= 10_000

  mb = orig_mb = chunk_bytes / 1024**2
  mb += 1 until file_size / (1024**2 * mb).to_f <= 10_000
  Logger.warn Error.new(<<-EOS)
    Chunk Size Adjusted
    Your original #chunk_size of #{orig_mb} MiB has been adjusted
    to #{mb} MiB in order to satisfy the limit of 10,000 chunks.
    To enforce your chosen #chunk_size, you should use the Splitter.
    e.g. split_into_chunks_of #{mb * 10_000} (#chunk_size * 10_000)
  EOS
  1024**2 * mb
end
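
A worked example of the adjustment, assuming a 60 GiB file and a 5 MiB chunk_size:

  file_size   = 60 * 1024**3   # 64_424_509_440 bytes
  chunk_bytes = 5 * 1024**2    # 5_242_880 bytes

  file_size / chunk_bytes.to_f      # => 12288.0 parts, over the 10,000 limit
  file_size / (1024**2 * 6).to_f    # => 10240.0, still over
  file_size / (1024**2 * 7).to_f    # => ~8777.1, acceptable
  # so adjusted_chunk_bytes returns 1024**2 * 7, i.e. a 7 MiB chunk size.
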
complete_multipart(dest, upload_id, parts)
# File lib/backup/cloud_io/s3.rb, line 191
def complete_multipart(dest, upload_id, parts)
  Logger.info "\s\sComplete Multipart '#{bucket}/#{dest}'"

  with_retries("POST '#{bucket}/#{dest}' (Complete)") do
    resp = connection.complete_multipart_upload(bucket, dest, upload_id, parts)
    raise Error, <<-EOS if resp.body["Code"]
      The server returned the following error:
      #{resp.body["Code"]}: #{resp.body["Message"]}
    EOS
  end
end
connection()
# File lib/backup/cloud_io/s3.rb, line 123
def connection
  @connection ||=
    begin
      opts = { provider: "AWS", region: region }
      if use_iam_profile
        opts[:use_iam_profile] = true
      else
        opts[:aws_access_key_id] = access_key_id
        opts[:aws_secret_access_key] = secret_access_key
      end
      opts.merge!(fog_options || {})
      conn = Fog::Storage.new(opts)
      conn.sync_clock
      conn
    end
end
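
Because fog_options is merged in last, it can add to or override the options handed to Fog::Storage.new. A sketch (the path_style setting is only an example of a fog-aws option and is an assumption here, not something this class requires):

  Backup::CloudIO::S3.new(
    use_iam_profile: true,               # credentials come from the instance profile
    region:          "eu-west-1",
    bucket:          "my.backup.bucket", # placeholder
    fog_options:     { path_style: true }
  )
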
headers()
# File lib/backup/cloud_io/s3.rb, line 203
def headers
  headers = {}

  enc = encryption.to_s.upcase
  headers["x-amz-server-side-encryption"] = enc unless enc.empty?

  sc = storage_class.to_s.upcase
  headers["x-amz-storage-class"] = sc unless sc.empty? || sc == "STANDARD"

  headers
end
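
For example, with encryption set to :aes256 and storage_class set to :reduced_redundancy, the method returns:

  {
    "x-amz-server-side-encryption" => "AES256",
    "x-amz-storage-class"          => "REDUCED_REDUNDANCY"
  }
  # With no encryption and a storage_class of :standard (or nil), the hash is empty.
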
initiate_multipart(dest)
# File lib/backup/cloud_io/s3.rb, line 150
def initiate_multipart(dest)
  Logger.info "\s\sInitiate Multipart '#{bucket}/#{dest}'"

  resp = nil
  with_retries("POST '#{bucket}/#{dest}' (Initiate)") do
    resp = connection.initiate_multipart_upload(bucket, dest, headers)
  end
  resp.body["UploadId"]
end
put_object(src, dest)
# File lib/backup/cloud_io/s3.rb, line 140
def put_object(src, dest)
  md5 = Base64.encode64(Digest::MD5.file(src).digest).chomp
  options = headers.merge("Content-MD5" => md5)
  with_retries("PUT '#{bucket}/#{dest}'") do
    File.open(src, "r") do |file|
      connection.put_object(bucket, dest, file, options)
    end
  end
end
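
The Content-MD5 value is the Base64-encoded binary MD5 digest of the file, which S3 uses to verify the received data. The same computation in isolation:

  require "base64"
  require "digest/md5"

  md5 = Base64.encode64(Digest::MD5.file("/local/path/archive.tar").digest).chomp
  # => "1B2M2Y8AsgTpgAmY7PhCfg==" for an empty file, for instance
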
upload_parts(src, dest, upload_id, chunk_bytes, file_size)

Each part’s MD5 is sent to verify the transfer. AWS will concatenate all parts into a single object once the multipart upload is completed.

# File lib/backup/cloud_io/s3.rb, line 163
def upload_parts(src, dest, upload_id, chunk_bytes, file_size)
  total_parts = (file_size / chunk_bytes.to_f).ceil
  progress = (0.1..0.9).step(0.1).map { |n| (total_parts * n).floor }
  Logger.info "\s\sUploading #{total_parts} Parts..."

  parts = []
  File.open(src, "r") do |file|
    part_number = 0
    while data = file.read(chunk_bytes)
      part_number += 1
      md5 = Base64.encode64(Digest::MD5.digest(data)).chomp

      with_retries("PUT '#{bucket}/#{dest}' Part ##{part_number}") do
        resp = connection.upload_part(
          bucket, dest, upload_id, part_number, StringIO.new(data),
          "Content-MD5" => md5
        )
        parts << resp.headers["ETag"]
      end

      if i = progress.rindex(part_number)
        Logger.info "\s\s...#{i + 1}0% Complete..."
      end
    end
  end
  parts
end
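
To make the progress markers concrete, a sketch assuming a 37-part upload:

  total_parts = 37
  progress = (0.1..0.9).step(0.1).map { |n| (total_parts * n).floor }
  # => [3, 7, 11, 14, 18, 22, 25, 29, 33]
  # "...10% Complete..." is logged after part 3, "...50% Complete..." after
  # part 18, and so on up to "...90% Complete..." after part 33.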