class AliyunSDK::OSS::Multipart::Upload
A multipart upload transaction
Constants
- NUM_THREAD
- PART_SIZE
- READ_SIZE
Public Class Methods
new(protocol, opts)
click to toggle source
Calls superclass method
AliyunSDK::OSS::Multipart::Transaction::new
# File lib/aliyun_sdk/oss/upload.rb, line 16
# Set up a new upload transaction.
#
# The :progress, :file and :cpt_file entries are taken out of a copy of
# +opts+ (the caller's hash is left untouched); whatever remains is handed
# to the superclass constructor.
#
# @param protocol the object later used to issue the multipart requests
# @param opts [Hash] upload options, including :progress, :file, :cpt_file
def initialize(protocol, opts)
  remaining = opts.dup
  @protocol = protocol
  @progress, @file, @cpt_file =
    [:progress, :file, :cpt_file].map { |key| remaining.delete(key) }
  super(remaining)

  @file_meta = {}
  # Fall back to NUM_THREAD when no :threads option was supplied
  @num_threads = options[:threads] || NUM_THREAD

  # One mutex guards the full part list, the other the pending-work queue
  @all_mutex, @todo_mutex = Mutex.new, Mutex.new
  @parts = []
  @todo_parts = []
end
Public Instance Methods
checkpoint()
click to toggle source
Checkpoint structures: @example
states = { :id => 'upload_id', :file => 'file', :file_meta => { :mtime => Time.now, :md5 => 1024 }, :parts => [ {:number => 1, :range => [0, 100], :done => false}, {:number => 2, :range => [100, 200], :done => true} ], :md5 => 'states_md5' }
# File lib/aliyun_sdk/oss/upload.rb, line 83
# Snapshot the transaction state, report progress and persist the snapshot.
#
# The snapshot holds :id, :file, :file_meta and :parts. It is written to
# @cpt_file unless options[:disable_cpt] is set. Before anything else the
# file is checked via ensure_file_not_changed.
def checkpoint
  logger.debug("Begin make checkpoint, disable_cpt: #{options[:disable_cpt] == true}")

  ensure_file_not_changed

  all_parts = sync_get_all_parts
  states = {
    :id => id,
    :file => @file,
    :file_meta => @file_meta,
    :parts => all_parts
  }

  # report progress (only once at least one part is done)
  if @progress
    finished = all_parts.select { |part| part[:done] }.size
    @progress.call(finished.to_f / all_parts.size) unless finished.zero?
  end

  write_checkpoint(states, @cpt_file) unless options[:disable_cpt]

  logger.debug("Done make checkpoint, states: #{states}")
end
run()
click to toggle source
Run the upload transaction, which includes 3 stages:
- 1a. initiate (new upload) and divide parts
- 1b. rebuild states (resumed upload)
- 2. upload each unfinished part
- 3. commit the multipart upload transaction
# File lib/aliyun_sdk/oss/upload.rb, line 37
# Run the whole upload: rebuild (or initiate) the transaction state, divide
# the file into parts if none exist yet, upload every part that is not done
# using @num_threads worker threads, then commit.
def run
  logger.info("Begin upload, file: #{@file}, checkpoint file: #{@cpt_file}, threads: #{@num_threads}")

  # Rebuild transaction states from checkpoint file
  # Or initiate new transaction states
  rebuild

  # Divide the file to upload into parts to upload separately
  divide_parts if @parts.empty?

  # Upload each part
  @todo_parts = @parts.select { |part| !part[:done] }

  workers = (1..@num_threads).map do
    Thread.new do
      # Each worker drains the shared queue until it is empty
      while (part = sync_get_todo_part)
        upload_part(part)
      end
    end
  end
  workers.each(&:join)

  # Commit the multipart upload transaction
  commit

  logger.info("Done upload, file: #{@file}")
end
Private Instance Methods
commit()
click to toggle source
Commit the transaction when all parts are successfully uploaded. @todo handle undefined behavior: the commit succeeds on the server
but an error is returned to the client
# File lib/aliyun_sdk/oss/upload.rb, line 112
# Complete the multipart upload with the number/etag of every part, then
# delete the checkpoint file unless options[:disable_cpt] is set.
def commit
  logger.info("Begin commit transaction, id: #{id}")

  uploaded = sync_get_all_parts.map do |part|
    Part.new(:number => part[:number], :etag => part[:etag])
  end
  callback = @options[:callback]
  @protocol.complete_multipart_upload(bucket, object, id, uploaded, callback)

  File.delete(@cpt_file) unless options[:disable_cpt]

  logger.info("Done commit transaction, id: #{id}")
