module BulkImporter
Constants
- NAME
Module name (used for loggin).
- UPDATE_MODE_APPEND
Update modes
- UPDATE_MODE_REPLACE
- UPDATE_MODE_UPDATE
- VERSION
Public Class Methods
Import data from a CSV file to an existing table
Parameters¶ ↑
-
target
Target table. -
file
Source CSV file. -
columns
Array of CSV columns. -
keys
Primary keys of destination table.
Options¶ ↑
-
delimiter
-
null
-
header
-
update_mode
Update mode for imported data.
Updated modes¶ ↑
-
self::UPDATE_MODE_APPEND
Move only new data (default). -
self::UPDATE_MODE_UPDATE
Insert new data and updated prexistent. -
self::UPDATE_MODE_REPLACE
Truncate old data and insert the new one.
Return¶ ↑
integer
Number of imported rows.
# File lib/bulk_importer.rb, line 40 def self.import_from_csv(target, file, columns, keys, delimiter: ',', null: '', header: true, update_mode: UPDATE_MODE_APPEND) return -1 unless file.is_a? File conn = ActiveRecord::Base.connection temp_name = target + '_' + Time.now.to_i.to_s + '_temporal' begin # Create temporary table (with all CSV fields) Rails.logger.debug \ "[#{NAME}] Creating temporary table #{temp_name}(#{columns.keys})" conn.execute self.make_create_temp_table_sql(temp_name, columns.keys) # Import data Rails.logger.debug \ "[#{NAME}] Importing data from #{file} to #{temp_name}" PostgresqlModule.copy_from( file, temp_name, format: 'csv', delimiter: delimiter, null: null, header: header ) # Move data from temporary table to target and return total imported rows Rails.logger.debug \ "[#{NAME}] Moving new data to #{target} with mode #{update_mode}" self.move_imported_data(temp_name, target, columns, keys, update_mode) rescue Exception => e Rails.logger.error e.message Rails.logger.error e.backtrace return -1 ensure # Drop temporary table (if exists) ActiveRecord::Base.connection.execute "DROP TABLE IF EXISTS #{temp_name}" end end
Check if the update mode is valid.
Parameters¶ ↑
-
update_mode
Return¶ ↑
bool
# File lib/bulk_importer.rb, line 318 def self.is_update_mode_valid(update_mode) valid_update_modes = [] self.constants.each do |constant| if constant.to_s.start_with? 'UPDATE_MODE_' valid_update_modes << self.const_get(constant) end end valid_update_modes.include? update_mode.to_s end
Translate an array of keys in a list with an optional prefix.
Parameters¶ ↑
-
keys
-
prefix
-
types
Return¶ ↑
-
string
TODO: Add doc
# File lib/bulk_importer.rb, line 294 def self.keys_to_list(keys, prefix = nil, types = nil) list = [] keys.each do |i| col = i col = [ prefix, i ].compact.join('.') unless prefix.nil? col = [ col, types[i] ].compact.join('::') unless types.nil? list << col end list.join ',' end
Makes the SQL command to create a temporary table.
Parameters¶ ↑
-
name
Name of temporary table. -
columns
Array of columns (just name).
Return¶ ↑
string
# File lib/bulk_importer.rb, line 275 def self.make_create_temp_table_sql(name, columns) columns = columns.map { |i| i + ' text' } "CREATE TEMPORARY TABLE #{name} (#{columns.join(',')})" end
Makes the SQL command to move the imported data.
Parameters¶ ↑
-
origin
Origin (temporary table). -
destination
Destination table. -
columns
Array of CSV columns. -
keys
Primary keys of destination table. -
update_mode
Update mode.
Return¶ ↑
array
Array of queries to execute.
# File lib/bulk_importer.rb, line 133 def self.make_move_imported_data_sql(origin, destination, columns, keys, update_mode) case update_mode when UPDATE_MODE_APPEND # Insert new self.make_update_mode_append_sql origin, destination, columns, keys when UPDATE_MODE_UPDATE # Insert new and Update prexistent self.make_update_mode_update_sql origin, destination, columns, keys when UPDATE_MODE_REPLACE # Truncate destination and Insert new (all) self.make_update_mode_replace_sql origin, destination, columns, keys end end
Makes the SQL command to append new imported data.
Parameters¶ ↑
-
origin
Origin (temporary table). -
destination
Destination table. -
columns
Array of CSV columns. -
keys
Primary keys of destination table.
Return¶ ↑
array
Array of queries to execute
# File lib/bulk_importer.rb, line 160 def self.make_update_mode_append_sql(origin, destination, columns, keys) sql = [] columns = columns.delete_if { |item| columns[item].nil? } types = {} pg_types = PostgresqlModule.get_column_types destination columns.values.each { |i| types[columns.invert[i]] = pg_types[i] } sql << "INSERT INTO #{destination}" sql << "(#{columns.values.join(',')})" sql << "SELECT #{self.keys_to_list(columns.keys, 'o', types)}" sql << "FROM #{origin} o" sql << "LEFT JOIN #{destination} d" sql << "ON (#{self.keys_to_list(keys.keys, 'o', types)}) = " sql << "(#{self.keys_to_list(keys.values, 'd')})" sql << "WHERE (#{self.keys_to_list(keys.values, 'd')}) is null" [ sql.join(' ') ] end
Makes the SQL command to remove existing data and Insert new (all).
Parameters¶ ↑
-
origin
Origin (temporary table). -
destination
Destination table. -
columns
Array of CSV columns. -
keys
Primary keys of destination table.
Return¶ ↑
array
Array of queries to execute
# File lib/bulk_importer.rb, line 255 def self.make_update_mode_replace_sql(origin, destination, columns, key) q = [] q << "TRUNCATE TABLE #{destination}" q.concat self.make_update_mode_append_sql origin, destination, columns, keys return q end
Makes the SQL command to update prexistent data.
Parameters¶ ↑
-
origin
Origin (temporary table). -
destination
Destination table. -
columns
Array of CSV columns. -
keys
Primary keys of destination table.
Return¶ ↑
array
Array of queries to execute
# File lib/bulk_importer.rb, line 194 def self.make_update_mode_update_sql(origin, destination, columns, keys) q = self.make_update_mode_append_sql(origin, destination, columns, keys) sql = [] types = {} pg_types = PostgresqlModule.get_column_types destination columns.values.each { |i| types[columns.invert[i]] = pg_types[i] } o_columns_without_keys = columns.keys.delete_if { |i| keys.has_key? i } d_columns_without_keys = columns.values.delete_if { |i| keys.has_value? i } set = [] columns.delete_if { |item| columns[item].nil? }.keys.each do |column| set << "#{columns[column]} = o.#{column}::#{types[column]}" end sets = set.join(',') if columns.has_value? 'updated_at' # Field is updated if origin's updated_at is greater than destination. PostgresqlModule.create_index_on origin, columns.invert['updated_at'].downcase sql << "UPDATE #{destination} d SET #{sets}" sql << "FROM #{origin} o" sql << "WHERE (#{self.keys_to_list(keys.keys, 'o', types)}) = " sql << "(#{self.keys_to_list(keys.values, 'd')}) AND" sql << "o.#{columns.invert['updated_at'].downcase}::timestamp > d.updated_at" else # Check if any field changed sql << "WITH #{origin}_prexistent_modified AS (" sql << "SELECT o.* FROM #{origin} o JOIN #{destination} d" sql << "ON (#{self.keys_to_list(keys.keys, 'o', types)}) = " sql << "(#{self.keys_to_list(keys.values, 'd')}) AND " sql << "(#{self.keys_to_list(o_columns_without_keys, 'o', types)}) != " sql << "(#{self.keys_to_list(d_columns_without_keys, 'd')})" sql << ")" sql << "UPDATE #{destination} d SET #{sets}" sql << "FROM #{origin}_prexistent_modified o" sql << "WHERE (#{self.keys_to_list(keys.keys, 'o', types)}) = " sql << "(#{self.keys_to_list(keys.values, 'd')})" end q << sql.join(' ') return q end
Move imported data from origin (raw imported) to destination.
Parameters¶ ↑
-
origin
Origin (temporary table). -
destination
Destination table. -
columns
Array of CSV columns. -
keys
Primary keys of destination table. -
update_mode
Update mode.
Return¶ ↑
integer
Number of imported rows.
# File lib/bulk_importer.rb, line 92 def self.move_imported_data(origin, destination, columns, keys, update_mode) unless self.is_update_mode_valid update_mode raise "[#{NAME}] Unknown update mode: #{update_mode}" end # Create an index to improve move performance. Rails.logger.debug "[#{NAME}] Creating index on #{origin}" PostgresqlModule.create_index_on origin, keys.keys.map { |i| i.downcase } queries = self.make_move_imported_data_sql( origin, destination, columns, keys, update_mode ) rows = 0 queries.each do |query| Rails.logger.debug "[#{NAME}] Running query <<#{query}>>" rows += ActiveRecord::Base.connection.execute(query).cmd_tuples end return rows end