module BulkImporter

Constants

NAME

Module name (used for logging).

UPDATE_MODE_APPEND
UPDATE_MODE_REPLACE
UPDATE_MODE_UPDATE

Update modes.

VERSION

Public Class Methods

import_from_csv(target, file, columns, keys, delimiter: ',', null: '', header: true, update_mode: UPDATE_MODE_APPEND)

Import data from a CSV file into an existing table.

Parameters

  • target Target table.

  • file Source CSV file (must be a File object; anything else returns -1).

  • columns Hash mapping CSV column names to destination table columns.

  • keys Hash mapping CSV key columns to the destination table's primary key columns.

Options

  • delimiter CSV field delimiter (default ',').

  • null String that represents a NULL value in the CSV (default '').

  • header Whether the CSV file includes a header row (default true).

  • update_mode Update mode for imported data.

Update modes

  • self::UPDATE_MODE_APPEND Insert only new rows (default).

  • self::UPDATE_MODE_UPDATE Insert new rows and update preexisting ones.

  • self::UPDATE_MODE_REPLACE Truncate existing data and insert all the new rows.

Return

integer Number of imported rows, or -1 on error.

# File lib/bulk_importer.rb, line 40
def self.import_from_csv(target, file, columns, keys, delimiter: ',', null: '', header: true, update_mode: UPDATE_MODE_APPEND)
  return -1 unless file.is_a? File

  conn = ActiveRecord::Base.connection
  temp_name = target + '_' + Time.now.to_i.to_s + '_temporal'

  begin
    # Create temporary table (with all CSV fields)
    Rails.logger.debug \
      "[#{NAME}] Creating temporary table #{temp_name}(#{columns.keys})"
    conn.execute self.make_create_temp_table_sql(temp_name, columns.keys)

    # Import data
    Rails.logger.debug \
      "[#{NAME}] Importing data from #{file} to #{temp_name}"
    PostgresqlModule.copy_from(
      file,
      temp_name,
      format:    'csv',
      delimiter: delimiter,
      null:      null,
      header:    header
    )

    # Move data from temporary table to target and return total imported rows
    Rails.logger.debug \
      "[#{NAME}] Moving new data to #{target} with mode #{update_mode}"
    self.move_imported_data(temp_name, target, columns, keys, update_mode)
  rescue Exception => e
    Rails.logger.error e.message
    Rails.logger.error e.backtrace
    return -1
  ensure
    # Drop temporary table (if exists)
    ActiveRecord::Base.connection.execute "DROP TABLE IF EXISTS #{temp_name}"
  end
end
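
A minimal usage sketch (the table name, file path, and column/key mappings below are hypothetical; columns and keys are given as CSV-to-table hashes, as the source suggests):

  # Hypothetical example: import products.csv into a products table.
  File.open('/tmp/products.csv') do |csv|
    imported = BulkImporter.import_from_csv(
      'products',                                    # target table
      csv,                                           # must be a File object
      { 'sku' => 'sku', 'name' => 'name' },          # CSV column => table column
      { 'sku' => 'sku' },                            # CSV key => primary key
      delimiter: ';',
      update_mode: BulkImporter::UPDATE_MODE_UPDATE
    )
    Rails.logger.info "Imported #{imported} rows" unless imported == -1
  end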
is_update_mode_valid(update_mode)

Check if the update mode is valid.

Parameters

  • update_mode

Return

bool

# File lib/bulk_importer.rb, line 318
def self.is_update_mode_valid(update_mode)
  valid_update_modes = []

  self.constants.each do |constant|
    if constant.to_s.start_with? 'UPDATE_MODE_'
      valid_update_modes << self.const_get(constant)
    end
  end

  valid_update_modes.include? update_mode.to_s
end
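
For example (assuming the UPDATE_MODE_* constants hold string values, as the to_s comparison in the source suggests):

  BulkImporter.is_update_mode_valid(BulkImporter::UPDATE_MODE_APPEND)  # => true
  BulkImporter.is_update_mode_valid('no_such_mode')                    # => false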
keys_to_list(keys, prefix = nil, types = nil)

Translate an array of keys into a comma-separated list, with an optional prefix and optional type casts.

Parameters

  • keys Array of column names.

  • prefix Optional table alias prepended to each column.

  • types Optional hash of column types, used to append a '::type' cast.

Return

string

TODO: Add doc

# File lib/bulk_importer.rb, line 294
def self.keys_to_list(keys, prefix = nil, types = nil)
  list = []

  keys.each do |i|
    col = i
    col = [ prefix, i ].compact.join('.') unless prefix.nil?
    col = [ col, types[i] ].compact.join('::') unless types.nil?

    list << col
  end

  list.join ','
end
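
For example, with hypothetical column names and a hypothetical type hash:

  BulkImporter.keys_to_list(['id', 'code'])                              # => "id,code"
  BulkImporter.keys_to_list(['id', 'code'], 'o', { 'id' => 'integer' })  # => "o.id::integer,o.code"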
make_create_temp_table_sql(name, columns)

Makes the SQL command to create a temporary table.

Parameters

  • name Name of temporary table.

  • columns Array of column names (each column is created with type text).

Return

string

# File lib/bulk_importer.rb, line 275
def self.make_create_temp_table_sql(name, columns)
  columns = columns.map { |i| i + ' text' }
  "CREATE TEMPORARY TABLE #{name} (#{columns.join(',')})"
end
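
For example, with hypothetical names:

  BulkImporter.make_create_temp_table_sql('products_tmp', ['sku', 'name'])
  # => "CREATE TEMPORARY TABLE products_tmp (sku text,name text)"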
make_move_imported_data_sql(origin, destination, columns, keys, update_mode)

Makes the SQL commands to move the imported data.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

  • columns Hash mapping CSV column names to destination table columns.

  • keys Hash mapping CSV key columns to the destination table's primary key columns.

  • update_mode Update mode.

Return

array Array of queries to execute.

# File lib/bulk_importer.rb, line 133
def self.make_move_imported_data_sql(origin, destination, columns, keys, update_mode)
  case update_mode
  when UPDATE_MODE_APPEND
    # Insert new
    self.make_update_mode_append_sql origin, destination, columns, keys
  when UPDATE_MODE_UPDATE
    # Insert new and update preexisting
    self.make_update_mode_update_sql origin, destination, columns, keys
  when UPDATE_MODE_REPLACE
    # Truncate destination and Insert new (all)
    self.make_update_mode_replace_sql origin, destination, columns, keys
  end
end
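
This method only dispatches on the update mode; with hypothetical arguments, the call below returns the same queries as calling the mode-specific builder directly:

  columns = { 'sku' => 'sku', 'name' => 'name' }
  keys    = { 'sku' => 'sku' }
  BulkImporter.make_move_imported_data_sql(
    'products_tmp', 'products', columns, keys, BulkImporter::UPDATE_MODE_APPEND
  )
  # Same result as:
  # BulkImporter.make_update_mode_append_sql('products_tmp', 'products', columns, keys)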
make_update_mode_append_sql(origin, destination, columns, keys)

Makes the SQL command to append new imported data.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

  • columns Hash mapping CSV column names to destination table columns.

  • keys Hash mapping CSV key columns to the destination table's primary key columns.

Return

array Array of queries to execute

# File lib/bulk_importer.rb, line 160
def self.make_update_mode_append_sql(origin, destination, columns, keys)
  sql = []

  columns = columns.delete_if { |item| columns[item].nil? }
  types = {}

  pg_types = PostgresqlModule.get_column_types destination
  columns.values.each { |i| types[columns.invert[i]] = pg_types[i] }

  sql << "INSERT INTO #{destination}"
  sql << "(#{columns.values.join(',')})"
  sql << "SELECT #{self.keys_to_list(columns.keys, 'o', types)}"
  sql << "FROM #{origin} o"
  sql << "LEFT JOIN #{destination} d"
  sql << "ON (#{self.keys_to_list(keys.keys, 'o', types)}) = "
  sql << "(#{self.keys_to_list(keys.values, 'd')})"
  sql << "WHERE (#{self.keys_to_list(keys.values, 'd')}) is null"

  [ sql.join(' ') ]
end
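
For illustration, assuming PostgresqlModule.get_column_types reports text for both hypothetical destination columns, the returned array holds a single query roughly like:

  columns = { 'sku' => 'sku', 'name' => 'name' }   # hypothetical CSV => table mapping
  keys    = { 'sku' => 'sku' }
  BulkImporter.make_update_mode_append_sql('products_tmp', 'products', columns, keys)
  # => ["INSERT INTO products (sku,name) " \
  #     "SELECT o.sku::text,o.name::text FROM products_tmp o " \
  #     "LEFT JOIN products d ON (o.sku::text) = (d.sku) " \
  #     "WHERE (d.sku) is null"]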
make_update_mode_replace_sql(origin, destination, columns, keys)

Makes the SQL commands to truncate the existing data and insert all the new rows.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

  • columns Hash mapping CSV column names to destination table columns.

  • keys Hash mapping CSV key columns to the destination table's primary key columns.

Return

array Array of queries to execute

# File lib/bulk_importer.rb, line 255
def self.make_update_mode_replace_sql(origin, destination, columns, keys)
  q = []

  q << "TRUNCATE TABLE #{destination}"
  q.concat self.make_update_mode_append_sql(origin, destination, columns, keys)

  return q
end
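
With the same hypothetical columns and keys as above, the result is the TRUNCATE statement followed by the append query:

  BulkImporter.make_update_mode_replace_sql('products_tmp', 'products', columns, keys)
  # => ["TRUNCATE TABLE products",
  #     "INSERT INTO products (sku,name) SELECT ... WHERE (d.sku) is null"]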
make_update_mode_update_sql(origin, destination, columns, keys)

Makes the SQL commands to insert new rows and update preexisting ones.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

  • columns Hash mapping CSV column names to destination table columns.

  • keys Hash mapping CSV key columns to the destination table's primary key columns.

Return

array Array of queries to execute

# File lib/bulk_importer.rb, line 194
def self.make_update_mode_update_sql(origin, destination, columns, keys)
  q = self.make_update_mode_append_sql(origin, destination, columns, keys)

  sql = []
  types = {}

  pg_types = PostgresqlModule.get_column_types destination
  columns.values.each { |i| types[columns.invert[i]] = pg_types[i] }

  o_columns_without_keys = columns.keys.delete_if { |i| keys.has_key? i }
  d_columns_without_keys = columns.values.delete_if { |i| keys.has_value? i }

  set = []
  columns.delete_if { |item| columns[item].nil? }.keys.each do |column|
    set << "#{columns[column]} = o.#{column}::#{types[column]}"
  end
  sets = set.join(',')

  if columns.has_value? 'updated_at'
    # Rows are updated only if the origin's updated_at is greater than the destination's.

    PostgresqlModule.create_index_on origin, columns.invert['updated_at'].downcase

    sql << "UPDATE #{destination} d SET #{sets}"
    sql << "FROM #{origin} o"
    sql << "WHERE (#{self.keys_to_list(keys.keys, 'o', types)}) = "
    sql << "(#{self.keys_to_list(keys.values, 'd')}) AND"
    sql << "o.#{columns.invert['updated_at'].downcase}::timestamp > d.updated_at"
  else
    # Check if any field changed
    sql << "WITH #{origin}_prexistent_modified AS ("
    sql << "SELECT o.* FROM #{origin} o JOIN #{destination} d"
    sql << "ON (#{self.keys_to_list(keys.keys, 'o', types)}) = "
    sql << "(#{self.keys_to_list(keys.values, 'd')}) AND "
    sql << "(#{self.keys_to_list(o_columns_without_keys, 'o', types)}) != "
    sql << "(#{self.keys_to_list(d_columns_without_keys, 'd')})"
    sql << ")"
    sql << "UPDATE #{destination} d SET #{sets}"
    sql << "FROM #{origin}_prexistent_modified o"
    sql << "WHERE (#{self.keys_to_list(keys.keys, 'o', types)}) = "
    sql << "(#{self.keys_to_list(keys.values, 'd')})"
  end

  q << sql.join(' ')

  return q
end
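
With the same hypothetical columns and keys as above, the result is the append query followed by an UPDATE; when the mapping includes an updated_at column the UPDATE only touches rows whose origin updated_at is newer, otherwise a CTE selects the preexisting rows whose non-key columns changed. Roughly:

  BulkImporter.make_update_mode_update_sql('products_tmp', 'products', columns, keys)
  # => ["INSERT INTO products ... WHERE (d.sku) is null",
  #     "WITH products_tmp_prexistent_modified AS (...) " \
  #     "UPDATE products d SET sku = o.sku::text,name = o.name::text ..."]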
move_imported_data(origin, destination, columns, keys, update_mode)

Move imported data from origin (the raw imported temporary table) to destination.

Parameters

  • origin Origin (temporary table).

  • destination Destination table.

  • columns Hash mapping CSV column names to destination table columns.

  • keys Hash mapping CSV key columns to the destination table's primary key columns.

  • update_mode Update mode.

Return

integer Number of imported rows.

# File lib/bulk_importer.rb, line 92
def self.move_imported_data(origin, destination, columns, keys, update_mode)
  unless self.is_update_mode_valid update_mode
    raise "[#{NAME}] Unknown update mode: #{update_mode}"
  end

  # Create an index to improve move performance.
  Rails.logger.debug "[#{NAME}] Creating index on #{origin}"
  PostgresqlModule.create_index_on origin, keys.keys.map { |i| i.downcase }

  queries = self.make_move_imported_data_sql(
    origin,
    destination,
    columns,
    keys,
    update_mode
  )

  rows = 0

  queries.each do |query|
    Rails.logger.debug "[#{NAME}] Running query <<#{query}>>"
    rows += ActiveRecord::Base.connection.execute(query).cmd_tuples
  end

  return rows
end
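
A minimal sketch (hypothetical tables, assuming the origin table has already been populated, for instance by PostgresqlModule.copy_from):

  rows = BulkImporter.move_imported_data(
    'products_tmp',                         # origin (temporary table)
    'products',                             # destination table
    { 'sku' => 'sku', 'name' => 'name' },   # CSV column => table column
    { 'sku' => 'sku' },                     # CSV key => primary key
    BulkImporter::UPDATE_MODE_APPEND
  )
  Rails.logger.info "Moved #{rows} rows"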