end
divide_parts()
click to toggle source
Divide the file into parts to upload
# File lib/aliyun_sdk/oss/upload.rb, line 187
# Divide the file into parts to upload.
#
# Fills @parts with one hash per part:
#   { :number => 1-based index, :range => [start, end), :done => false }
# and then records a checkpoint.
#
# The part size is @options[:part_size] (or PART_SIZE when unset), enlarged
# if needed so that the file never splits into more than max_parts parts.
def divide_parts
  logger.info("Begin divide parts, file: #{@file}")

  max_parts = 10000
  file_size = File.size(@file)

  # Smallest part size that still fits the file into max_parts parts.
  # This must round UP: with floor division a file whose size is not a
  # multiple of max_parts would be split into more than max_parts parts.
  min_part_size = (file_size + max_parts - 1) / max_parts
  part_size = [@options[:part_size] || PART_SIZE, min_part_size].max

  # Ceiling of file_size / part_size (yields 0 for an empty file)
  num_parts = (file_size - 1) / part_size + 1
  @parts = (1..num_parts).map do |i|
    {
      :number => i,
      :range => [(i - 1) * part_size, [i * part_size, file_size].min],
      :done => false
    }
  end

  checkpoint

  logger.info("Done divide parts, parts: #{@parts}")
end
ensure_file_not_changed()
click to toggle source
Ensure file not changed during uploading
# File lib/aliyun_sdk/oss/upload.rb, line 226
# Ensure the file was not changed during uploading.
#
# An unchanged mtime is accepted right away; otherwise the file's md5 is
# recomputed and compared with the recorded one.
#
# @raise [FileInconsistentError] when both mtime and md5 differ from the
#   values stored in @file_meta
def ensure_file_not_changed
  mtime_unchanged = File.mtime(@file) == @file_meta[:mtime]
  return if mtime_unchanged
  return if @file_meta[:md5] == get_file_md5(@file)

  raise FileInconsistentError, "The file to upload is changed."
end
initiate()
click to toggle source
# File lib/aliyun_sdk/oss/upload.rb, line 147
# Start a brand-new multipart upload: obtain the upload id from the
# protocol, record the file's current mtime and md5, and write a checkpoint.
def initiate
  logger.info("Begin initiate transaction")

  @id = @protocol.initiate_multipart_upload(bucket, object, options)

  current_mtime = File.mtime(@file)
  current_md5 = get_file_md5(@file)
  @file_meta = { :mtime => current_mtime, :md5 => current_md5 }

  checkpoint

  logger.info("Done initiate transaction, id: #{id}")
end
rebuild()
click to toggle source
Rebuild the states of the transaction from checkpoint file
# File lib/aliyun_sdk/oss/upload.rb, line 127
# Rebuild the states of the transaction from the checkpoint file.
#
# When checkpointing is disabled or no checkpoint file exists, a new
# transaction is initiated instead. Otherwise @id, @file_meta and @parts are
# restored from the checkpoint, after verifying that the file still matches
# the metadata recorded in it.
#
# @raise [FileInconsistentError] (from ensure_file_not_changed) when the
#   file no longer matches the checkpointed metadata
def rebuild
  logger.info("Begin rebuild transaction, checkpoint: #{@cpt_file}")

  states = nil
  # File.exist? rather than File.exists?, which newer Rubies no longer have
  if options[:disable_cpt] || !File.exist?(@cpt_file)
    initiate
  else
    states = load_checkpoint(@cpt_file)

    # Restore the recorded metadata first so the consistency check compares
    # the file on disk against what the checkpoint actually recorded.
    @file_meta = states[:file_meta]
    ensure_file_not_changed

    @id = states[:id]
    @parts = states[:parts]
  end

  logger.info("Done rebuild transaction, states: #{states}")
end
sync_get_all_parts()
click to toggle source
# File lib/aliyun_sdk/oss/upload.rb, line 219
# Return a shallow copy of @parts, taken while holding @all_mutex.
def sync_get_all_parts
  snapshot = nil
  @all_mutex.synchronize do
    snapshot = @parts.dup
  end
  snapshot
end
sync_get_todo_part()
click to toggle source
# File lib/aliyun_sdk/oss/upload.rb, line 207
# Remove and return the first pending part from @todo_parts while holding
# @todo_mutex; returns nil once the list is empty.
def sync_get_todo_part
  @todo_mutex.synchronize do
    @todo_parts.shift
  end
end
sync_update_part(p)
click to toggle source
# File lib/aliyun_sdk/oss/upload.rb, line 213
# Store +p+ in @parts at the slot given by its 1-based :number, while
# holding @all_mutex. Returns +p+.
def sync_update_part(p)
  slot = p[:number] - 1
  @all_mutex.synchronize do
    @parts[slot] = p
  end
end
upload_part(p)
click to toggle source
Upload a part
# File lib/aliyun_sdk/oss/upload.rb, line 161
# Upload a single part.
#
# Streams the bytes in p[:range] (start inclusive, end exclusive) from @file
# to the protocol in chunks of at most READ_SIZE, then marks the part as
# done with the returned etag and writes a checkpoint.
#
# @param p [Hash] part descriptor with :number and :range
def upload_part(p)
  logger.debug("Begin upload part: #{p}")

  result = File.open(@file) do |f|
    range = p[:range]
    offset = range.first
    limit = range.at(1)
    f.seek(offset)

    @protocol.upload_part(bucket, object, id, p[:number]) do |sw|
      until offset >= limit
        chunk = [READ_SIZE, limit - offset].min
        sw << f.read(chunk)
        offset += chunk
      end
    end
  end

  sync_update_part(p.merge(:done => true, :etag => result.etag))

  checkpoint

  logger.debug("Done upload part: #{p}")
end