class Lexicon::Common::Production::DatasourceLoader
Attributes
database_factory — @return [Database::Factory]
database_url — @return [String]
file_loader — @return [FileLoader]
psql — @return [Psql]
shell — @return [ShellExecutor]
table_locker — @return [TableLocker]
Public Class Methods
@param [ShellExecutor] shell
@param [Database::Factory] database_factory
@param [FileLoader] file_loader
@param [String] database_url
@param [TableLocker] table_locker
@param [Psql] psql
# File lib/lexicon/common/production/datasource_loader.rb, line 18
#
# Builds a loader from its collaborators. Every keyword argument is stored,
# unchanged, in the instance variable of the same name.
#
# @param [ShellExecutor] shell
# @param [Database::Factory] database_factory builds the database used to run structure files
# @param [FileLoader] file_loader loads the data files of version-1 packages
# @param [String] database_url URL handed to the database factory
# @param [TableLocker] table_locker locks the tables of version-1 packages once loaded
# @param [Psql] psql executes the raw +\copy+ commands of version-2 packages
def initialize(shell:, database_factory:, file_loader:, database_url:, table_locker:, psql:)
  # Collaborators used while loading data.
  @file_loader = file_loader
  @table_locker = table_locker
  @shell = shell

  # Database access.
  @database_factory = database_factory
  @database_url = database_url
  @psql = psql
end
Public Instance Methods
@param [Package::Package] package
@param [Array<String>, nil] only
If nil, all datasets are loaded; otherwise, only the listed datasets are loaded. Structures are ALWAYS loaded.
@param [Array<String>] without
# File lib/lexicon/common/production/datasource_loader.rb, line 33
#
# Loads +package+ with the loader matching its schema version. Unsupported
# versions are only reported through +log+; nothing is loaded for them.
#
# @param [Package::Package] package
# @param [Array<String>, nil] only names of the datasets to load; nil loads all of them
#   (structures are always loaded)
# @param [Array<String>] without names of the datasets to skip
def load_package(package, only: nil, without: [])
  options = { only: only, without: without }

  case package.schema_version
  when 1 then load_v1(package, **options)
  when 2 then load_v2(package, **options)
  else log("Schema version #{package.schema_version} is not supported")
  end
end
Private Instance Methods
@param [Array<Package::Mixin::Nameable>] file_sets
@param [Array<String>, nil] only
@param [Array<String>] without
@return [Corindon::Result::Result]
# File lib/lexicon/common/production/datasource_loader.rb, line 116
#
# Selects the file sets to load, by name only.
#
# Version-specific checks (V1 requires a data path, V2 requires tables) are
# deliberately not done here: the callers apply them to the result, and this
# helper only relies on file sets responding to +name+.
#
# @param [Array<Package::Mixin::Nameable>] file_sets
# @param [Array<String>, nil] only names to keep, in this order; nil keeps every file set
# @param [Array<String>] without names removed from the selection
# @return [Corindon::Result::Result] Success with the selected file sets, or Failure
#   with a StandardError listing the requested names that match no file set
def filter_file_sets(file_sets, only:, without:)
  sets =
    if only.nil?
      file_sets
    else
      sets_by_name = file_sets.map { |fs| [fs.name, fs] }.to_h
      missing = only.reject { |name| sets_by_name.key?(name) }

      unless missing.empty?
        message = "Datasources #{missing.join(', ')} don't exist!"
        puts "[ NOK ] #{message}"
        return Failure(StandardError.new(message))
      end

      only.map { |name| sets_by_name.fetch(name) }
    end

  Success(sets.reject { |fs| without.include?(fs.name) })
