module IntelligentUtils

Public Instance Methods

bad_state(state)

Check whether the state has failed but still has retries remaining

@param [IntelligentState] state An IntelligentState object

@return [Boolean]

# File lib/filestack/utils/utils.rb, line 180
def bad_state(state)
  !state.ok && state.alive?
end
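
A minimal usage sketch, assuming the gem is loaded and IntelligentUtils is mixed in at the top level (IntelligentState, ok, alive? and add_retry are the gem's own, as used above):

require 'filestack'
include IntelligentUtils

state = IntelligentState.new
state.ok = false         # a chunk just failed
while bad_state(state)   # true while the failure can still be retried
  state.add_retry        # consume one retry
  state.ok = true        # pretend the retry succeeded, ending the loop
end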
change_offset(working_offset, state)

Return the current working offset if the state has not tried it yet. Otherwise, set the state to its next offset and return that

@param [Integer] working_offset The current offset
@param [IntelligentState] state An IntelligentState object

@return [Integer]

# File lib/filestack/utils/utils.rb, line 192
def change_offset(working_offset, state)
  if state.offset > working_offset
    working_offset
  else
    state.offset = state.next_offset
  end
end
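
For illustration, with hypothetical offset values (next_offset comes from the gem's IntelligentState):

state = IntelligentState.new
state.offset = 1_048_576
change_offset(524_288, state)   #=> 524_288, the state has not dropped this low yet
state.offset = 262_144
change_offset(524_288, state)   #=> state.next_offset, the state already tried this size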
chunk_job(job, state, apikey, filename, filesize, start_response, storage)

Chunk a specific job into offsets

@param [Hash] job Hash with all job options
@param [IntelligentState] state An IntelligentState object
@param [String] apikey Filestack API key
@param [String] filename Name of incoming file
@param [Integer] filesize Size of incoming file
@param [Hash] start_response Parsed response body from multipart_start
@param [String] storage Storage location (e.g. 's3')

@return [Array]

# File lib/filestack/utils/utils.rb, line 282
def chunk_job(job, state, apikey, filename, filesize, start_response, storage)
  offset = 0
  seek_point = job[:seek_point]
  chunk_list = []

  while (offset < FilestackConfig::DEFAULT_CHUNK_SIZE) && (seek_point + offset) < filesize
    chunk_list.push(
      seek_point: seek_point,
      filename: filename,
      apikey: apikey,
      part: job[:part],
      size: job[:size],
      uri: start_response['uri'],
      region: start_response['region'],
      upload_id: start_response['upload_id'],
      location_url: start_response['location_url'],
      store: { location: storage },
      offset: offset
    )
    offset += state.offset
  end
  chunk_list
end
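
A sketch of building a chunk list; every literal below is illustrative, and start_response stands in for the parsed body of a real multipart_start response:

state = IntelligentState.new
start_response = {
  'uri'          => '/fake/uri',
  'region'       => 'us-east-1',
  'upload_id'    => 'fake-upload-id',
  'location_url' => 'upload.example.com'
}
job = { seek_point: 0, part: 1, size: 5 * 1024 * 1024 }
chunks = chunk_job(job, state, 'YOUR_APIKEY', 'example.txt',
                   10 * 1024 * 1024, start_response, 's3')
chunks.first[:offset]   #=> 0; subsequent chunks advance by state.offset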
create_intelligent_generator(jobs)

Creates a generator of part jobs

@param [Array] jobs A list of file parts

@return [Fiber]

# File lib/filestack/utils/utils.rb, line 239
def create_intelligent_generator(jobs)
  jobs_gen = jobs.lazy.each
  Fiber.new do
    (jobs.length-1).times do
      Fiber.yield jobs_gen.next
    end
    jobs_gen.next
  end
end
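
The resulting Fiber hands out one job per resume and dies after the last one:

gen = create_intelligent_generator([{ part: 1 }, { part: 2 }, { part: 3 }])
gen.resume   #=> { part: 1 }
gen.resume   #=> { part: 2 }
gen.resume   #=> { part: 3 }
gen.alive?   #=> false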
create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response, storage)

Chunk each job in the list and attach the resulting chunk list to it

@param [Array] jobs A list of file parts
@param [IntelligentState] state An IntelligentState object
@param [String] apikey Filestack API key
@param [String] filename Name of incoming file
@param [String] filepath Local path to the file
@param [Integer] filesize Size of incoming file
@param [Hash] start_response Parsed response body from multipart_start
@param [String] storage Storage location (e.g. 's3')

@return [Array]

# File lib/filestack/utils/utils.rb, line 261
def create_upload_job_chunks(jobs, state, apikey, filename, filepath, filesize, start_response, storage)
  jobs.each { |job|
    job[:chunks] = chunk_job(
      job, state, apikey, filename, filesize, start_response, storage
    )
  }
  jobs
end
get_generator_batch(generator)

Pulls a batch of up to four jobs from a living Fiber

@param [Fiber] generator A living Fiber object

@return [Array]

# File lib/filestack/utils/utils.rb, line 166
def get_generator_batch(generator)
  batch = []
  4.times do
    batch.push(generator.resume) if generator.alive?
  end
  return batch
end
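
Batches come back up to four at a time until the Fiber dies:

gen = create_intelligent_generator((1..6).to_a)
get_generator_batch(gen)   #=> [1, 2, 3, 4]
get_generator_batch(gen)   #=> [5, 6]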
run_intelligent_upload_flow(jobs, filepath, io, state, storage)

Runs the intelligent upload flow, from start to finish

@param [Array] jobs A list of file parts
@param [String] filepath Local path to the file
@param [IO] io IO object to read from when no filepath is given
@param [IntelligentState] state An IntelligentState object
@param [String] storage Storage location (e.g. 's3')

@return [Array]

# File lib/filestack/utils/utils.rb, line 206
def run_intelligent_upload_flow(jobs, filepath, io, state, storage)
  bar = ProgressBar.new(jobs.length)
  generator = create_intelligent_generator(jobs)
  working_offset = FilestackConfig::DEFAULT_OFFSET_SIZE
  while generator.alive?
    batch = get_generator_batch(generator)
    # run parts
    Parallel.map(batch, in_threads: 4) do |part|
      state = run_intelligent_uploads(part, filepath, io, state, storage)
      # condition: a chunk has failed but we have not reached the maximum retries
      while bad_state(state)
        # condition: timeout to S3, requiring offset size to be changed
        if state.error_type == 'S3_NETWORK'
          sleep(5)
          state.offset = working_offset = change_offset(working_offset, state)
        # condition: timeout to backend, requiring only backoff
        elsif ['S3_SERVER', 'BACKEND_SERVER'].include? state.error_type
          sleep(state.backoff)
        end
        state.add_retry
        state = run_intelligent_uploads(part, filepath, io, state, storage)
      end
      raise "Upload has failed. Please try again later." unless state.ok
      bar.increment!
    end
  end
end
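
A sketch of a top-level invocation. This performs live HTTP, and each job must already carry the fields run_intelligent_uploads reads (apikey, filename, filesize, start_response and the part bookkeeping illustrated under run_intelligent_uploads below):

state = IntelligentState.new
run_intelligent_upload_flow(jobs, '/tmp/example.txt', nil, state, 's3')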
run_intelligent_uploads(part, filepath, io, state, storage)

Send a job's chunks in parallel and commit

@param [Hash] part A hash with the information for a single part
@param [String] filepath Local path to the file
@param [IO] io IO object to read from when no filepath is given
@param [IntelligentState] state An IntelligentState object
@param [String] storage Storage location (e.g. 's3')

@return [IntelligentState]

# File lib/filestack/utils/utils.rb, line 313
def run_intelligent_uploads(part, filepath, io, state, storage)
  failed = false
  chunks = chunk_job(
    part, state, part[:apikey], part[:filename], part[:filesize], part[:start_response], storage
  )
  Parallel.map(chunks, in_threads: 3) do |chunk|
    begin
      upload_chunk_intelligently(chunk, state, part[:apikey], filepath, io, part[:options], storage)
    rescue => e
      state.error_type = e.message
      failed = true
      raise Parallel::Kill # abort the remaining chunk threads
    end
  end

  if failed
    state.ok = false
    return state
  else
    state.ok = true
  end

  commit_params = {
    apikey: part[:apikey],
    uri: part[:uri],
    region: part[:region],
    upload_id: part[:upload_id],
    size: part[:filesize],
    part: part[:part],
    location_url: part[:start_response]['location_url'],
    store: { location: storage }
  }

  response = Typhoeus.post(FilestackConfig.multipart_commit_url(commit_params[:location_url]),
                           body: commit_params.to_json,
                           headers: FilestackConfig::HEADERS)

  if response.code == 200
    state.reset
  else
    state.ok = false
  end
  state
end
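
The part hash must carry everything chunk_job and the commit step read from it; a hypothetical shape, reusing start_response from the chunk_job sketch:

part = {
  seek_point: 0, part: 1, size: 5 * 1024 * 1024,
  apikey: 'YOUR_APIKEY', filename: 'example.txt',
  filesize: 10 * 1024 * 1024, uri: '/fake/uri',
  region: 'us-east-1', upload_id: 'fake-upload-id',
  start_response: start_response, options: nil
}
state = run_intelligent_uploads(part, '/tmp/example.txt', nil, IntelligentState.new, 's3')
state.ok   #=> true only if every chunk and the commit succeeded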
upload_chunk_intelligently(job, state, apikey, filepath, io, options, storage)

Upload a single chunk

@param [Hash] job Hash with all job options
@param [IntelligentState] state An IntelligentState object
@param [String] apikey Filestack API key
@param [String] filepath Local path to the file
@param [IO] io IO object to read from when no filepath is given
@param [Hash] options User-defined options for multipart uploads
@param [String] storage Storage location (e.g. 's3')

@return [Typhoeus::Response]

# File lib/filestack/utils/utils.rb, line 369
def upload_chunk_intelligently(job, state, apikey, filepath, io, options, storage)
  file = filepath ? File.open(filepath, 'rb') : io # binary mode, so the chunk and its MD5 match the bytes on disk
  file.seek(job[:seek_point] + job[:offset])

  chunk = file.read(state.offset)
  md5 = Digest::MD5.new
  md5 << chunk
  data = {
    apikey: apikey,
    part: job[:part],
    size: chunk.length,
    md5: md5.base64digest,
    uri: job[:uri],
    region: job[:region],
    upload_id: job[:upload_id],
    store: { location: storage },
    offset: job[:offset],
    fii: true
  }

  data.merge!(options) if options

  # POST to multipart/upload; map transport-level errors to BACKEND_NETWORK
  begin
    fs_response = Typhoeus.post(FilestackConfig.multipart_upload_url(job[:location_url]),
                                body: data.to_json,
                                headers: FilestackConfig::HEADERS)
  rescue
    raise 'BACKEND_NETWORK'
  end

  unless fs_response.code == 200
    if [400, 403, 404].include? fs_response.code
      raise 'FAILURE'
    else
      raise 'BACKEND_SERVER'
    end
  end
  fs_response = JSON.parse(fs_response.body)

  # PUT the chunk to S3; map transport-level errors to S3_NETWORK
  begin
    amazon_response = Typhoeus.put(
      fs_response['url'], headers: fs_response['headers'], body: chunk
    )
  rescue
    raise 'S3_NETWORK'
  end

  unless amazon_response.code == 200
    if [400, 403, 404].include? amazon_response.code
      raise 'FAILURE'
    else
      raise 'S3_SERVER'
    end
  end
  amazon_response
end
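
A hypothetical single-chunk call; it issues a real POST to Filestack and a PUT to S3, so it only succeeds with valid credentials and upload metadata:

chunk = {
  seek_point: 0, offset: 0, part: 1,
  uri: '/fake/uri', region: 'us-east-1',
  upload_id: 'fake-upload-id', location_url: 'upload.example.com'
}
upload_chunk_intelligently(chunk, IntelligentState.new, 'YOUR_APIKEY',
                           '/tmp/example.txt', nil, nil, 's3')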