class RailsRedshiftReplicator::Importers::Base

Attributes

replication[RW]

Public Class Methods

new(replication) click to toggle source
# File lib/rails_redshift_replicator/importers/base.rb, line 6
# Binds the importer to a replication.
# A blank replication is ignored, leaving @replication unset.
#
# @param replication [Object] replication to import; must respond to +blank?+
def initialize(replication)
  @replication = replication unless replication.blank?
end

Public Instance Methods

copy(table_name = replication.target_table, options = {}) click to toggle source

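Runs the Redshift COPY command to import data from S3 (docs.aws.amazon.com/redshift/latest/dg/r_COPY.html) @param table_name [String] name of the table to load; defaults to the replication's target table @param options [Hash] @option options [Boolean] :mark_as_imported If the replication record should be flagged as imported after a successful COPY @option options [Boolean] :noload If true, data will be validated but not imported @option options [Boolean] :can_drop_target_on_error If true, the table is dropped when the COPY raises an error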

# File lib/rails_redshift_replicator/importers/base.rb, line 32
# Runs the Redshift COPY command for the replication's import file.
#
# The import is logged first, then the statement built by #copy_statement is
# executed. When the result status is 1 and +:mark_as_imported+ is set, the
# replication is flagged as imported.
#
# Any StandardError is rescued rather than re-raised: the table is dropped
# when +:can_drop_target_on_error+ is set, and then either the Redshift load
# error is fetched (message mentions "stl_load_errors") or the inspected
# exception is stored in the replication's +last_error+.
#
# @param table_name [String] table to load
# @param options [Hash]
# @option options [Boolean] :mark_as_imported flag the replication as imported on success
# @option options [Boolean] :noload forwarded to #copy_statement
# @option options [Boolean] :can_drop_target_on_error drop +table_name+ when the COPY raises
def copy(table_name = replication.target_table, options = {})
  logger = RailsRedshiftReplicator.logger
  logger.info I18n.t(:importing_file, file: import_file, target_table: table_name, scope: :rails_redshift_replicator)

  connection = ::RailsRedshiftReplicator.connection
  result = connection.exec(copy_statement(table_name, options))

  succeeded = result.result_status == 1
  replication.imported! if succeeded && options[:mark_as_imported]
rescue => e
  drop_table(table_name) if options[:can_drop_target_on_error]

  # Anything that is not a load error is recorded as-is.
  unless e.message.include?("stl_load_errors")
    return replication.update_attribute(:last_error, e.inspect)
  end

  get_redshift_error
  notify_error
end
copy_options() click to toggle source
# File lib/rails_redshift_replicator/importers/base.rb, line 63
# Joins the values of the configured copy options into one
# space-separated string.
#
# @return [String] extra options for the COPY statement
def copy_options
  configured = RailsRedshiftReplicator.copy_options
  configured.values.join(" ")
end
copy_statement(table_name, options = {}) click to toggle source

Builds the copy statement @param (see copy) @return [String] sql statement to run

# File lib/rails_redshift_replicator/importers/base.rb, line 51
      # Builds the COPY statement for the replication's import file.
      #
      # The clauses are joined and squished, so the result is a single line
      # with single spaces. CSV replications use the CSV format; all others
      # use GZIP with comma delimiter, ESCAPE and REMOVEQUOTES.
      #
      # @param table_name [String] table to load
      # @param options [Hash]
      # @option options [Boolean] :noload adds NOLOAD to the statement
      # @return [String] sql statement to run
      def copy_statement(table_name, options = {})
        data_format = replication.csv? ? "CSV" : "GZIP DELIMITER ',' ESCAPE REMOVEQUOTES"
        noload = options[:noload] ? "NOLOAD" : nil
        region = RailsRedshiftReplicator.s3_bucket_params[:region]
        credentials = RailsRedshiftReplicator.aws_credentials

        clauses = [
          "COPY #{table_name} from '#{import_file}'",
          noload,
          "REGION '#{region}'",
          "CREDENTIALS 'aws_access_key_id=#{credentials[:key]};aws_secret_access_key=#{credentials[:secret]}'",
          data_format,
          copy_options
        ]
        # squish collapses the gap left by an absent NOLOAD clause.
        clauses.join(" ").squish
      end
create_side_table() click to toggle source

Creates a permanent table for later renaming

# File lib/rails_redshift_replicator/importers/base.rb, line 96
# Creates a permanent table, named by #temporary_table_name, with the
# structure of the replication's target table (CREATE TABLE ... LIKE).
#
# @return [Object] whatever the connection's +exec+ returns
def create_side_table
  connection = RailsRedshiftReplicator.connection
  ddl = "CREATE TABLE #{temporary_table_name} (LIKE #{replication.target_table})"
  connection.exec(ddl)
end
create_temp_table() click to toggle source

Creates a temporary table on redshift

# File lib/rails_redshift_replicator/importers/base.rb, line 91
# Creates a TEMP table, named by #temporary_table_name, with the
# structure of the replication's target table (CREATE TEMP TABLE ... LIKE).
#
# @return [Object] whatever the connection's +exec+ returns
def create_temp_table
  connection = RailsRedshiftReplicator.connection
  ddl = "CREATE TEMP TABLE #{temporary_table_name} (LIKE #{replication.target_table})"
  connection.exec(ddl)
end
drop_table(table_name = temporary_table_name) click to toggle source

Drops the given table if it exists; defaults to the temporary table @param table_name [String]

# File lib/rails_redshift_replicator/importers/base.rb, line 147
# Drops a table if it exists.
#
# @param table_name [String] table to drop; defaults to #temporary_table_name
# @return [Object] whatever the connection's +exec+ returns
def drop_table(table_name = temporary_table_name)
  connection = ::RailsRedshiftReplicator.connection
  connection.exec("drop table if exists #{table_name}")
end
evaluate_history_cap() click to toggle source

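Deletes the replications returned by Replication.older_than for the replication's source table when a history cap is configured. History Cap has a minimum of 2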

# File lib/rails_redshift_replicator/importers/base.rb, line 16
# Applies the configured history cap to the replication's source table.
# Does nothing when no cap is configured; otherwise deletes every record
# returned by +Replication.older_than(source_table, cap)+.
#
# @return [Object, nil] result of +delete_all+, or nil without a cap
def evaluate_history_cap
  cap = RailsRedshiftReplicator.history_cap
  return unless cap

  expired = RailsRedshiftReplicator::Replication.older_than(replication.source_table, cap)
  expired.delete_all
end
file_manager() click to toggle source
# File lib/rails_redshift_replicator/importers/base.rb, line 22
# Memoized FileManager built with this importer as its argument.
#
# @return [RailsRedshiftReplicator::FileManager]
def file_manager
  return @file_manager if @file_manager

  @file_manager = RailsRedshiftReplicator::FileManager.new(self)
end
get_redshift_error() click to toggle source

Retrieves the last COPY error for the import file from Redshift's STL_LOAD_ERRORS table and stores it in the replication's last_error attribute

# File lib/rails_redshift_replicator/importers/base.rb, line 73
      # Fetches the most recent STL_LOAD_ERRORS row whose filename matches the
      # import file and stores it in the replication's +last_error+ as
      # "column=>value" pairs joined by ";".
      #
      # This runs from #copy's rescue branch, so it must not raise on an
      # unexpected result: a missing row is skipped, and NULL or non-String
      # column values are stringified before stripping.
      #
      # @return [Object, nil] result of +update_attribute+, or nil when no
      #   load error row was found
      def get_redshift_error
        sql = <<-RE.squish
          SELECT filename, line_number, colname, type, raw_field_value, raw_line, err_reason
          FROM STL_LOAD_ERRORS
          WHERE filename like '%#{import_file}%'
          ORDER BY starttime desc
          LIMIT 1
        RE
        row = ::RailsRedshiftReplicator.connection.exec(sql).entries.first
        # No matching row: nothing to record.
        return if row.nil?

        # to_s guards against nil (NULL columns) and non-String values.
        error = row.map { |column, value| [column, value.to_s.strip].join('=>') }.join(";")
        replication.update_attribute :last_error, error
      end
import() click to toggle source
# File lib/rails_redshift_replicator/importers/base.rb, line 11
# Abstract entry point: concrete importers must override this method.
#
# NotImplementedError descends from ScriptError, not StandardError, so a
# bare +rescue+ will not catch it.
#
# @raise [NotImplementedError] always, naming the class that lacks an implementation
def import
  raise NotImplementedError, "#{self.class.name}#import must be implemented by a subclass"
end
import_file() click to toggle source

@return [String] location of import files on s3

# File lib/rails_redshift_replicator/importers/base.rb, line 68
# Builds the S3 location from the configured bucket and the replication key.
#
# @return [String] location of import files on s3
def import_file
  bucket = RailsRedshiftReplicator.s3_bucket_params[:bucket]
  "s3://#{bucket}/#{replication.key}"
end
merge_or_replace(mode:) click to toggle source

Runs a merge or replace operation on a Redshift table. The table is replaced on a FullReplicator strategy. The table is merged on a TimedReplicator strategy. @param mode [Symbol] the operation type (:merge or :replace)

# File lib/rails_redshift_replicator/importers/base.rb, line 104
# Executes the statement built by "<mode>_statement" for the replication's
# target table and the temporary table.
#
# @param mode [Symbol] operation type; selects #merge_statement or #replace_statement
# @return [Object] whatever the connection's +exec+ returns
def merge_or_replace(mode:)
  builder = "#{mode}_statement"
  statement = send(builder, replication.target_table, temporary_table_name)
  ::RailsRedshiftReplicator.connection.exec(statement)
end
merge_statement(target, stage) click to toggle source

Builds the merge SQL statement. First, it deletes from the target table the rows whose id matches a row in the temporary table. Then it inserts everything from the temporary table into the target table. @param target [String] @param stage [String] temporary table @return [String] SQL statement (docs.aws.amazon.com/redshift/latest/dg/merge-replacing-existing-rows.html)

# File lib/rails_redshift_replicator/importers/base.rb, line 118
      # Builds the SQL that merges the staging table into the target table
      # inside a single transaction: rows of +target+ whose +id+ matches a row
      # in +stage+ are deleted, then every row of +stage+ is inserted.
      #
      # The heredoc is returned as written (indentation and newlines are kept;
      # it is not squished).
      #
      # @param target [String] table receiving the rows
      # @param stage [String] temporary table holding the new rows
      # @return [String] sql statement
      def merge_statement(target, stage)
        <<-SQLMERGE
          begin transaction;

          delete from #{target}
          using #{stage}
          where #{target}.id = #{stage}.id;
          insert into #{target}
          select * from #{stage};

          end transaction;
        SQLMERGE
      end
notify_error() click to toggle source

TODO

# File lib/rails_redshift_replicator/importers/base.rb, line 87
# Placeholder for error notification (TODO). Called by #copy after a
# Redshift load error has been recorded; currently a no-op.
#
# @return [nil]
def notify_error
  nil
end
replace_statement(target, stage) click to toggle source

Builds the replace sql statement. @param (see merge_statement) @return (see merge_statement) (docs.aws.amazon.com/redshift/latest/dg/performing-a-deep-copy.html)

# File lib/rails_redshift_replicator/importers/base.rb, line 136
      # Builds the SQL that replaces the target table with the staging table
      # inside a single transaction: +target+ is dropped and +stage+ is
      # renamed to +target+.
      #
      # The heredoc is returned as written (indentation and newlines are kept;
      # it is not squished).
      #
      # @param target [String] table to be replaced
      # @param stage [String] table that takes the target's name
      # @return [String] sql statement
      def replace_statement(target, stage)
        <<-SQLREPLACE
          begin transaction;
          drop table #{target};
          alter table #{stage} rename to #{target};
          end transaction;
        SQLREPLACE
      end
temporary_table_name() click to toggle source

Returns a timestamp-based name for a temporary table; the name is generated once and reused on later calls @return [String] table name

# File lib/rails_redshift_replicator/importers/base.rb, line 153
# Name of the staging table: "temp_<target_table>_<unix timestamp>".
# It is computed on first use and the same value is returned afterwards.
#
# @return [String] table name
def temporary_table_name
  return @temp_table if @temp_table

  base = replication.target_table
  @temp_table = "temp_#{base}_#{Time.now.to_i}"
end