class PostgresToRedshift
Example row from information_schema.tables for a source table, as consumed by this class:

  table_catalog                 | postgres_to_redshift
  table_schema                  | public
  table_name                    | acquisition_pages
  table_type                    | BASE TABLE
  self_referencing_column_name  |
  reference_generation          |
  user_defined_type_catalog     |
  user_defined_type_schema      |
  user_defined_type_name        |
  is_insertable_into            | YES
  is_typed                      | NO
  commit_action                 |
Constants
- GIGABYTE
- KILOBYTE
- MEGABYTE
- VERSION
Attributes
source_uri[RW]
target_uri[RW]
s3[R]
source_connection[R]
target_connection[R]
Public Class Methods
source_connection()
# File lib/postgres_to_redshift.rb, line 41
def self.source_connection
  unless instance_variable_defined?(:"@source_connection")
    @source_connection = PG::Connection.new(host: source_uri.host, port: source_uri.port, user: source_uri.user || ENV['USER'], password: source_uri.password, dbname: source_uri.path[1..-1])
    @source_connection.exec("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;")
  end

  @source_connection
end
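A minimal usage sketch for the connection accessors. It assumes source_uri can be assigned at the class level, as the RW attribute listing above suggests (the gem's own entry point normally populates it from its configuration); the URI is a placeholder.

  require 'postgres_to_redshift'
  require 'uri'

  # Hypothetical source database; substitute real credentials.
  PostgresToRedshift.source_uri = URI.parse('postgres://user:secret@source-db.example.com:5432/app_production')

  # The first call opens a memoized PG::Connection with a read-only session default;
  # later calls reuse it.
  conn = PostgresToRedshift.source_connection
  puts conn.exec('SHOW default_transaction_read_only').getvalue(0, 0)   # => "on"

target_connection below behaves the same way against target_uri, except that the session is left writable so import_table can create and swap tables.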
target_connection()
# File lib/postgres_to_redshift.rb, line 50
def self.target_connection
  unless instance_variable_defined?(:"@target_connection")
    @target_connection = PG::Connection.new(host: target_uri.host, port: target_uri.port, user: target_uri.user || ENV['USER'], password: target_uri.password, dbname: target_uri.path[1..-1])
  end

  @target_connection
end
update_tables()
# File lib/postgres_to_redshift.rb, line 21
def self.update_tables
  update_tables = PostgresToRedshift.new

  update_tables.tables.each do |table|
    target_connection.exec("CREATE TABLE IF NOT EXISTS public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

    update_tables.copy_table(table)
    update_tables.import_table(table)
  end
end
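A hedged end-to-end sketch of driving the whole sync from a script. The URIs, bucket name, and AWS credentials are placeholders; the S3_DATABASE_EXPORT_* variables are the ones read by bucket, copy_table, and import_table below.

  require 'postgres_to_redshift'
  require 'uri'

  # Hypothetical configuration; substitute real values.
  PostgresToRedshift.source_uri = URI.parse('postgres://user:secret@source-db.example.com:5432/app_production')
  PostgresToRedshift.target_uri = URI.parse('postgres://admin:secret@example-cluster.redshift.amazonaws.com:5439/warehouse')

  ENV['S3_DATABASE_EXPORT_BUCKET'] = 'example-export-bucket'
  ENV['S3_DATABASE_EXPORT_ID']     = 'AKIAEXAMPLE'
  ENV['S3_DATABASE_EXPORT_KEY']    = 'example-secret-key'

  # For each public table and view: CREATE TABLE IF NOT EXISTS on Redshift,
  # dump the rows to S3 as gzipped pipe-delimited chunks, then COPY and swap.
  PostgresToRedshift.update_tables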
Public Instance Methods
bucket()
# File lib/postgres_to_redshift.rb, line 83
def bucket
  @bucket ||= s3.buckets[ENV['S3_DATABASE_EXPORT_BUCKET']]
end
column_definitions(table)
# File lib/postgres_to_redshift.rb, line 75
def column_definitions(table)
  source_connection.exec("SELECT * FROM information_schema.columns WHERE table_schema='public' AND table_name='#{table.name}' order by ordinal_position")
end
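The result is a PG::Result over information_schema.columns, one row per column in ordinal order, with the standard fields such as column_name and data_type. A small inspection sketch; it assumes the class has been configured as in the update_tables example, and the OpenStruct stands in for a real Table (anything responding to #name works):

  require 'ostruct'

  p2r = PostgresToRedshift.new
  users = OpenStruct.new(name: 'users')   # hypothetical table name

  p2r.column_definitions(users).each do |col|
    puts "#{col['column_name']}: #{col['data_type']}"
  end

Note that table.name is interpolated directly into the query, so it should only ever come from tables(), not from untrusted input.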
copy_table(table)
# File lib/postgres_to_redshift.rb, line 87
def copy_table(table)
  tmpfile = Tempfile.new("psql2rs")
  zip = Zlib::GzipWriter.new(tmpfile)
  chunksize = 5 * GIGABYTE # uncompressed
  chunk = 1
  bucket.objects.with_prefix("export/#{table.target_table_name}.psv.gz").delete_all
  begin
    puts "Downloading #{table}"
    copy_command = "COPY (SELECT #{table.columns_for_copy} FROM #{table.name}) TO STDOUT WITH DELIMITER '|'"

    source_connection.copy_data(copy_command) do
      while row = source_connection.get_copy_data
        zip.write(row)
        if (zip.pos > chunksize)
          zip.finish
          tmpfile.rewind
          upload_table(table, tmpfile, chunk)
          chunk += 1
          zip.close unless zip.closed?
          tmpfile.unlink
          tmpfile = Tempfile.new("psql2rs")
          zip = Zlib::GzipWriter.new(tmpfile)
        end
      end
    end
    zip.finish
    tmpfile.rewind
    upload_table(table, tmpfile, chunk)
  ensure
    zip.close unless zip.closed?
    tmpfile.unlink
  end
end
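copy_table streams each table through a gzip writer and rolls over to a new chunk whenever roughly 5 GB of uncompressed data has been written, so a large table ends up as several S3 objects. A sketch of the keys produced for a three-chunk export, following the naming used by upload_table below (table name taken from the example row at the top of this page):

  target_table_name = 'acquisition_pages'
  (1..3).map { |chunk| "export/#{target_table_name}.psv.gz.#{chunk}" }
  # => ["export/acquisition_pages.psv.gz.1",
  #     "export/acquisition_pages.psv.gz.2",
  #     "export/acquisition_pages.psv.gz.3"]

Because every chunk shares the export/<table>.psv.gz prefix, the single Redshift COPY issued by import_table picks up all of the chunks in one load.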
import_table(table)
# File lib/postgres_to_redshift.rb, line 126
def import_table(table)
  puts "Importing #{table.target_table_name}"
  target_connection.exec("DROP TABLE IF EXISTS public.#{table.target_table_name}_updating")

  target_connection.exec("BEGIN;")

  target_connection.exec("ALTER TABLE public.#{target_connection.quote_ident(table.target_table_name)} RENAME TO #{table.target_table_name}_updating")

  target_connection.exec("CREATE TABLE public.#{target_connection.quote_ident(table.target_table_name)} (#{table.columns_for_create})")

  target_connection.exec("COPY public.#{target_connection.quote_ident(table.target_table_name)} FROM 's3://#{ENV['S3_DATABASE_EXPORT_BUCKET']}/export/#{table.target_table_name}.psv.gz' CREDENTIALS 'aws_access_key_id=#{ENV['S3_DATABASE_EXPORT_ID']};aws_secret_access_key=#{ENV['S3_DATABASE_EXPORT_KEY']}' GZIP TRUNCATECOLUMNS ESCAPE DELIMITER as '|';")

  target_connection.exec("COMMIT;")
end
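A sketch of re-exporting and re-importing a single table without running the whole pipeline, using only the instance methods documented here. It assumes the class is configured as in the update_tables example and that the target table already exists on Redshift (import_table renames the existing table before creating and loading the new one).

  p2r = PostgresToRedshift.new
  table = p2r.tables.find { |t| t.name == 'acquisition_pages' }   # example table

  p2r.copy_table(table)    # dump to s3://<bucket>/export/acquisition_pages.psv.gz.<n>
  p2r.import_table(table)  # COPY into Redshift and swap the new table into place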
tables()
# File lib/postgres_to_redshift.rb, line 66
def tables
  source_connection.exec("SELECT * FROM information_schema.tables WHERE table_schema = 'public' AND table_type in ('BASE TABLE', 'VIEW')").map do |table_attributes|
    table = Table.new(attributes: table_attributes)
    next if table.name =~ /^pg_/
    table.columns = column_definitions(table)
    table
  end.compact
end
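A small sketch that lists what would be synced. Both readers used here (name and target_table_name) appear in the source above; target_table_name is the name used for the table on the Redshift side.

  p2r = PostgresToRedshift.new

  p2r.tables.each do |table|
    puts "#{table.name} -> #{table.target_table_name}"
  end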
upload_table(table, buffer, chunk)
# File lib/postgres_to_redshift.rb, line 121
def upload_table(table, buffer, chunk)
  puts "Uploading #{table.target_table_name}.#{chunk}"
  bucket.objects["export/#{table.target_table_name}.psv.gz.#{chunk}"].write(buffer, acl: :authenticated_read)
end