class Outatime::Fetcher
Attributes
Public Class Methods
Public: Fetcher will fetch the correct version of a file from S3.
options - The Hash options used to configure how fetcher works:
:bucket - The versioned bucket name (required).
:destination - Destination for restored files (required).
:from - Time description (required).
:prefix - Restore files from this prefix.
:region - AWS region used when a new S3 client is created (default: ENV["AWS_REGION"]).
:s3_client - An existing Aws::S3::Client.
:threads - Number of download threads (default: 20).
:verbose - Verbose mode.
# File lib/outatime/fetcher.rb, line 60
# Public: Builds a fetcher from the given options.
#
# options - The Hash of options (default: {}):
#           :from      - Time description handed to Chronic.parse (required).
#           :s3_client - An existing S3 client to reuse (optional).
#           :threads   - Size passed to Thread.pool (default: 20).
#           Other keys are stored in @options and read by the other methods.
#
# Raises ArgumentError if :from is missing or could not be parsed.
def initialize(options = {})
  @options = options
  @from = ::Chronic.parse(@options[:from]) if @options[:from]

  # Validate before allocating anything, so a rejected :from does not leave
  # a worker pool behind.
  raise ArgumentError, "The from time was not parseable." if @from.nil?

  @fetch_block_mutex = Mutex.new
  @s3_client = options[:s3_client] if options[:s3_client]
  @pool = Thread.pool(@options.fetch(:threads, 20))
end
Public Instance Methods
Public: Fetches the file versions from the S3 bucket.
block - an optional block that receives the file description after the file has been downloaded to the local destination.
Returns nothing.
# File lib/outatime/fetcher.rb, line 77
# Public: Queues a download for every selected object version, then waits
# for the pool to finish.
#
# block - Optional block forwarded to fetch_object; it receives each file
#         description once that file has been handled.
def fetch!(&block)
  object_versions { |version| fetch_object(version, &block) }

  # Block until every queued download has completed.
  @pool.wait(:done)
end
Public: Fetch the S3 object versions.
Yields each matching Aws::S3::Types::ObjectVersion to the given block.
# File lib/outatime/fetcher.rb, line 95
# Public: Walks the bucket's version listing and yields, per key, the first
# listed version modified on or before @from, unless a delete marker that
# also falls on or before @from is at least as recent as that version.
# NOTE(review): picking the "first" entry per key assumes the listing returns
# the newest entries first - confirm against the S3 API.
#
# Yields each selected object version.
#
# Returns an Enumerator over the selected versions when no block is given,
# so callers can chain Enumerable methods.
def object_versions
  return enum_for(:object_versions) unless block_given?

  remaining_versions = []
  remaining_delete_markers = []

  s3_client.list_object_versions(bucket: @options[:bucket],
                                 prefix: @options[:prefix]).each do |response|
    # Prepend whatever was carried over from the previous page.
    versions = remaining_versions.concat(response.versions)
    versions_by_key = versions.group_by(&:key)
    delete_markers = remaining_delete_markers.concat(response.delete_markers)
    delete_markers_by_key = delete_markers.group_by(&:key)

    versions_by_key.each do |key, key_versions|
      # This key continues on the next page; defer it until all of its
      # entries have been collected.
      next if response.next_key_marker == key

      filter_items(key_versions).each do |version|
        dl_marker = filter_items(Array(delete_markers_by_key[version.key])).first
        if dl_marker.nil? || (version.last_modified > dl_marker.last_modified)
          yield version
        end
      end
    end

    # Carry the entries of the deferred key over to the next page.
    remaining_versions = Array(versions_by_key[response.next_key_marker])
    remaining_delete_markers = Array(delete_markers_by_key[response.next_key_marker])
  end
end
Public: Returns the objects total size.
Returns an integer.
# File lib/outatime/fetcher.rb, line 88
# Public: Sums the size of every object version that would be fetched.
#
# Returns the Integer total (0 when nothing matches).
def total_size
  total = 0
  # object_versions yields its results, so accumulate inside a block rather
  # than chaining Enumerable methods onto a blockless call.
  object_versions { |obj| total += obj.size }
  total
end
Private Instance Methods
Private: Fetches a single object version from the S3 bucket.
file - an Aws::S3::Types::ObjectVersion.
Returns nothing.
# File lib/outatime/fetcher.rb, line 136
# Private: Queues one object version on the pool. Keys ending in "/" are
# created as directories under the destination; any other key is downloaded
# into it at the requested version.
#
# file - The object version to restore; must respond to key and version_id.
#
# Yields file (under @fetch_block_mutex) once it has been handled, when a
# block is given.
def fetch_object(file)
  @pool.process do
    key = file.key
    target = Pathname.new("#{@options[:destination]}/#{key}")

    if key.end_with?("/")
      puts "Creating s3 subdirectory #{key} - #{Time.now}" if verbose?
      target.mkpath
    else
      # Make sure the parent directory exists before the download is written.
      target.dirname.mkpath
      puts "Copying from s3 #{key} - #{Time.now}" if verbose?
      s3_client.get_object(
        response_target: target.to_s,
        bucket: @options[:bucket],
        key: key,
        version_id: file.version_id
      )
    end

    # Serialise the caller's block so callbacks never run concurrently.
    @fetch_block_mutex.synchronize { yield file } if block_given?
  end
end
Private: Returns an Array of items modified on or before the @from date/time.
items - An Array of objects. Object must respond to last_modified.
Returns Array.
# File lib/outatime/fetcher.rb, line 169
# Private: Selects the items modified on or before @from and keeps only the
# first one seen for each key. The items Array passed in is left unmodified.
#
# items - An Array of objects responding to last_modified and key.
#
# Returns a new Array.
def filter_items(items)
  # select (not keep_if) so the caller's array is not changed in place.
  items.select { |obj| obj.last_modified <= @from }.uniq { |obj| obj.key }
end
Private: Creates the S3 client instance.
Returns an Aws::S3::Client.
# File lib/outatime/fetcher.rb, line 159
# Private: Returns the memoized S3 client, creating one on first use. The
# region comes from the :region option, falling back to ENV["AWS_REGION"].
#
# Returns an Aws::S3::Client (or whatever client was supplied via :s3_client).
def s3_client
  return @s3_client if @s3_client

  aws_region = @options[:region] || ENV["AWS_REGION"]
  @s3_client = Aws::S3::Client.new(region: aws_region)
end
Private: Checks if it is in verbose mode.
Returns a boolean.
# File lib/outatime/fetcher.rb, line 127
# Private: Checks whether the :verbose option is set.
#
# Returns true when @options[:verbose] is truthy, false otherwise.
def verbose?
  # Coerce to a strict boolean so the predicate matches its documented return.
  @options[:verbose] ? true : false
end