class AliyunSDK::OSS::Multipart::Download

A multipart download transaction

Constants

NUM_THREAD
PART_SIZE
READ_SIZE

Public Class Methods

new(protocol, opts) click to toggle source
Calls superclass method
# File lib/aliyun_sdk/oss/download.rb, line 16
def initialize(protocol, opts)
  args = opts.dup
  @protocol = protocol
  @progress = args.delete(:progress)
  @file = args.delete(:file)
  @cpt_file = args.delete(:cpt_file)
  super(args)

  @object_meta = {}
  @num_threads = options[:threads] || NUM_THREAD
  @all_mutex = Mutex.new
  @parts = []
  @todo_mutex = Mutex.new
  @todo_parts = []
end

Public Instance Methods

checkpoint() click to toggle source

Checkpoint structures: @example

states = {
  :id => 'download_id',
  :file => 'file',
  :object_meta => {
    :etag => 'xxx',
    :size => 1024
  },
  :parts => [
    {:number => 1, :range => [0, 100], :md5 => 'xxx', :done => false},
    {:number => 2, :range => [100, 200], :md5 => 'yyy', :done => true}
  ],
  :md5 => 'states_md5'
}
# File lib/aliyun_sdk/oss/download.rb, line 83
def checkpoint
  logger.debug("Begin make checkpoint, disable_cpt: "\
               "#{options[:disable_cpt] == true}")

  ensure_object_not_changed

  parts = sync_get_all_parts
  states = {
    :id => id,
    :file => @file,
    :object_meta => @object_meta,
    :parts => parts
  }

  # report progress
  if @progress
    done = parts.count { |p| p[:done] }
    @progress.call(done.to_f / parts.size) if done > 0
  end

  write_checkpoint(states, @cpt_file) unless options[:disable_cpt]

  logger.debug("Done make checkpoint, states: #{states}")
end
run() click to toggle source

Run the download transaction, which includes 3 stages:

  • 1a. initiate(new downlaod) and divide parts

  • 1b. rebuild states(resumed download)

    1. download each unfinished part

    1. combine the downloaded parts into the final file

# File lib/aliyun_sdk/oss/download.rb, line 37
def run
  logger.info("Begin download, file: #{@file}, "\
              "checkpoint file: #{@cpt_file}, "\
              "threads: #{@num_threads}")

  # Rebuild transaction states from checkpoint file
  # Or initiate new transaction states
  rebuild

  # Divide the target object into parts to download by ranges
  divide_parts if @parts.empty?

  # Download each part(object range)
  @todo_parts = @parts.reject { |p| p[:done] }

  (1..@num_threads).map {
    Thread.new {
      loop {
        p = sync_get_todo_part
        break unless p
        download_part(p)
      }
    }
  }.map(&:join)

  # Combine the parts into the final file
  commit

  logger.info("Done download, file: #{@file}")
end

Private Instance Methods

commit() click to toggle source

Combine the downloaded parts into the final file @todo avoid copy all part files

# File lib/aliyun_sdk/oss/download.rb, line 111
def commit
  logger.info("Begin commit transaction, id: #{id}")

  parts = sync_get_all_parts
  # concat all part files into the target file
  File.open(@file, 'w') do |w|
    parts.sort{ |x, y| x[:number] <=> y[:number] }.each do |p|
      File.open(get_part_file(p)) do |r|
          w.write(r.read(READ_SIZE)) until r.eof?
      end
    end
  end

  File.delete(@cpt_file) unless options[:disable_cpt]
  parts.each{ |p| File.delete(get_part_file(p)) }

  logger.info("Done commit transaction, id: #{id}")
end
divide_parts() click to toggle source

Devide the object to download into parts to download

# File lib/aliyun_sdk/oss/download.rb, line 193
def divide_parts
  logger.info("Begin divide parts, object: #{@object}")

  max_parts = 100
  object_size = @object_meta[:size]
  part_size =
    [@options[:part_size] || PART_SIZE, object_size / max_parts].max
  num_parts = (object_size - 1) / part_size + 1
  @parts = (1..num_parts).map do |i|
    {
      :number => i,
      :range => [(i - 1) * part_size, [i * part_size, object_size].min],
      :done => false
    }
  end

  checkpoint

  logger.info("Done divide parts, parts: #{@parts}")
end
download_part(p) click to toggle source

Download a part

# File lib/aliyun_sdk/oss/download.rb, line 175
def download_part(p)
  logger.debug("Begin download part: #{p}")

  part_file = get_part_file(p)
  File.open(part_file, 'w') do |w|
    @protocol.get_object(
      bucket, object,
      @options.merge(range: p[:range])) { |chunk| w.write(chunk) }
  end

  sync_update_part(p.merge(done: true, md5: get_file_md5(part_file)))

  checkpoint

  logger.debug("Done download part: #{p}")
end
ensure_object_not_changed() click to toggle source

Ensure file not changed during uploading

# File lib/aliyun_sdk/oss/download.rb, line 233
def ensure_object_not_changed
  obj = @protocol.get_object_meta(bucket, object)
  unless obj.etag == @object_meta[:etag]
    fail ObjectInconsistentError,
         "The object to download is changed: #{object}."
  end
end
generate_download_id() click to toggle source

Generate a download id

# File lib/aliyun_sdk/oss/download.rb, line 242
def generate_download_id
  "download_#{bucket}_#{object}_#{Time.now.to_i}"
end
get_part_file(p) click to toggle source

Get name for part file

# File lib/aliyun_sdk/oss/download.rb, line 247
def get_part_file(p)
  "#{@file}.part.#{p[:number]}"
end
initiate() click to toggle source
# File lib/aliyun_sdk/oss/download.rb, line 160
def initiate
  logger.info("Begin initiate transaction")

  @id = generate_download_id
  obj = @protocol.get_object_meta(bucket, object)
  @object_meta = {
    :etag => obj.etag,
    :size => obj.size
  }
  checkpoint

  logger.info("Done initiate transaction, id: #{id}")
end
rebuild() click to toggle source

Rebuild the states of the transaction from checkpoint file

# File lib/aliyun_sdk/oss/download.rb, line 131
def rebuild
  logger.info("Begin rebuild transaction, checkpoint: #{@cpt_file}")

  if options[:disable_cpt] || !File.exists?(@cpt_file)
    initiate
  else
    states = load_checkpoint(@cpt_file)

    states[:parts].select{ |p| p[:done] }.each do |p|
      part_file = get_part_file(p)

      unless File.exist?(part_file)
        fail PartMissingError, "The part file is missing: #{part_file}."
      end

      if p[:md5] != get_file_md5(part_file)
        fail PartInconsistentError,
             "The part file is changed: #{part_file}."
      end
    end

    @id = states[:id]
    @object_meta = states[:object_meta]
    @parts = states[:parts]
  end

  logger.info("Done rebuild transaction, states: #{states}")
end
sync_get_all_parts() click to toggle source
# File lib/aliyun_sdk/oss/download.rb, line 226
def sync_get_all_parts
  @all_mutex.synchronize {
    @parts.dup
  }
end
sync_get_todo_part() click to toggle source
# File lib/aliyun_sdk/oss/download.rb, line 214
def sync_get_todo_part
  @todo_mutex.synchronize {
    @todo_parts.shift
  }
end
sync_update_part(p) click to toggle source
# File lib/aliyun_sdk/oss/download.rb, line 220
def sync_update_part(p)
  @all_mutex.synchronize {
    @parts[p[:number] - 1] = p
  }
end