end
@param [Pathname] file
@param [String] into
@param [String] schema
# File lib/lexicon/common/production/datasource_loader.rb, line 154
#
# Loads a gzip-compressed CSV file into the table +schema+.+into+ using psql's
# client-side +\copy ... FROM PROGRAM+, decompressing it through +zcat+.
#
# @param [Pathname] file compressed CSV file to load
# @param [String] into destination table name
# @param [String] schema destination schema name
def load_csv(file, into:, schema:)
  # Quoted identifiers: embedded double quotes are doubled.
  table = [schema, into].map { |identifier| %("#{identifier.to_s.gsub('"', '""')}") }.join('.')

  # The program runs through a shell: single-quote the path so spaces or shell
  # metacharacters cannot split the argument or inject commands
  # (each embedded ' becomes '\''). Block-form gsub avoids the special
  # meaning of \' in replacement strings.
  shell_path = "'" + file.to_s.gsub("'") { "'\\''" } + "'"
  # The whole command is itself a single-quoted literal for psql, which
  # reads a doubled '' as one quote.
  command = "zcat #{shell_path}".gsub("'") { "''" }

  psql.execute_raw(<<~SQL)
    \\copy #{table} FROM PROGRAM '#{command}' WITH csv
  SQL
end
# File lib/lexicon/common/production/datasource_loader.rb, line 160
#
# Runs each structure file, in order, as a query on a database instance built
# from +database_url+, with +schema+ prepended to its search path while the
# files run.
#
# @param [Array] files structure file paths, joined onto +dir+
# @param [String] schema schema prepended to the search path
# @param [Pathname] dir directory the file paths are joined onto
def load_structure_files(files, schema:, dir:)
  database = database_factory.new_instance(url: database_url)

  database.prepend_search_path(schema) do
    files.each { |relative_path| database.query(dir.join(relative_path).read) }
  end
end
@param [Package::V1::Package] package
# File lib/lexicon/common/production/datasource_loader.rb, line 47
#
# Loads a schema-version-1 package:
#   1. runs the package's structure files in the version's schema;
#   2. loads every selected file set that has a data path, one thread per set;
#   3. locks the package's tables once all threads have finished.
#
# @param [Package::V1::Package] package
# @param [Array<String>, nil] only names of the file sets to load; nil loads all of them
# @param [Array<String>] without names of the file sets to skip
def load_v1(package, only: nil, without: [])
  requested = filter_file_sets(package.file_sets, only: only, without: without).unwrap!
  loadable = requested.select(&:data_path)

  structure_paths = package.files.select(&:structure?).map(&:path)
  load_structure_files(structure_paths, schema: version_to_schema(package.version), dir: package.dir)

  # Shared between the worker threads; each one removes its file set when done
  # so the progress line can report what is still pending.
  pending = ::Concurrent::Set.new(loadable.map(&:name))

  workers = loadable.map do |file_set|
    Thread.new do
      file_loader.load_file(package.data_path(file_set))
      pending.delete(file_set.name)
      puts '[ OK ] '.green + file_set.name.yellow + ", #{remaining_message(pending)}"
    end
  end
  workers.each(&:join)

  # NOTE(review): this locks the tables of every file set in the package, not
  # only the ones loaded above — confirm this is intended.
  table_locker.lock_tables(package: package, tables: package.file_sets.flat_map(&:tables))
end
@param [Package::V2::Package] package
@param [Array<String>, nil] only
@param [Array<String>] without
# File lib/lexicon/common/production/datasource_loader.rb, line 85
#
# Loads a schema-version-2 package: runs its structure files in the version's
# schema, then loads every data file of the selected file sets (those with at
# least one table) into its table, one thread per file.
#
# NOTE(review): unlike load_v1, no tables are locked afterwards.
#
# @param [Package::V2::Package] package
# @param [Array<String>, nil] only names of the file sets to load; nil loads all of them
# @param [Array<String>] without names of the file sets to skip
def load_v2(package, only: nil, without: [])
  requested = filter_file_sets(package.file_sets, only: only, without: without).unwrap!
  with_tables = requested.select { |file_set| file_set.tables.any? }

  schema = version_to_schema(package.version)
  structure_paths = package.files.select(&:structure?).map(&:path)
  load_structure_files(structure_paths, schema: schema, dir: package.dir)

  # Shared between the worker threads; each one removes its file when done.
  pending = ::Concurrent::Set.new(with_tables.flat_map { |file_set| file_set.tables.values.flatten(1) })

  # One [table, file] job per data file, in package order.
  jobs = with_tables.flat_map do |file_set|
    file_set.tables.flat_map { |table, files| files.map { |file| [table, file] } }
  end

  workers = jobs.map do |table, file|
    Thread.new do
      load_csv(package.data_dir.join(file), into: table, schema: schema)
      pending.delete(file)
      puts '[ OK ] '.green + file.to_s.yellow + ", #{remaining_message(pending)}"
    end
  end
  workers.each(&:join)
end
# File lib/lexicon/common/production/datasource_loader.rb, line 72
#
# Describes the work still pending, for the progress lines printed by the
# loader threads.
#
# @param [#to_a] remaining items still to be processed (a set shared with,
#   and mutated by, the other loader threads)
# @return [String] 'All done!' when nothing remains, a count when more than
#   five items remain, otherwise the sorted list of remaining items
def remaining_message(remaining)
  # Read the shared collection once: other threads keep deleting from it, and
  # reading it several times could yield an inconsistent message (for example
  # "Remaining: " followed by an empty list).
  pending = remaining.to_a

  if pending.empty?
    'All done!'
  elsif pending.size > 5
    "#{pending.size} remaining"
  else
    "Remaining: #{pending.sort.join(', ')}"
  end
end