class PostgresToRedshift

An example row for a source table, as returned from information_schema.tables (shown in psql expanded display):

table_catalog                | postgres_to_redshift
table_schema                 | public
table_name                   | acquisition_pages
table_type                   | BASE TABLE
self_referencing_column_name |
reference_generation         |
user_defined_type_catalog    |
user_defined_type_schema     |
user_defined_type_name       |
is_insertable_into           | YES
is_typed                     | NO
commit_action                |

Constants

GIGABYTE
KILOBYTE
MEGABYTE
VERSION

Attributes

source_uri[RW]
target_uri[RW]
s3[R]
source_connection[R]
target_connection[R]

Public Class Methods

source_connection()
# File lib/postgres_to_redshift.rb, line 41
def self.source_connection
  unless instance_variable_defined?(:"@source_connection")
    @source_connection = PG::Connection.new(
      host: source_uri.host,
      port: source_uri.port,
      user: source_uri.user || ENV['USER'],
      password: source_uri.password,
      dbname: source_uri.path[1..-1]
    )
    @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  end

  @source_connection
end
target_connection()
# File lib/postgres_to_redshift.rb, line 50
def self.target_connection
  unless instance_variable_defined?(:"@target_connection")
    @target_connection = PG::Connection.new(
      host: target_uri.host,
      port: target_uri.port,
      user: target_uri.user || ENV['USER'],
      password: target_uri.password,
      dbname: target_uri.path[1..-1]
    )
  end

  @target_connection
end
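
Both helpers memoize a PG::Connection built from the corresponding URI; only the source session is forced read-only, since the target must accept DDL and COPY statements. A quick illustration of the read-only guard (a sketch; the exact error class depends on your pg gem version):

conn = PostgresToRedshift.source_connection
conn.exec('SELECT 1')                   # works
conn.exec('CREATE TABLE t (id int)')    # raises PG::ReadOnlySqlTransaction (SQLSTATE 25006)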
update_tables()
# File lib/postgres_to_redshift.rb, line 21
def self.update_tables
  update_tables = PostgresToRedshift.new

  update_tables.tables.each do |table|
    target_connection.exec("CREATE TABLE IF NOT EXISTS public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

    update_tables.copy_table(table)

    update_tables.import_table(table)
  end
end
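
update_tables is the top-level entry point: it builds an instance, creates each target table if it is missing, then exports (copy_table) and imports (import_table) every table in turn. A minimal sketch of a full sync, assuming the URI accessors listed under Attributes are settable at the class level (the example URIs are hypothetical, and released versions of the gem may read them from environment variables instead):

require 'postgres_to_redshift'
require 'uri'

# Hypothetical connection URIs; supply your own source and target databases.
PostgresToRedshift.source_uri = URI.parse('postgres://user:pass@source-host:5432/app_db')
PostgresToRedshift.target_uri = URI.parse('postgres://user:pass@redshift-host:5439/warehouse')

PostgresToRedshift.update_tables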

Public Instance Methods

bucket()
# File lib/postgres_to_redshift.rb, line 83
def bucket
  @bucket ||= s3.buckets[ENV['S3_DATABASE_EXPORT_BUCKET']]
end
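
bucket memoizes the S3 bucket named by S3_DATABASE_EXPORT_BUCKET. The buckets[] lookup is the aws-sdk v1 interface; a sketch of how the s3 handle it relies on might be built, assuming the same credential variables used by import_table:

require 'aws-sdk'  # v1 of the AWS SDK; v2+ uses a different interface

# Assumption: credentials come from the export env vars shown in import_table.
s3 = AWS::S3.new(access_key_id: ENV['S3_DATABASE_EXPORT_ID'],
                 secret_access_key: ENV['S3_DATABASE_EXPORT_KEY'])
bucket = s3.buckets[ENV['S3_DATABASE_EXPORT_BUCKET']]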
column_definitions(table)
# File lib/postgres_to_redshift.rb, line 75
def column_definitions(table)
  source_connection.exec("SELECT * FROM information_schema.columns WHERE table_schema='public' AND table_name='#{table.name}' order by ordinal_position")
end
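
The result is a PG::Result of information_schema.columns rows in ordinal order, each keyed by column name. For example, listing a table's columns and types (table is any object returned by tables):

column_definitions(table).each do |col|
  puts "#{col['column_name']}: #{col['data_type']}"
end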
copy_table(table)
# File lib/postgres_to_redshift.rb, line 87
def copy_table(table)
  tmpfile = Tempfile.new("psql2rs")
  zip = Zlib::GzipWriter.new(tmpfile)
  chunksize = 5 * GIGABYTE # uncompressed
  chunk = 1
  bucket.objects.with_prefix("export/#{table.target_table_name}.psv.gz").delete_all
  begin
    puts "Downloading #{table}"
    copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"

    source_connection.copy_data(copy_command) do
      while row = source_connection.get_copy_data
        zip.write(row)
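        # Rotate chunks: once more than 5 GB of uncompressed data has been
        # written, close out this gzip file, upload it, and start a new one.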
        if (zip.pos > chunksize)
          zip.finish
          tmpfile.rewind
          upload_table(table, tmpfile, chunk)
          chunk += 1
          zip.close unless zip.closed?
          tmpfile.unlink
          tmpfile = Tempfile.new("psql2rs")
          zip = Zlib::GzipWriter.new(tmpfile)
        end
      end
    end
    zip.finish
    tmpfile.rewind
    upload_table(table, tmpfile, chunk)
  ensure
    zip.close unless zip.closed?
    tmpfile.unlink
  end
end
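
copy_table streams the table out with COPY, gzips it on the fly, and rotates to a fresh temp file whenever more than 5 GB of uncompressed data has been written, uploading each finished chunk as a numbered S3 object (plausibly to keep every compressed upload well under S3's 5 GB single-PUT limit). Run on its own it looks like this, assuming a configured instance:

p2r = PostgresToRedshift.new
table = p2r.tables.first
p2r.copy_table(table)  # uploads export/<name>.psv.gz.1, .2, ... to the bucket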
import_table(table)
# File lib/postgres_to_redshift.rb, line 126
def import_table(table)
  puts "Importing #{table.target_table_name}"
  target_connection.exec("DROP TABLE IF EXISTS public.#{table.target_table_name}_updating")

  target_connection.exec("BEGIN;")

  target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")

  target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

  target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

  target_connection.exec("COMMIT;")
end
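
import_table swaps the live table inside a transaction: the current table is renamed to <name>_updating, a fresh empty table is created, and the export is loaded into it with COPY; the stale _updating copy is dropped at the start of the next run. Note that Redshift treats the s3:// path as a key prefix, so a single COPY picks up every numbered chunk that upload_table wrote. Usage, assuming copy_table has already run for the same table:

p2r.import_table(table)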
tables()
# File lib/postgres_to_redshift.rb, line 66
def tables
  source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')").map do |table_attributes|
    table = Table.new(attributes: table_attributes)
    next if table.name =~ /^pg_/
    table.columns = column_definitions(table)
    table
  end.compact
end
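
tables wraps each qualifying row in a Table object, attaches its column definitions, and skips system names beginning with pg_. For example:

p2r = PostgresToRedshift.new
p2r.tables.map(&:name)
# => ["acquisition_pages", ...] (names depend on your source schema)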
upload_table(table, buffer, chunk)
# File lib/postgres_to_redshift.rb, line 121
def upload_table(table, buffer, chunk)
  puts "Uploading #{table.target_table_name}.#{chunk}"
  bucket.objects["export/#{table.target_table_name}.psv.gz.#{chunk}"].write(buffer, acl: :authenticated_read)
end
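
The chunk number is appended to the object key so successive chunks never overwrite each other. For instance, chunk 3 of a table whose target name is events (a hypothetical name):

upload_table(table, tmpfile, 3)
# => writes s3://<bucket>/export/events.psv.gz.3 with an authenticated-read ACL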