class S3MetaSync::Syncer
Constants
- AWS_PRIVATE_ACCESS
- AWS_PUBLIC_ACCESS
- DEFAULT_REGION
- STAGING_AREA_PREFIX
Public Class Methods
new(config)
# File lib/s3_meta_sync/syncer.rb, line 23
def initialize(config)
  @config = { acl: AWS_PUBLIC_ACCESS, region: DEFAULT_REGION }.merge(config)
end
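A minimal construction sketch, assuming credentials are passed as the :key and :secret options that the private s3 helper below reads (the environment variable names are illustrative):

require "s3_meta_sync"

syncer = S3MetaSync::Syncer.new(
  key: ENV["AWS_ACCESS_KEY_ID"],        # read by the private s3 helper
  secret: ENV["AWS_SECRET_ACCESS_KEY"],
  verbose: true                         # makes log print progress to $stderr
)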
Private Class Methods
swap_in_directory(destination, dir)
Almost atomic when destination and temp dir are on the same device.
# File lib/s3_meta_sync/syncer.rb, line 121
def self.swap_in_directory(destination, dir)
  next_dir = "#{destination}-next"
  delete = "#{destination}-delete"

  # clean up potential leftovers from last run
  FileUtils.remove_dir(next_dir) if File.exist?(next_dir)
  FileUtils.remove_dir(delete) if File.exist?(delete)

  # move onto the same device
  FileUtils.mv(dir, next_dir)

  # copy permissions
  FileUtils.chmod_R(File.stat(destination).mode, next_dir)

  # swap
  FileUtils.mv(destination, delete)
  FileUtils.mv(next_dir, destination)

  # cleanup old
  FileUtils.remove_dir(delete)
end
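The swap is built from renames, which are atomic within a single filesystem; that is why the staging directory is first moved next to the destination. A throwaway sketch of the effect (directory names are placeholders, and send is used only because the method is private):

require "fileutils"
require "s3_meta_sync"

FileUtils.mkdir_p("live")
File.write("live/hello.txt", "old")
FileUtils.mkdir_p("staged")
File.write("staged/hello.txt", "new")

S3MetaSync::Syncer.send(:swap_in_directory, "live", "staged")
File.read("live/hello.txt") # => "new"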
Public Instance Methods
sync(source, destination)
# File lib/s3_meta_sync/syncer.rb, line 30
def sync(source, destination)
  raise ArgumentError if source.end_with?("/") or destination.end_with?("/")

  if destination.include?(":")
    @bucket, destination = destination.split(":")
    upload(source, destination)
  else
    if url?(source)
      @bucket = nil
    else
      @bucket, source = source.split(":")
    end
    download(source, destination)
  end
end
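Grounded in the colon parsing above: a destination containing ":" selects upload, anything else downloads; trailing slashes raise ArgumentError. Bucket and folder names below are placeholders:

# upload a local folder to S3
syncer.sync("some/local/folder", "my-bucket:remote/folder")

# download from S3 into a local folder
syncer.sync("my-bucket:remote/folder", "some/local/folder")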
Private Instance Methods
consume_corrupted_files(source)
# File lib/s3_meta_sync/syncer.rb, line 155
def consume_corrupted_files(source)
  log = "#{source}/#{CORRUPT_FILES_LOG}"
  if File.exist?(log)
    corrupted = File.read(log).split("\n")
    log "force uploading #{corrupted.size} corrupted files", true
    File.unlink log
    corrupted
  else
    []
  end
end
copy_content(destination, dir)
# File lib/s3_meta_sync/syncer.rb, line 115
def copy_content(destination, dir)
  log "Copying content from #{destination} to #{dir}"
  system "cp -R #{destination}/* #{dir} 2>/dev/null"
end
delete_empty_folders(destination)
# File lib/s3_meta_sync/syncer.rb, line 365
def delete_empty_folders(destination)
  log "Deleting empty folders"
  `find #{destination} -depth -empty -delete`
end
delete_local_files(local, paths)
# File lib/s3_meta_sync/syncer.rb, line 201
def delete_local_files(local, paths)
  log "Delete #{paths.size} local files" if paths.size > 0
  paths = paths.map { |path| "#{local}/#{path}" }
  paths.each { |path| log "Deleting #{path}" }
  FileUtils.rm_f(paths)
end
delete_old_temp_folders()
Sometimes SIGTERM causes Dir.mktmpdir to not properly delete the temp folder. Removes folders that are at least one day old.
# File lib/s3_meta_sync/syncer.rb, line 104
def delete_old_temp_folders
  path = File.join(Dir.tmpdir, STAGING_AREA_PREFIX + "*")
  day = 24 * 60 * 60

  dirs = Dir.glob(path)
  dirs.select! { |dir| Time.now.utc - File.ctime(dir).utc > day } # only stale ones
  removed = dirs.each { |dir| FileUtils.rm_rf(dir) }

  log "Removed #{removed.count} old temp folder(s)" if removed.count > 0
end
delete_remote_files(remote, paths)
# File lib/s3_meta_sync/syncer.rb, line 186
def delete_remote_files(remote, paths)
  paths.each { |path| log "Deleting #{@bucket}:#{remote}/#{path}" } if paths.any?

  # keys are limited to 1000 per request: http://docs.aws.amazon.com/sdkforruby/api/Aws/S3/Bucket.html#delete_objects-instance_method
  paths.each_slice(1000) do |sliced_paths|
    log "Sending request for #{sliced_paths.size} keys"

    s3.delete_objects(
      delete: {
        objects: sliced_paths.map { |path| { key: "#{remote}/#{path}" } }
      },
      request_payer: "requester",
      bucket: @bucket
    )
  end
end
download(source, destination)
# File lib/s3_meta_sync/syncer.rb, line 72
def download(source, destination)
  delete_old_temp_folders

  remote_meta = download_meta(source)
  local_files = ((@config[:no_local_changes] && read_meta(destination)) || meta_data(destination))[:files]

  download = remote_meta[:files].select { |path, md5| local_files[path] != md5 }.map(&:first)
  delete = local_files.keys - remote_meta[:files].keys

  log "Downloading: #{download.size} Deleting: #{delete.size}", true

  if download.any? || delete.any?
    Dir.mktmpdir(STAGING_AREA_PREFIX) do |staging_area|
      log "Staging area: #{staging_area}"
      FileUtils.mkdir_p(destination)
      copy_content(destination, staging_area)
      download_files(source, staging_area, download, remote_meta[:zip])
      delete_local_files(staging_area, delete)
      delete_empty_folders(staging_area)
      store_meta(staging_area, remote_meta)
      verify_integrity!(staging_area, destination, download, remote_meta[:files])
      log "Swapping in directories #{destination} and #{staging_area}"
      self.class.swap_in_directory(destination, staging_area)
      FileUtils.mkdir(staging_area) # mktmpdir tries to remove this directory
      log "Download finished"
    end
  end
end
download_content(path)
# File lib/s3_meta_sync/syncer.rb, line 328
def download_content(path)
  log "Downloading #{path}"

  url =
    if url?(path)
      path
    else
      "https://s3#{"-#{region}" if region}.amazonaws.com/#{@bucket}/#{path}"
    end
  options = (@config[:ssl_none] ? { :ssl_verify_mode => OpenSSL::SSL::VERIFY_NONE } : {})
  options[:open_timeout] = @config.fetch(:open_timeout, 5)  # 5 seconds
  options[:read_timeout] = @config.fetch(:read_timeout, 10) # 10 seconds

  retry_downloads(url: url) { open(url, options) }
end
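Following the interpolation above, the region appears in the host only when region (defined below) returns a non-default value. Illustrative URLs for a hypothetical bucket:

# @config[:region] == "us-west-2"
#   => https://s3-us-west-2.amazonaws.com/my-bucket/remote/folder/file
# @config[:region] == DEFAULT_REGION (region returns nil)
#   => https://s3.amazonaws.com/my-bucket/remote/folder/file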
download_file(source, path, destination, zip)
# File lib/s3_meta_sync/syncer.rb, line 303
def download_file(source, path, destination, zip)
  download =
    if private?
      private_content_download(source, path)
    else
      public_content_download(source, path)
    end
  download = S3MetaSync::Zip.unzip(download) if zip

  FileUtils.mkdir_p(File.dirname("#{destination}/#{path}"))
  # consumes less ram than File.write(path, content), possibly also faster
  File.open("#{destination}/#{path}", "wb") { |f| IO.copy_stream(download, f) }
  download.close
end
download_files(source, destination, paths, zip)
# File lib/s3_meta_sync/syncer.rb, line 370
def download_files(source, destination, paths, zip)
  in_multiple_threads(paths) do |path|
    download_file(source, path, destination, zip)
  end
end
download_meta(destination)
# File lib/s3_meta_sync/syncer.rb, line 258
def download_meta(destination)
  if private?
    private_access_download_meta(destination)
  else
    public_access_download_meta(destination)
  end
end
generate_meta(source)
# File lib/s3_meta_sync/syncer.rb, line 224
def generate_meta(source)
  store_meta(source, meta_data(source))
end
in_multiple_threads(data) { |slice| ... }
# File lib/s3_meta_sync/syncer.rb, line 385
def in_multiple_threads(data)
  threads = [@config[:parallel] || 10, data.size].min
  data = data.dup
  (0...threads).to_a.map do
    Thread.new do
      while slice = data.shift
        yield slice
      end
    end
  end.each(&:join)
end
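This is a small work queue: up to @config[:parallel] threads (default 10) pop items off a duplicated array until it is empty, relying on Array#shift being safe under MRI's GVL. A standalone sketch of the same pattern, not part of the gem's API:

queue = %w[a.txt b.txt c.txt]
workers = Array.new([10, queue.size].min) do
  Thread.new do
    while item = queue.shift
      puts "processing #{item}"
    end
  end
end
workers.each(&:join)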
log(text, important=false)
# File lib/s3_meta_sync/syncer.rb, line 401
def log(text, important=false)
  $stderr.puts text if @config[:verbose] or important
end
md5_hash(source, files=nil)
# File lib/s3_meta_sync/syncer.rb, line 243
def md5_hash(source, files=nil)
  Dir.chdir(source) do
    files ||= Dir["**/*"].select { |f| File.file?(f) }
    Hash[files.map { |file| [file, Digest::MD5.file(file).to_s] }]
  end
end
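The result maps paths relative to source to MD5 hex digests; for example (file names and digests are illustrative):

md5_hash("some/local/folder")
# => { "css/app.css" => "d41d8cd98f00b204e9800998ecf8427e",
#      "index.html"  => "0cc175b9c0f1b6a831c399e269772661" }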
meta_data(source)
# File lib/s3_meta_sync/syncer.rb, line 236
def meta_data(source)
  return { files: {} } unless File.directory?(source)
  result = { files: md5_hash(source) }
  result[:zip] = @config[:zip] if @config[:zip]
  result
end
parse_yaml_content(content)
# File lib/s3_meta_sync/syncer.rb, line 298
def parse_yaml_content(content)
  result = YAML.load(content)
  result.key?(:files) ? result : { files: result } # support new and old format
end
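The old meta format was a bare path-to-md5 hash, while the new format nests it under :files (optionally with :zip); both normalize to the new shape. Illustrative values:

parse_yaml_content("index.html: abc123")
# => { files: { "index.html" => "abc123" } }   (old format gets wrapped)
# a new-format document that already contains the :files key passes through unchanged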
private?()
# File lib/s3_meta_sync/syncer.rb, line 405
def private?
  @config[:acl] == AWS_PRIVATE_ACCESS
end
private_access_download_meta(destination)
# File lib/s3_meta_sync/syncer.rb, line 266
def private_access_download_meta(destination)
  content = private_content_download(destination, META_FILE).string
  raise S3MetaSync::RemoteWithoutMeta if content.empty? # if missing, upload everything
  parse_yaml_content(content)
rescue Aws::S3::Errors::NoSuchKey, Aws::S3::Errors::AccessDenied
  # when requesting a file that doesn't exist, AccessDenied is raised
  retries ||= 0
  raise S3MetaSync::RemoteWithoutMeta if retries >= 1
  retries += 1
  sleep 1 # maybe the remote meta was just updated ... give aws a second chance ...
  retry
end
private_content_download(source, path)
# File lib/s3_meta_sync/syncer.rb, line 318
def private_content_download(source, path)
  log "Downloading #{path}"
  obj = s3.get_object(bucket: @bucket, key: "#{source}/#{path}")
  obj.body
end
public_access_download_meta(destination)
# File lib/s3_meta_sync/syncer.rb, line 282
def public_access_download_meta(destination)
  content = download_content("#{destination}/#{META_FILE}") { |io| io.read }
  raise OpenURI::HTTPError.new("Content is empty", nil) if content.size == 0
  parse_yaml_content(content)
rescue OpenURI::HTTPError
  retries ||= 0
  raise S3MetaSync::RemoteWithoutMeta if retries >= 1
  retries += 1
  sleep 1 # maybe the remote meta was just updated ... give aws a second chance ...
  retry
end
public_content_download(source, path)
# File lib/s3_meta_sync/syncer.rb, line 324
def public_content_download(source, path)
  download_content("#{source}/#{path}") # warning: using block form consumes more ram
end
read_meta(source)
# File lib/s3_meta_sync/syncer.rb, line 250
def read_meta(source)
  file = "#{source}/#{META_FILE}"
  if File.exist?(file)
    content = File.read(file)
    parse_yaml_content(content) if content.size > 0
  end
end
region()
# File lib/s3_meta_sync/syncer.rb, line 380
def region
  region = @config[:region]
  region if region != DEFAULT_REGION
end
retry_downloads(url:) { || ... }
# File lib/s3_meta_sync/syncer.rb, line 342
def retry_downloads(url:)
  yield
rescue OpenURI::HTTPError, Errno::ECONNRESET, Errno::ETIMEDOUT, Net::OpenTimeout, Net::ReadTimeout => e
  max_retries = @config[:max_retries] || 2
  http_error_retries ||= 0
  http_error_retries += 1
  if http_error_retries <= max_retries
    log "#{e.class} error downloading #{url}, retrying #{http_error_retries}/#{max_retries}"
    retry
  else
    raise $!, "#{$!.message} -- while trying to download #{url}", $!.backtrace
  end
rescue OpenSSL::SSL::SSLError
  ssl_error_retries ||= 0
  ssl_error_retries += 1
  if ssl_error_retries == 1
    log "SSL error downloading #{url}, retrying"
    retry
  else
    raise
  end
end
s3()
# File lib/s3_meta_sync/syncer.rb, line 208
def s3
  @s3 ||= begin
    config = { region: @config[:region] }

    if @config[:credentials_path]
      config[:credentials] = Aws::SharedCredentials.new(path: @config[:credentials_path], profile_name: "default")
    else
      config[:access_key_id] = @config[:key]
      config[:secret_access_key] = @config[:secret]
      config[:session_token] = @config[:session_token] if @config[:session_token]
    end

    Aws::S3::Client.new(config)
  end
end
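Two ways to supply credentials, matching the branches above: a shared credentials file read with the "default" profile, or explicit keys. Paths and key values below are placeholders:

# shared credentials file
S3MetaSync::Syncer.new(credentials_path: File.join(Dir.home, ".aws", "credentials"))

# explicit keys, optionally with a session token
S3MetaSync::Syncer.new(key: "AKIA...", secret: "...", session_token: "...")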
store_meta(source, meta)
# File lib/s3_meta_sync/syncer.rb, line 228
def store_meta(source, meta)
  log "Storing meta file"
  file = "#{source}/#{META_FILE}"
  FileUtils.mkdir_p(File.dirname(file))
  File.write(file, meta.to_yaml)
  meta
end
upload(source, destination)
# File lib/s3_meta_sync/syncer.rb, line 49
def upload(source, destination)
  corrupted = consume_corrupted_files(source)
  remote_meta =
    begin
      download_meta(destination)
    rescue RemoteWithoutMeta
      log "Remote has no #{META_FILE}, uploading everything", true
      { files: {} }
    end
  local_files = generate_meta(source)[:files]
  remote_files = remote_meta[:files]

  upload =
    if @config[:zip] == remote_meta[:zip]
      local_files.select { |path, md5| remote_files[path] != md5 || corrupted.include?(path) }
    else
      local_files
    end.map(&:first)
  delete = remote_files.keys - local_files.keys
  log "Uploading: #{upload.size} Deleting: #{delete.size}", true

  upload_files(source, destination, upload)
  delete_remote_files(destination, delete)
  upload_file(source, META_FILE, destination)
end
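A toy illustration of the diffing above (digests are made up): locally changed files go up, remote-only keys are deleted:

local_files  = { "a.txt" => "111", "b.txt" => "222" }
remote_files = { "a.txt" => "111", "c.txt" => "333" }

local_files.select { |path, md5| remote_files[path] != md5 }.map(&:first) # => ["b.txt"]
remote_files.keys - local_files.keys                                      # => ["c.txt"]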
upload_file(source, path, destination)
# File lib/s3_meta_sync/syncer.rb, line 167
def upload_file(source, path, destination)
  log "Uploading #{path}"
  content = File.read("#{source}/#{path}")
  content = Zip.zip(content) if @config[:zip] && path != META_FILE

  object = {
    acl: @config[:acl],
    bucket: @bucket,
    body: content,
    content_encoding: content.encoding.to_s,
    content_type: MIME::Types.of(path).first.to_s,
    key: "#{destination}/#{path}"
  }
  object[:server_side_encryption] = @config[:server_side_encryption] if @config[:server_side_encryption]
  s3.put_object(object)
end
upload_files(source, destination, paths)
# File lib/s3_meta_sync/syncer.rb, line 376
def upload_files(source, destination, paths)
  in_multiple_threads(paths) { |path| upload_file(source, path, destination) }
end
url?(source)
# File lib/s3_meta_sync/syncer.rb, line 397
def url?(source)
  source.include?("://")
end
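Anything containing "://" counts as a URL:

url?("https://s3.amazonaws.com/my-bucket/folder") # => true
url?("my-bucket:folder")                          # => false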
verify_integrity!(staging_area, destination, changed, remote)
# File lib/s3_meta_sync/syncer.rb, line 143
def verify_integrity!(staging_area, destination, changed, remote)
  log "Verifying integrity of #{changed.size} files" if changed.size > 0
  local = md5_hash(staging_area, changed)
  corrupted = local.select { |file, md5| remote[file] != md5 }.map(&:first)
  return if corrupted.empty?

  File.write("#{destination}/#{CORRUPT_FILES_LOG}", corrupted.join("\n"))
  message = "corrupted files downloaded:\n#{corrupted.join("\n")}"
  log message, true
  raise RemoteCorrupt, message
end