class Magellan::Gcs::Proxy::Context
Attributes
message[R]
remote_download_files[R]
workspace[R]
Public Class Methods
new(message)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 16 def initialize(message) @message = message @remote_download_files = parse_json(message.attributes['download_files']) @workspace = nil end
Public Instance Methods
build_local_files_obj(obj, mapping)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 120 def build_local_files_obj(obj, mapping) case obj when Hash then obj.each_with_object({}) { |(k, v), d| d[k] = build_local_files_obj(v, mapping) } when Array then obj.map { |i| build_local_files_obj(i, mapping) } when String then mapping[obj] else obj end end
build_mapping(base_dir, obj)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 93 def build_mapping(base_dir, obj) flatten_values(obj).flatten.each_with_object({}) do |url, d| uri = parse_uri(url) d[url] = File.join(base_dir, uri.host, uri.path) end end
directory?(path)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 85 def directory?(path) File.directory?(path) end
download()
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 53 def download download_mapping.each do |url, path| FileUtils.mkdir_p File.dirname(path) logger.debug("Downloading: #{url} to #{path}") next if Proxy.config[:dryrun] uri = parse_uri(url) @last_bucket_name = uri.host bucket = GCP.storage.bucket(@last_bucket_name) file = bucket.file uri.path.sub(%r{\A/}, '') file.download(path) logger.info("Download OK: #{url} to #{path}") end end
download_mapping()
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 40 def download_mapping @download_mapping ||= build_mapping(downloads_dir, remote_download_files) end
downloads_dir()
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 36 def downloads_dir File.join(workspace, 'downloads') end
flatten_values(obj)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 100 def flatten_values(obj) case obj when nil then [] when Hash then flatten_values(obj.values) when Array then obj.map { |i| flatten_values(i) } else obj end end
local_download_files()
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 44 def local_download_files @local_download_files ||= build_local_files_obj(remote_download_files, download_mapping) end
Also aliased as: download_files
ltsv(hash)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 32 def ltsv(hash) hash.map { |k, v| "#{k}:#{v}" }.join("\t") end
parse_json(str)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 109 def parse_json(str) return nil if str.nil? || str.empty? JSON.parse(str) end
parse_uri(str)
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 114 def parse_uri(str) uri = URI.parse(str) raise "Unsupported scheme #{uri.scheme.inspect} of #{str}" unless uri.scheme == 'gs' uri end
setup() { || ... }
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 22 def setup Dir.mktmpdir 'workspace' do |dir| @workspace = dir setup_dirs PubsubSustainer.run(message) do yield end end end
setup_dirs()
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 89 def setup_dirs [:downloads_dir, :uploads_dir].each { |k| FileUtils.mkdir_p(send(k)) } end
upload()
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 67 def upload Dir.chdir(uploads_dir) do Dir.glob('*') do |bucket_name| Dir.chdir(bucket_name) do Dir.glob('**/*') do |path| next if directory?(path) url = "gs://#{bucket_name}/#{path}" logger.info("Uploading: #{path} to #{url}") next if Proxy.config[:dryrun] bucket = GCP.storage.bucket(bucket_name) bucket.create_file path, path logger.info("Upload OK: #{path} to #{url}") end end end end end
uploads_dir()
click to toggle source
# File lib/magellan/gcs/proxy/context.rb, line 49 def uploads_dir File.join(workspace, 'uploads') end