module Sluice::Storage::S3
Constants
- CONCURRENCY
- RETRIES
- RETRY_WAIT
- TIMEOUT_WAIT

Aliases for Contracts
- FogFile
- FogStorage
Public Class Methods
Copies files between S3 locations concurrently

Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from
to_location - S3Location to copy files to
match_regex - a regex string to match the files to copy
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File lib/sluice/storage/s3/s3.rb, line 198
def self.copy_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=nil, flatten=false)
  puts " copying #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:copy, s3, from_files_or_loc, [], match_regex, to_location, alter_filename_lambda, flatten)
end
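For example, a minimal sketch of a concurrent copy. The bucket names are placeholders, and we assume an S3Location can be constructed from an s3:// URI via Sluice::Storage::S3::Location, which is documented outside this section:

require 'sluice'

s3 = Sluice::Storage::S3.new_fog_s3_from(
  'us-east-1', ENV['AWS_ACCESS_KEY_ID'], ENV['AWS_SECRET_ACCESS_KEY'])

from = Sluice::Storage::S3::Location.new('s3://my-in-bucket/events/')
to   = Sluice::Storage::S3::Location.new('s3://my-archive-bucket/events/')

# Copy only gzipped files, keeping any sub-folder structure
Sluice::Storage::S3.copy_files(s3, from, to, '.+\.gz$')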
Copies files between S3 locations in two different accounts

Implementation is as follows:
- Concurrent download of all files from S3 source to local tmpdir
- Concurrent upload of all files from local tmpdir to S3 target

In other words, the download and upload are not interleaved (which is inefficient because upload speeds are much lower than download speeds)

Parameters:
from_s3 - A Fog::Storage s3 connection for accessing the from S3Location
to_s3 - A Fog::Storage s3 connection for accessing the to S3Location
from_location - S3Location to copy files from
to_location - S3Location to copy files to
match_regex - a regex string to match the files to copy
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File lib/sluice/storage/s3/s3.rb, line 176
def self.copy_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=nil, flatten=false)
  puts " copying inter-account #{describe_from(from_location)} to #{to_location}"
  processed = []
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    processed = download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
  end
  processed
end
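A sketch of an inter-account copy, assuming one Fog connection per account (regions, credentials and buckets below are placeholders):

from_s3 = Sluice::Storage::S3.new_fog_s3_from('us-east-1', src_key, src_secret)
to_s3   = Sluice::Storage::S3.new_fog_s3_from('us-east-1', dst_key, dst_secret)

from = Sluice::Storage::S3::Location.new('s3://account-a-bucket/raw/')
to   = Sluice::Storage::S3::Location.new('s3://account-b-bucket/raw/')

processed = Sluice::Storage::S3.copy_files_inter(from_s3, to_s3, from, to)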
Copies files between S3 locations, maintaining a manifest to avoid copying a file which was copied previously.

Useful in scenarios such as:
- You would like to do a move but only have read permission on the source bucket
- You would like to do a move but some other process needs to use the files after you

Parameters:
s3 - A Fog::Storage s3 connection
manifest - A Sluice::Storage::S3::Manifest object
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to copy files from
to_location - S3Location to copy files to
match_regex - a regex string to match the files to copy
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File lib/sluice/storage/s3/s3.rb, line 220
def self.copy_files_manifest(s3, manifest, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=nil, flatten=false)
  puts " copying with manifest #{describe_from(from_files_or_loc)} to #{to_location}"
  ignore = manifest.get_entries(s3) # Files to leave untouched
  processed = process_files(:copy, s3, from_files_or_loc, ignore, match_regex, to_location, alter_filename_lambda, flatten)
  manifest.add_entries(s3, processed)
  processed
end
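A hedged sketch: the Manifest constructor is documented under Sluice::Storage::S3::Manifest rather than here, so manifest is assumed to already exist:

processed = Sluice::Storage::S3.copy_files_manifest(s3, manifest, from, to)
puts "#{processed.length} new file(s) copied; previously-copied files were skipped"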
Delete files from S3 locations concurrently

Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to delete files from
match_regex - a regex string to match the files to delete
# File lib/sluice/storage/s3/s3.rb, line 151
def self.delete_files(s3, from_files_or_loc, match_regex='.+')
  puts " deleting #{describe_from(from_files_or_loc)}"
  process_files(:delete, s3, from_files_or_loc, [], match_regex)
end
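For instance, reusing the connection from the earlier sketches (the path and regex are placeholders):

# Delete every .tmp file under the location
loc = Sluice::Storage::S3::Location.new('s3://my-bucket/staging/')
Sluice::Storage::S3.delete_files(s3, loc, '.+\.tmp$')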
Download a single file to the exact path specified. Has no intelligence around filenaming. Makes sure to create the path as needed.

Parameters:
s3 - A Fog::Storage s3 connection
from_file - A Fog::Storage::AWS::File to download
to_file - A local file path
# File lib/sluice/storage/s3/s3.rb, line 318
def self.download_file(s3, from_file, to_file)
  FileUtils.mkdir_p(File.dirname(to_file))

  # TODO: deal with bug where Fog hangs indefinitely if network connection dies during download
  local_file = File.open(to_file, "w")
  local_file.write(from_file.body)
  local_file.close
end
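Because from_file must be a Fog::Storage::AWS::File rather than a plain path, one way to obtain it is via list_files (documented below); the local path is a placeholder:

first = Sluice::Storage::S3.list_files(s3, loc).first
Sluice::Storage::S3.download_file(s3, first, '/tmp/downloads/first-file.gz')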
Download files from an S3 location to local storage, concurrently

Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to download files from
to_directory - Local directory to copy files to
match_regex - a regex string to match the files to download
# File lib/sluice/storage/s3/s3.rb, line 139
def self.download_files(s3, from_files_or_loc, to_directory, match_regex='.+')
  puts " downloading #{describe_from(from_files_or_loc)} to #{to_directory}"
  process_files(:download, s3, from_files_or_loc, [], match_regex, to_directory)
end
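For example (location and regex are placeholders):

# Download all JSON files to a local directory
Sluice::Storage::S3.download_files(s3, loc, '/tmp/downloads/', '.+\.json$')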
Returns the basename for the given path, or nil if the path is a folder.

# File lib/sluice/storage/s3/s3.rb, line 108
def self.get_basename(path)
  if is_folder?(path)
    nil
  else
    match = path.match('([^/]+)$')
    if match
      match[1]
    else
      nil
    end
  end
end
Whether the given S3 location contains no files.

# File lib/sluice/storage/s3/s3.rb, line 127
def self.is_empty?(s3, location)
  list_files(s3, location).length == 0
end
Whether the given path is a file.

# File lib/sluice/storage/s3/s3.rb, line 96
def self.is_file?(path)
  !is_folder?(path)
end
Whether the given path is a folder, i.e. ends with '/' or with the EMR-created '_$folder$' suffix.

# File lib/sluice/storage/s3/s3.rb, line 84
def self.is_folder?(path)
  (path.end_with?('_$folder$') || # EMR-created
   path.end_with?('/'))
end
Returns an array of all the Fog::Storage::AWS::File objects at the given S3 location, excluding folders.

# File lib/sluice/storage/s3/s3.rb, line 65
def self.list_files(s3, location)
  files_and_dirs = s3.directories.get(location.bucket, prefix: location.dir_as_path).files

  files = [] # Can't use a .select because of Ruby deep copy issues (array of non-POROs)
  files_and_dirs.each { |f|
    if is_file?(f.key)
      files << f.dup
    end
  }
  files
end
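For example, printing every key at a location (connection and location as in the earlier sketches):

Sluice::Storage::S3.list_files(s3, loc).each { |f| puts f.key }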
Moves files between S3 locations concurrently

Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, or S3Location to move files from
to_location - S3Location to move files to
match_regex - a regex string to match the files to move
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File lib/sluice/storage/s3/s3.rb, line 270
def self.move_files(s3, from_files_or_loc, to_location, match_regex='.+', alter_filename_lambda=nil, flatten=false)
  puts " moving #{describe_from(from_files_or_loc)} to #{to_location}"
  process_files(:move, s3, from_files_or_loc, [], match_regex, to_location, alter_filename_lambda, flatten)
end
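A sketch combining alter_filename_lambda and flatten; since the lambda below has arity 1 it receives each file's basename (see rename_file under Private Class Methods). Names are placeholders:

add_prefix = lambda { |basename| "processed-#{basename}" }

Sluice::Storage::S3.move_files(
  s3, from, to,
  '.+\.gz$',     # only gzipped files
  add_prefix,    # rename each file as it is written
  true)          # flatten: drop sub-folders below the from location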
Moves files between S3 locations in two different accounts

Implementation is as follows:
- Concurrent download of all files from S3 source to local tmpdir
- Concurrent upload of all files from local tmpdir to S3 target
- Concurrent deletion of all files from S3 source

In other words, the three operations are not interleaved (which is inefficient because upload speeds are much lower than download speeds)

Parameters:
from_s3 - A Fog::Storage s3 connection for accessing the from S3Location
to_s3 - A Fog::Storage s3 connection for accessing the to S3Location
from_location - S3Location to move files from
to_location - S3Location to move files to
match_regex - a regex string to match the files to move
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_location
# File lib/sluice/storage/s3/s3.rb, line 247
def self.move_files_inter(from_s3, to_s3, from_location, to_location, match_regex='.+', alter_filename_lambda=nil, flatten=false)
  puts " moving inter-account #{describe_from(from_location)} to #{to_location}"
  processed = []
  Dir.mktmpdir do |t|
    tmp = Sluice::Storage.trail_slash(t)
    processed = download_files(from_s3, from_location, tmp, match_regex)
    upload_files(to_s3, tmp, to_location, '**/*') # Upload all files we downloaded
    delete_files(from_s3, from_location, '.+')    # Delete all files we downloaded
  end
  processed
end
Instantiates a new Fog::Storage connection to S3 in the given region with the supplied credentials, syncing the Fog clock with S3 before returning.

# File lib/sluice/storage/s3/s3.rb, line 46
def self.new_fog_s3_from(region, access_key_id, secret_access_key)
  fog = Fog::Storage.new({
    :provider => 'AWS',
    :region => region,
    :aws_access_key_id => access_key_id,
    :aws_secret_access_key => secret_access_key
  })
  fog.sync_clock
  fog
end
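For example (the region is a placeholder; credentials are read from the environment here purely for illustration):

s3 = Sluice::Storage::S3.new_fog_s3_from(
  'eu-west-1',
  ENV['AWS_ACCESS_KEY_ID'],
  ENV['AWS_SECRET_ACCESS_KEY'])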
Upload a single file to the exact location specified. Has no intelligence around filenaming.

Parameters:
s3 - A Fog::Storage s3 connection
from_file - A local file path
to_bucket - Name of the bucket to upload to (wrapped in a Fog::Directory without a request being made)
to_file - The file path to upload to
# File lib/sluice/storage/s3/s3.rb, line 297
def self.upload_file(s3, from_file, to_bucket, to_file)
  local_file = File.open(from_file)

  dir = s3.directories.new(:key => to_bucket) # No request made
  file = dir.files.create(
    :key  => to_file,
    :body => local_file
  )
  local_file.close
end
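For example (paths and bucket are placeholders; note the bucket is passed by name):

Sluice::Storage::S3.upload_file(
  s3, '/tmp/report.csv', 'my-bucket', 'reports/2013/report.csv')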
Uploads files to S3 locations concurrently

Parameters:
s3 - A Fog::Storage s3 connection
from_files_or_dir - Local array of files or local directory to upload files from
to_location - S3Location to upload files to
match_glob - a filesystem glob to match the files to upload
# File lib/sluice/storage/s3/s3.rb, line 283
def self.upload_files(s3, from_files_or_dir, to_location, match_glob='*')
  puts " uploading #{describe_from(from_files_or_dir)} to #{to_location}"
  process_files(:upload, s3, from_files_or_dir, [], match_glob, to_location)
end
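For example, uploading all .log files found recursively under a local directory (paths are placeholders):

Sluice::Storage::S3.upload_files(s3, '/var/logs/', to, '**/*.log')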
Private Class Methods
Provides a string describing from_files_or_dir_or_loc for logging purposes.

Parameters:
from_files_or_dir_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from

Returns a log-friendly string
# File lib/sluice/storage/s3/s3.rb, line 338
def self.describe_from(from_files_or_dir_or_loc)
  if from_files_or_dir_or_loc.is_a?(Array)
    "#{from_files_or_dir_or_loc.length} file(s)"
  else
    "files from #{from_files_or_dir_or_loc}"
  end
end
A helper function to list all files recursively in a folder

Parameters:
dir - Directory to list files recursively
glob - a filesystem glob to match the files to return

Returns array of files (no sub-directories)
# File lib/sluice/storage/s3/s3.rb, line 603
def self.glob_files(dir, glob)
  Dir.glob(File.join(dir, glob)).select { |f|
    File.file?(f) # Drop sub-directories
  }
end
A helper function to prepare destination filenames and paths. This is a bit weird - it needs to exist because of differences in the way that Amazon S3, Fog and Unix treat filepaths versus keys.

Parameters:
filepath - Path to file (including old filename)
new_filename - Replace the filename in the path with this
remove_path - If this is set, strip this from the front of the path
add_path - If this is set, add this to the front of the path
flatten - strips off any sub-folders below the from_location

TODO: this badly needs unit tests
# File lib/sluice/storage/s3/s3.rb, line 648
def self.name_file(filepath, new_filename, remove_path=nil, add_path=nil, flatten=false)

  # First, replace the filename in filepath with new one
  dirname = File.dirname(filepath)
  new_filepath = (dirname == '.') ? new_filename : dirname + '/' + new_filename

  # Nothing more to do
  return new_filepath if remove_path.nil? and add_path.nil? and not flatten

  shortened_filepath = if flatten
    # Let's revert to just the filename
    new_filename
  else
    # If we have a 'remove_path', it must be found at
    # the start of the path.
    # If it's not, you're probably using name_file() wrong.
    if !filepath.start_with?(remove_path)
      raise StorageOperationError, "name_file failed. Filepath '#{filepath}' does not start with '#{remove_path}'"
    end

    # Okay, let's remove the filepath
    new_filepath[remove_path.length()..-1]
  end

  # Nothing more to do
  return shortened_filepath if add_path.nil?

  # Add the new filepath on to the start and return
  add_path + shortened_filepath
end
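To make the path arithmetic concrete, here are a few worked calls traced through the code above (paths are illustrative):

# Keep sub-folders: strip 'in/' from the front, then prepend 'out/'
name_file('in/2013/events.txt', 'events.txt', 'in/', 'out/')
# => "out/2013/events.txt"

# flatten drops the sub-folders below the from-location entirely
name_file('in/2013/events.txt', 'events.txt', 'in/', 'out/', true)
# => "out/events.txt"

# With no remove/add paths, only the filename is replaced
name_file('in/2013/events.txt', 'events.txt.gz')
# => "in/2013/events.txt.gz"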
Concurrent file operations between S3 locations. Supports:
- Download
- Upload
- Copy
- Delete
- Move (= Copy + Delete)

Parameters:
operation - Operation to perform. :download, :upload, :copy, :delete, :move supported
s3 - A Fog::Storage s3 connection
from_files_or_dir_or_loc - Array of filepaths or Fog::Storage::AWS::File objects, local directory or S3Location to process files from
ignore - Array of filenames to ignore (used by manifest code)
match_regex_or_glob - a regex or glob string to match the files to process
to_loc_or_dir - S3Location or local directory to process files to
alter_filename_lambda - lambda to alter the written filename
flatten - strips off any sub-folders below the from_loc_or_dir
# File lib/sluice/storage/s3/s3.rb, line 362
def self.process_files(operation, s3, from_files_or_dir_or_loc, ignore=[], match_regex_or_glob='.+', to_loc_or_dir=nil, alter_filename_lambda=nil, flatten=false)

  # Validate that the file operation makes sense
  case operation
  when :copy, :move, :download, :upload
    if to_loc_or_dir.nil?
      raise StorageOperationError, "File operation %s requires the to_loc_or_dir to be set" % operation
    end
  when :delete
    unless to_loc_or_dir.nil?
      raise StorageOperationError, "File operation %s does not support the to_loc_or_dir argument" % operation
    end
    if alter_filename_lambda.class == Proc
      raise StorageOperationError, "File operation %s does not support the alter_filename_lambda argument" % operation
    end
  else
    raise StorageOperationError, "File operation %s is unsupported. Try :download, :upload, :copy, :delete or :move" % operation
  end

  # If we have an array of files, no additional globbing required
  if from_files_or_dir_or_loc.is_a?(Array)
    files_to_process = from_files_or_dir_or_loc # Could be filepaths or Fog::Storage::AWS::File's
    globbed = true
  # Otherwise if it's an upload, we can glob now
  elsif operation == :upload
    files_to_process = glob_files(from_files_or_dir_or_loc, match_regex_or_glob)
    globbed = true
  # Otherwise we'll do threaded globbing later...
  else
    files_to_process = []
    from_loc = from_files_or_dir_or_loc # Alias
    globbed = false
  end

  threads = []
  mutex = Mutex.new
  complete = false
  marker_opts = {}
  processed_files = [] # For manifest updating, determining if any files were moved etc

  # If an exception is thrown in a thread that isn't handled, die quickly
  Thread.abort_on_exception = true

  # Create Ruby threads to concurrently execute s3 operations
  for i in (0...CONCURRENCY)

    # Each thread pops a file off the files_to_process array, and moves it.
    # We loop until there are no more files
    threads << Thread.new(i) do |thread_idx|
      loop do
        file = false
        filepath = false
        from_bucket = false
        from_path = false
        match = false

        # First critical section:
        # only allow one thread to modify the array at any time
        mutex.synchronize do

          # No need to do further globbing
          if globbed
            if files_to_process.size == 0
              complete = true
              next
            end

            file = files_to_process.pop

            # Support raw filenames and also Fog::Storage::AWS::File's
            if (file.is_a?(Fog::Storage::AWS::File))
              from_bucket = file.directory.key # Bucket
              from_path = Sluice::Storage.trail_slash(File.dirname(file.key))
              filepath = file.key
            else
              from_bucket = nil # Not used
              if from_files_or_dir_or_loc.is_a?(Array)
                from_path = Sluice::Storage.trail_slash(File.dirname(file))
              else
                from_path = from_files_or_dir_or_loc # The root dir
              end
              filepath = file
            end

            match = true # Match is implicit in the glob
          else
            while !complete && !match do
              if files_to_process.size == 0
                # S3 batches 1000 files per request.
                # We load up our array with the files to move
                files_to_process = s3.directories.get(from_loc.bucket, :prefix => from_loc.dir).files.all(marker_opts).to_a

                # If we don't have any files after the S3 request, we're complete
                if files_to_process.size == 0
                  complete = true
                  next
                else
                  marker_opts['marker'] = files_to_process.last.key

                  # By reversing the array we can use pop and get FIFO behaviour
                  # instead of the performance penalty incurred by unshift
                  files_to_process = files_to_process.reverse
                end
              end

              file = files_to_process.pop
              from_bucket = from_loc.bucket
              from_path = from_loc.dir_as_path
              filepath = file.key

              # TODO: clean up following https://github.com/snowplow/sluice/issues/25
              match = if match_regex_or_glob.is_a? Sluice::Storage::NegativeRegex
                !filepath.match(match_regex_or_glob.regex)
              else
                filepath.match(match_regex_or_glob)
              end
            end
          end
        end # End of mutex.synchronize

        # Kill this thread's loop (and thus this thread) if we are complete
        break if complete

        # Skip processing for a folder or file which doesn't match our regexp or glob
        next if is_folder?(filepath) or not match

        # Name file
        basename = get_basename(filepath)
        next if ignore.include?(basename) # Don't process if in our leave list
        filename = rename_file(filepath, basename, alter_filename_lambda)

        # What are we doing? Let's determine source and target
        # Note that target excludes bucket name where relevant
        case operation
        when :upload
          source = "#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx}) UPLOAD #{source} +-> #{to_loc_or_dir.bucket}/#{target}"
        when :download
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir, flatten)
          puts "(t#{thread_idx}) DOWNLOAD #{source} +-> #{target}"
        when :move
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx}) MOVE #{source} -> #{to_loc_or_dir.bucket}/#{target}"
        when :copy
          source = "#{from_bucket}/#{filepath}"
          target = name_file(filepath, filename, from_path, to_loc_or_dir.dir_as_path, flatten)
          puts "(t#{thread_idx}) COPY #{source} +-> #{to_loc_or_dir.bucket}/#{target}"
        when :delete
          source = "#{from_bucket}/#{filepath}"
          # No target
          puts "(t#{thread_idx}) DELETE x #{source}"
        end

        # Upload is a standalone operation vs move/copy/delete
        if operation == :upload
          retry_x(
            Sluice::Storage::S3,
            [:upload_file, s3, filepath, to_loc_or_dir.bucket, target],
            Sluice::Storage::S3::RETRIES,
            " +/> #{target}",
            "Problem uploading #{filepath}. Retrying.")
        end

        # Download is a standalone operation vs move/copy/delete
        if operation == :download
          retry_x(
            Sluice::Storage::S3,
            [:download_file, s3, file, target],
            Sluice::Storage::S3::RETRIES,
            " +/> #{target}",
            "Problem downloading #{filepath}. Retrying.")
        end

        # A move or copy starts with a copy file
        if [:move, :copy].include? operation
          retry_x(
            file,
            [:copy, to_loc_or_dir.bucket, target],
            Sluice::Storage::S3::RETRIES,
            " +-> #{to_loc_or_dir.bucket}/#{target}",
            "Problem copying #{filepath}. Retrying.")
        end

        # A move or delete ends with a delete
        if [:move, :delete].include? operation
          retry_x(
            file,
            [:destroy],
            Sluice::Storage::S3::RETRIES,
            " x #{source}",
            "Problem destroying #{filepath}. Retrying.")
        end

        # Second critical section: we need to update
        # processed_files in a thread-safe way
        mutex.synchronize do
          processed_files << filepath
        end
      end
    end
  end

  # Wait for threads to finish
  threads.each { |aThread| aThread.join }

  processed_files # Return the processed files
end
A helper function to rename a file
# File lib/sluice/storage/s3/s3.rb, line 577
def self.rename_file(filepath, basename, rename_lambda=nil)
  if rename_lambda.nil?
    basename
  else
    case rename_lambda.arity
    when 2
      rename_lambda.call(basename, filepath)
    when 1
      rename_lambda.call(basename)
    when 0
      rename_lambda.call()
    else
      raise StorageOperationError, "Expect arity of 0, 1 or 2 for rename_lambda, not #{rename_lambda.arity}"
    end
  end
end
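The lambda's arity determines what it is called with; two illustrative (hypothetical) lambdas:

# Arity 1: receives just the basename
add_gz = lambda { |basename| basename + '.gz' }
rename_file('in/2013/events.txt', 'events.txt', add_gz)
# => "events.txt.gz"

# Arity 2: receives the basename and the full filepath
prefix_dir = lambda { |basename, filepath|
  File.dirname(filepath).gsub('/', '-') + '-' + basename
}
rename_file('in/2013/events.txt', 'events.txt', prefix_dir)
# => "in-2013-events.txt"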
A helper function to attempt to run a function retries times

Parameters:
object - Object to send our function to
send_args - Function plus arguments
retries - Number of retries to attempt
attempt_msg - Message to puts on each attempt
failure_msg - Message to puts on each failure
# File lib/sluice/storage/s3/s3.rb, line 618
def self.retry_x(object, send_args, retries, attempt_msg, failure_msg)
  i = 0
  begin
    Timeout::timeout(TIMEOUT_WAIT) do # In case our operation times out
      object.send(*send_args)
      puts attempt_msg
    end
  rescue
    raise unless i < retries
    puts failure_msg
    sleep(RETRY_WAIT) # Give us a bit of time before retrying
    i += 1
    retry
  end
end
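As an illustration of the object/send_args convention: the array is splatted into Object#send, so the first element is the method name and the rest are its arguments. The file and messages below are hypothetical:

retry_x(
  File,                          # object which receives the send
  [:read, '/tmp/example.txt'],   # i.e. File.read('/tmp/example.txt')
  Sluice::Storage::S3::RETRIES,  # give up after RETRIES failed retries
  " read OK",                    # printed after each successful attempt
  "Problem reading file. Retrying.")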