class RailsRedshiftReplicator::Importers::Base
Attributes
Public Class Methods
# File lib/rails_redshift_replicator/importers/base.rb, line 6
# Remembers the replication this importer operates on.
# A blank replication is ignored, leaving @replication unset.
# @param replication the replication to import
def initialize(replication)
  @replication = replication unless replication.blank?
end
Public Instance Methods
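Runs Redshift COPY command to import data from S3 (docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) @param table_name [String] target table name (defaults to the replication's target table) @param options [Hash] @option options [Boolean] :mark_as_imported If record should be flagged as imported @option options [Boolean] :noload If true, data will be validated but not imported @option options [Boolean] :can_drop_target_on_error If true, the target table is dropped when the copy fails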
# File lib/rails_redshift_replicator/importers/base.rb, line 32
# Logs the import, executes the COPY statement for the given table and,
# when requested, flags the replication as imported.
#
# Any StandardError raised along the way is handled here and not re-raised:
# the target table is dropped if :can_drop_target_on_error is set, and the
# failure is stored on the replication (through #get_redshift_error when the
# message mentions stl_load_errors, otherwise as the inspected exception).
#
# @param table_name [String] table receiving the data
# @param options [Hash] :mark_as_imported, :noload, :can_drop_target_on_error
def copy(table_name = replication.target_table, options = {})
  log_line = I18n.t(
    :importing_file,
    file: import_file,
    target_table: table_name,
    scope: :rails_redshift_replicator
  )
  RailsRedshiftReplicator.logger.info(log_line)

  outcome = ::RailsRedshiftReplicator.connection.exec(copy_statement(table_name, options))
  # NOTE(review): status 1 is presumably the driver's "command ok" code -- confirm.
  command_ok = outcome.result_status == 1
  replication.imported! if command_ok && options[:mark_as_imported]
rescue => error
  drop_table(table_name) if options[:can_drop_target_on_error]

  unless error.message.include?("stl_load_errors")
    return replication.update_attribute(:last_error, error.inspect)
  end

  get_redshift_error
  notify_error
end
# File lib/rails_redshift_replicator/importers/base.rb, line 63
# Flattens the values of RailsRedshiftReplicator.copy_options into one
# space-separated string.
# @return [String]
def copy_options
  configured = RailsRedshiftReplicator.copy_options
  configured.values.join(" ")
end
Builds the copy statement @param (see #copy) @return [String] sql statement to run
# File lib/rails_redshift_replicator/importers/base.rb, line 51
# Assembles the Redshift COPY statement for the replication's import file.
# @param table_name [String] table receiving the data
# @param options [Hash] only :noload is read here; it adds the NOLOAD keyword
# @return [String] single-line sql statement (whitespace collapsed by squish)
def copy_statement(table_name, options = {})
  format_options =
    if replication.csv?
      "CSV"
    else
      "GZIP DELIMITER ',' ESCAPE REMOVEQUOTES"
    end

  # The clauses are joined with spaces; squish then normalizes all whitespace.
  clauses = [
    "COPY #{table_name} from '#{import_file}'",
    ("NOLOAD" if options[:noload]),
    "REGION '#{RailsRedshiftReplicator.s3_bucket_params[:region]}'",
    "CREDENTIALS 'aws_access_key_id=#{RailsRedshiftReplicator.aws_credentials[:key]};aws_secret_access_key=#{RailsRedshiftReplicator.aws_credentials[:secret]}'",
    format_options,
    copy_options
  ]
  clauses.compact.join(" ").squish
end
Creates a permanent table for later renaming
# File lib/rails_redshift_replicator/importers/base.rb, line 96
# Creates a regular (non-TEMP) table named #temporary_table_name with the
# same definition as the replication's target table.
def create_side_table
  ddl = "CREATE TABLE #{temporary_table_name} (LIKE #{replication.target_table})"
  RailsRedshiftReplicator.connection.exec(ddl)
end
Creates a temporary table on redshift
# File lib/rails_redshift_replicator/importers/base.rb, line 91
# Creates a TEMP table named #temporary_table_name with the same definition
# as the replication's target table.
def create_temp_table
  ddl = "CREATE TEMP TABLE #{temporary_table_name} (LIKE #{replication.target_table})"
  RailsRedshiftReplicator.connection.exec(ddl)
end
Drops the given table if it exists (defaults to the temporary table) @param table_name [String]
# File lib/rails_redshift_replicator/importers/base.rb, line 147
# Drops a table, doing nothing when it does not exist.
# @param table_name [String] table to drop; defaults to #temporary_table_name
def drop_table(table_name = temporary_table_name)
  statement = "drop table if exists #{table_name}"
  ::RailsRedshiftReplicator.connection.exec(statement)
end
History Cap has a minimum of 2
# File lib/rails_redshift_replicator/importers/base.rb, line 16
# Deletes the replications selected by Replication.older_than for this
# source table and the configured history cap.
# Does nothing when RailsRedshiftReplicator.history_cap is not set.
def evaluate_history_cap
  cap = RailsRedshiftReplicator.history_cap
  return unless cap

  RailsRedshiftReplicator::Replication.older_than(replication.source_table, cap).delete_all
end
# File lib/rails_redshift_replicator/importers/base.rb, line 22
# Lazily builds the FileManager bound to this importer and reuses it on
# later calls.
# @return [RailsRedshiftReplicator::FileManager]
def file_manager
  return @file_manager if @file_manager

  @file_manager = RailsRedshiftReplicator::FileManager.new(self)
end
Retrieves the last copy error for a given file on redshift
# File lib/rails_redshift_replicator/importers/base.rb, line 73
# Looks up the most recent STL_LOAD_ERRORS row whose filename matches the
# import file and stores it in the replication's last_error attribute,
# serialized as "column=>value" pairs joined by ";".
#
# When no matching row is found, a fallback message naming the import file is
# stored instead, so the failure stays visible rather than raising
# NoMethodError from inside the caller's error handler.
#
# @return the result of replication.update_attribute
def get_redshift_error
  sql = <<-RE.squish
    SELECT filename, line_number, colname, type, raw_field_value, raw_line, err_reason
    FROM STL_LOAD_ERRORS
    WHERE filename like '%#{import_file}%'
    ORDER BY starttime desc
    LIMIT 1
  RE
  row = ::RailsRedshiftReplicator.connection.exec(sql).entries.first

  error =
    if row
      # to_s keeps NULL (nil) or non-String column values from breaking strip.
      row.map { |column, value| "#{column}=>#{value.to_s.strip}" }.join(";")
    else
      "No STL_LOAD_ERRORS entry found for #{import_file}"
    end

  replication.update_attribute :last_error, error
end
# File lib/rails_redshift_replicator/importers/base.rb, line 11
# Entry point that concrete importers are expected to override; this base
# version always raises.
# @raise [NotImplementedError] always
def import
  raise NotImplementedError, "#{self.class.name} must implement #import"
end
@return [String] location of import files on s3
# File lib/rails_redshift_replicator/importers/base.rb, line 68
# Builds the s3:// URL from the configured bucket and the replication's key.
# @return [String] location of the import file(s) on s3
def import_file
  bucket = RailsRedshiftReplicator.s3_bucket_params[:bucket]
  "s3://#{bucket}/#{replication.key}"
end
Runs a merge or replace operation on a redshift table. The table is replaced on a FullReplicator strategy. The table is merged on a TimedReplicator strategy. @param mode [Symbol] the operation type (:merge or :replace)
# File lib/rails_redshift_replicator/importers/base.rb, line 104
# Builds the statement for the requested mode by calling "<mode>_statement"
# with the target table and the temporary table, then executes it.
# @param mode [Symbol] prefix of the statement builder to call
def merge_or_replace(mode:)
  builder = "#{mode}_statement"
  statement = send(builder, replication.target_table, temporary_table_name)
  ::RailsRedshiftReplicator.connection.exec(statement)
end
Builds the merge sql statement. First, it deletes from the target table the records whose id matches a record in the temporary table. Then it inserts everything from the temporary table into the target table. @param target [String] target table @param stage [String] temporary table @return [String] Sql Statement (docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html)
# File lib/rails_redshift_replicator/importers/base.rb, line 118
# Builds the merge sql: inside one transaction, rows of the target that match
# a staged row on the key column are deleted, then every staged row is
# inserted into the target.
# @param target [String] table being merged into
# @param stage [String] table holding the new rows
# @param key [String] column used to match rows; defaults to "id", which
#   yields the same statement as before this parameter existed
# @return [String] sql statement
def merge_statement(target, stage, key: "id")
  <<-SQLMERGE
    begin transaction;

    delete from #{target}
    using #{stage}
    where #{target}.#{key} = #{stage}.#{key};
    insert into #{target}
    select * from #{stage};

    end transaction;
  SQLMERGE
end
TODO
# File lib/rails_redshift_replicator/importers/base.rb, line 87 def notify_error end
Builds the replace sql statement. @param (see #merge_statement) @return (see #merge_statement) (docs.aws.amazon.com/redshift/latest/dg/performing-a-deep-copy.html)
# File lib/rails_redshift_replicator/importers/base.rb, line 136
# Builds the replace sql: inside one transaction the target table is dropped
# and the staged table is renamed to take its place.
# The drop uses "if exists", as #drop_table does, so a missing target does
# not abort the transaction before the rename.
# @param target [String] table being replaced
# @param stage [String] table that becomes the new target
# @return [String] sql statement
def replace_statement(target, stage)
  <<-SQLREPLACE
    begin transaction;
    drop table if exists #{target};
    alter table #{stage} rename to #{target};
    end transaction;
  SQLREPLACE
end
Returns a timestamp-based name for the temporary table; the name is memoized, so repeated calls return the same value @return [String] table name
# File lib/rails_redshift_replicator/importers/base.rb, line 153
# Name of the staging table: "temp_<target table>_<unix timestamp>".
# Computed on first use and cached in @temp_table, so later calls return the
# same name.
# @return [String] table name
def temporary_table_name
  return @temp_table if @temp_table

  @temp_table = "temp_#{replication.target_table}_#{Time.now.to_i}"
end