module IntelligentUtils
Public Instance Methods
Check whether the state is in an error state but still has retries remaining
@param [IntelligentState] state An IntelligentState object

@return [Boolean]
# File lib/filestack/utils/utils.rb, line 180
def bad_state(state)
  !state.ok && state.alive?
end
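For illustration, here is how the predicate behaves with a hypothetical stand-in for IntelligentState (the real class flips alive? to false once its retry budget is spent):

# Hypothetical stand-in; the real IntelligentState tracks retries itself.
FakeState = Struct.new(:ok, :alive) do
  def alive?
    alive
  end
end

bad_state(FakeState.new(false, true))  # => true  (errored, retries remain)
bad_state(FakeState.new(false, false)) # => false (retries exhausted; give up)
bad_state(FakeState.new(true, true))   # => false (last attempt succeeded)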
Return the current working offset if the state has not yet tried it. Otherwise, step the state down to its next offset and return that
@param [Integer] working_offset The current offset
@param [IntelligentState] state An IntelligentState object

@return [Integer]
# File lib/filestack/utils/utils.rb, line 192
def change_offset(working_offset, state)
  if state.offset > working_offset
    working_offset
  else
    state.offset = state.next_offset
  end
end
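A short sketch of the two branches, using a hypothetical state whose next_offset simply halves the current value (the real IntelligentState walks a fixed list of offset sizes):

# Hypothetical stand-in; the real IntelligentState steps through a fixed list.
HalvingState = Struct.new(:offset) do
  def next_offset
    offset / 2
  end
end

state = HalvingState.new(524_288)
change_offset(262_144, state) # => 262144 (state has not dropped this low yet)

state.offset = 262_144
change_offset(262_144, state) # => 131072 (already tried; step down)
state.offset                  # => 131072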
Chunk a specific job into offsets
@param [Dict] job Dictionary with all job options
@param [IntelligentState] state An IntelligentState object
@param [String] apikey Filestack API key
@param [String] filename Name of incoming file
@param [Int] filesize Size of incoming file
@param [Typhoeus::Response] start_response Response body from multipart_start
@param [String] storage Storage location (e.g. 's3')

@return [Dict]
# File lib/filestack/utils/utils.rb, line 282
def chunk_job(job, state, apikey, filename, filesize, start_response, storage)
  offset = 0
  seek_point = job[:seek_point]
  chunk_list = []
  while (offset < FilestackConfig::DEFAULT_CHUNK_SIZE) && (seek_point + offset) < filesize
    chunk_list.push(
      seek_point: seek_point,
      filename: filename,
      apikey: apikey,
      part: job[:part],
      size: job[:size],
      uri: start_response['uri'],
      region: start_response['region'],
      upload_id: start_response['upload_id'],
      location_url: start_response['location_url'],
      store: { location: storage },
      offset: offset
    )
    offset += state.offset
  end
  chunk_list
end
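The loop above is plain offset arithmetic: each descriptor advances by state.offset until it covers the part or runs past the end of the file. In isolation, with an assumed 8 MiB part size and 1 MiB sub-chunks:

part_size    = 8 * 1024 * 1024  # assumed value of FilestackConfig::DEFAULT_CHUNK_SIZE
state_offset = 1 * 1024 * 1024  # current IntelligentState#offset
seek_point   = 0                # this part starts at the top of the file
filesize     = 5_500_000        # file ends partway through the part

offsets = []
offset = 0
while offset < part_size && (seek_point + offset) < filesize
  offsets << offset
  offset += state_offset
end
offsets.length # => 6; the last chunk is short because the read stops at EOF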
Creates a generator of part jobs
@param [Array] jobs A list of file parts
@return [Fiber]
# File lib/filestack/utils/utils.rb, line 239
def create_intelligent_generator(jobs)
  jobs_gen = jobs.lazy.each
  Fiber.new do
    (jobs.length - 1).times do
      Fiber.yield jobs_gen.next
    end
    jobs_gen.next
  end
end
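Each resume hands back one job; the last job is returned as the Fiber's final value, after which the Fiber is dead:

jobs = [{ part: 1 }, { part: 2 }, { part: 3 }]
gen  = create_intelligent_generator(jobs)

gen.resume # => {:part=>1}
gen.resume # => {:part=>2}
gen.resume # => {:part=>3} (final value; the Fiber finishes here)
gen.alive? # => false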
Loop over the part jobs and attach each job's list of chunks
@param [Array] jobs A list of file parts
@param [IntelligentState] state An IntelligentState object
@param [String] apikey Filestack API key
@param [String] filename Name of incoming file
@param [Int] filesize Size of incoming file
@param [Typhoeus::Response] start_response Response body from multipart_start
@param [String] storage Storage location (e.g. 's3')

@return [Array]
# File lib/filestack/utils/utils.rb, line 261
def create_upload_job_chunks(jobs, state, apikey, filename, filesize, start_response, storage)
  jobs.each do |job|
    job[:chunks] = chunk_job(
      job, state, apikey, filename, filesize, start_response, storage
    )
  end
  jobs
end
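A sketch of the call with hypothetical inputs; start_response mirrors the shape of the parsed multipart_start body, with key names taken from chunk_job above, and assumes the gem's default 8 MiB part size:

start_response = {
  'uri' => '/fake-uri', 'region' => 'us-east-1',
  'upload_id' => 'abc123', 'location_url' => 'upload.example.com'
}
state = Struct.new(:offset).new(1 * 1024 * 1024) # stand-in for IntelligentState
jobs  = [{ part: 1, seek_point: 0, size: 8 * 1024 * 1024 }]

create_upload_job_chunks(jobs, state, 'APIKEY', 'movie.mp4',
                         20 * 1024 * 1024, start_response, 's3')
jobs.first[:chunks].length # => 8 (one chunk per 1 MiB offset within the part)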
Pulls a batch of up to four jobs from a living Fiber
@param [Fiber] generator A living Fiber object
@return [Array]
# File lib/filestack/utils/utils.rb, line 166
def get_generator_batch(generator)
  batch = []
  4.times do
    batch.push(generator.resume) if generator.alive?
  end
  batch
end
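Combined with create_intelligent_generator, the helper drains the Fiber four jobs at a time:

jobs = (1..6).map { |i| { part: i } }
gen  = create_intelligent_generator(jobs)

get_generator_batch(gen).map { |j| j[:part] } # => [1, 2, 3, 4]
get_generator_batch(gen).map { |j| j[:part] } # => [5, 6]
gen.alive?                                    # => false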
Runs the intelligent upload flow, from start to finish
@param [Array] jobs A list of file parts
@param [String] filepath Local path to the file
@param [IO] io IO object to read from when no filepath is given
@param [IntelligentState] state An IntelligentState object
@param [String] storage Storage location (e.g. 's3')

@return [Array]
# File lib/filestack/utils/utils.rb, line 206
def run_intelligent_upload_flow(jobs, filepath, io, state, storage)
  bar = ProgressBar.new(jobs.length)
  generator = create_intelligent_generator(jobs)
  working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE
  while generator.alive?
    batch = get_generator_batch(generator)
    # run parts
    Parallel.map(batch, in_threads: 4) do |part|
      state = run_intelligent_uploads(part, filepath, io, state, storage)
      # condition: a chunk has failed but we have not reached the maximum retries
      while bad_state(state)
        # condition: timeout to S3, requiring offset size to be changed
        if state.error_type == 'S3_NETWORK'
          sleep(5)
          state.offset = working_offset = change_offset(working_offset, state)
        # condition: timeout to backend, requiring only backoff
        elsif ['S3_SERVER', 'BACKEND_SERVER'].include? state.error_type
          sleep(state.backoff)
        end
        state.add_retry
        state = run_intelligent_uploads(part, filepath, io, state, storage)
      end
      raise "Upload has failed. Please try again later." unless state.ok
      bar.increment!
    end
  end
end
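To see how the S3_NETWORK branch degrades the chunk size, here is the offset bookkeeping in isolation (a sketch: the halving next_offset is a stand-in, and the 1 MiB start is an assumed value of FilestackConfig::DEFAULT_OFFSET_SIZE):

HalvingState = Struct.new(:offset) do
  def next_offset
    offset / 2
  end
end

working_offset = 1 * 1024 * 1024 # assumed FilestackConfig::DEFAULT_OFFSET_SIZE
state = HalvingState.new(working_offset)

# Each simulated S3_NETWORK timeout shrinks the offset used on the next attempt.
3.times do
  state.offset = working_offset = change_offset(working_offset, state)
end
state.offset # => 131072 (1 MiB -> 512 KiB -> 256 KiB -> 128 KiB)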
Send a job's chunks in parallel and commit
@param [Dict] part A dictionary representing the information for a single part
@param [String] filepath Local path to the file
@param [IO] io IO object to read from when no filepath is given
@param [IntelligentState] state An IntelligentState object
@param [String] storage Storage location (e.g. 's3')

@return [IntelligentState]
# File lib/filestack/utils/utils.rb, line 313
def run_intelligent_uploads(part, filepath, io, state, storage)
  failed = false
  chunks = chunk_job(
    part, state, part[:apikey], part[:filename], part[:filesize],
    part[:start_response], storage
  )
  Parallel.map(chunks, in_threads: 3) do |chunk|
    begin
      upload_chunk_intelligently(chunk, state, part[:apikey], filepath, io,
                                 part[:options], storage)
    rescue => e
      state.error_type = e.message
      failed = true
      raise Parallel::Kill # stop the remaining workers for this part
    end
  end
  if failed
    state.ok = false
    return state
  else
    state.ok = true
  end
  commit_params = {
    apikey: part[:apikey],
    uri: part[:uri],
    region: part[:region],
    upload_id: part[:upload_id],
    size: part[:filesize],
    part: part[:part],
    location_url: part[:start_response]['location_url'],
    store: { location: storage }
  }
  response = Typhoeus.post(
    FilestackConfig.multipart_commit_url(commit_params[:location_url]),
    body: commit_params.to_json,
    headers: FilestackConfig::HEADERS
  )
  if response.code == 200
    state.reset
  else
    state.ok = false
  end
  state
end
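For reference, a sketch of the part hash this method expects; the keys are inferred from the body above, and the values are placeholders rather than the gem's documented contract:

part = {
  part: 1,                       # 1-based part number
  seek_point: 0,                 # byte offset of this part within the file
  size: 8 * 1024 * 1024,         # part size in bytes
  apikey: 'APIKEY',
  filename: 'movie.mp4',
  filesize: 20 * 1024 * 1024,
  uri: '/fake-uri',              # echoed from the multipart_start response
  region: 'us-east-1',
  upload_id: 'abc123',
  start_response: { 'location_url' => 'upload.example.com' }, # parsed multipart_start body
  options: nil                   # optional user-defined upload parameters
}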
Upload a single chunk
@param [Dict] job Dictionary with all job options
@param [IntelligentState] state An IntelligentState object
@param [String] apikey Filestack API key
@param [String] filepath Local path to the file
@param [IO] io IO object to read from when no filepath is given
@param [Hash] options User-defined options for multipart uploads
@param [String] storage Storage location (e.g. 's3')

@return [Typhoeus::Response]
# File lib/filestack/utils/utils.rb, line 369
def upload_chunk_intelligently(job, state, apikey, filepath, io, options, storage)
  file = filepath ? File.open(filepath) : io
  file.seek(job[:seek_point] + job[:offset])
  chunk = file.read(state.offset)
  file.close if filepath
  md5 = Digest::MD5.new
  md5 << chunk
  data = {
    apikey: apikey,
    part: job[:part],
    size: chunk.length,
    md5: md5.base64digest,
    uri: job[:uri],
    region: job[:region],
    upload_id: job[:upload_id],
    store: { location: storage },
    offset: job[:offset],
    fii: true
  }
  data.merge!(options) if options

  # POST to multipart/upload
  fs_response = Typhoeus.post(
    FilestackConfig.multipart_upload_url(job[:location_url]),
    body: data.to_json,
    headers: FilestackConfig::HEADERS
  )
  unless fs_response.code == 200
    raise 'FAILURE' if [400, 403, 404].include?(fs_response.code)
    raise 'BACKEND_SERVER' if fs_response.code >= 500
    raise 'BACKEND_NETWORK' # code 0: the request never reached the backend
  end
  fs_response = JSON.parse(fs_response.body)

  # PUT to S3
  amazon_response = Typhoeus.put(
    fs_response['url'],
    headers: fs_response['headers'],
    body: chunk
  )
  unless amazon_response.code == 200
    raise 'FAILURE' if [400, 403, 404].include?(amazon_response.code)
    raise 'S3_SERVER' if amazon_response.code >= 500
    raise 'S3_NETWORK' # code 0: the request never reached S3
  end
  amazon_response
end
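The raised strings are the whole error-handling contract: run_intelligent_uploads stores them in state.error_type, and run_intelligent_upload_flow chooses a recovery strategy from them. A condensed, illustrative classifier (not part of the gem) that mirrors the checks above:

# Illustrative only: maps an HTTP status onto the strings raised above.
def classify(code, target)
  return nil if code == 200                          # success; nothing raised
  return 'FAILURE' if [400, 403, 404].include?(code) # permanent client error
  return "#{target}_SERVER" if code >= 500           # transient server error
  "#{target}_NETWORK"                                # code 0: request never arrived
end

classify(503, 'S3')      # => "S3_SERVER"   -> flow sleeps state.backoff, retries
classify(0, 'S3')        # => "S3_NETWORK"  -> flow sleeps 5s and shrinks the offset
classify(404, 'BACKEND') # => "FAILURE"     -> retried until retries run out, then raised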