class TableCopy::PG::Destination
Attributes
after_create[R]
conn_method[R]
fields[R]
indexes[R]
primary_key[R]
sequence_field[R]
soft_delete_field[R]
table_name[R]
Public Class Methods
new(args)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 6 def initialize(args) @table_name = args[:table_name] @primary_key = args[:primary_key] @sequence_field = args[:sequence_field] @conn_method = args[:conn_method] @indexes = args[:indexes] || [] @fields = args[:fields] @after_create = args[:after_create] @soft_delete_field = args[:soft_delete_field] end
Public Instance Methods
copy_data_from(source_table, temp: nil, pk_only: false, update: false)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 78 def copy_data_from(source_table, temp: nil, pk_only: false, update: false) temp = 'temp_' if temp fl = pk_only ? primary_key : fields_list where = "where #{sequence_field} > '#{update}'" if update && sequence_field count = 0 source_table.copy_from(fl, where) do |source_conn| with_conn do |conn| conn.copy_data("COPY #{temp}#{table_name} (#{fl}) FROM STDOUT CSV") do while row = source_conn.get_copy_data count += 1 conn.put_copy_data(row) end end end end count end
copy_from_temp(except: except_statement)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 96 def copy_from_temp(except: except_statement) with_conn do |conn| conn.exec(upsert_sql(except)) end end
create(fields_ddl)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 30 def create(fields_ddl) sd = ", #{soft_delete_field} bool default false" if soft_delete_field with_conn do |conn| conn.exec("create table #{table_name} (#{fields_ddl}#{sd})") end after_create.call(table_name) if after_create end
create_indexes()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 45 def create_indexes indexes.each do |index| create_ddl = index.class.new(table_name, index.name, index.columns).create with_conn do |conn| conn.exec(create_ddl) end end end
create_temp(fields_ddl)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 66 def create_temp(fields_ddl) with_conn do |conn| conn.exec("create temp table temp_#{table_name} (#{fields_ddl}) on commit drop") end end
create_views(views)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 118 def create_views(views) with_conn do |conn| views.inject({}) do |result, view| begin conn.exec("create or replace view #{view['viewname']} as (#{view['definition'].gsub(/;\z/, '')})") result[view['viewname']] = true rescue ::PG::UndefinedTable, ::PG::UndefinedColumn => e result[view['viewname']] = false end result end end end
delete_not_in_temp()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 102 def delete_not_in_temp with_conn do |conn| if soft_delete_field conn.exec("update #{table_name} set #{soft_delete_field}=true where #{not_in_temp} and (#{soft_delete_field} is null or #{soft_delete_field} != true)") else conn.exec("delete from #{table_name} where #{not_in_temp}") end end end
drop(opts={})
click to toggle source
# File lib/table_copy/pg/destination.rb, line 38 def drop(opts={}) cascade = ' cascade' if opts[:cascade] with_conn do |conn| conn.exec("#{drop_sql}#{cascade}") end end
max_sequence()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 58 def max_sequence return unless sequence_field with_conn do |conn| row = conn.exec(max_sequence_sql).first row['max'] if row end end
none?()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 72 def none? with_conn do |conn| conn.exec("select count(*) from #{table_name}").first['count'] == '0' end end
query_views()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 112 def query_views with_conn do |conn| conn.exec(views_sql) end end
to_s()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 54 def to_s table_name end
transaction() { || ... }
click to toggle source
# File lib/table_copy/pg/destination.rb, line 17 def transaction with_conn do |conn| begin conn.exec('begin') yield conn.exec('commit') rescue Exception => e conn.exec('rollback') raise e end end end
Private Instance Methods
drop_sql()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 149 def drop_sql @drop_sql ||= "drop table if exists #{table_name}" end
except_statement()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 178 def except_statement @except_statement ||= "except select #{fields_list} from #{table_name}" end
fields_list()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 140 def fields_list @fields_list ||= fields.join(', ') end
max_sequence_sql()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 153 def max_sequence_sql @max_sequence_sql ||= "select max(#{sequence_field}) from #{table_name}" end
not_in_temp()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 134 def not_in_temp "#{primary_key} in (select #{primary_key} from #{table_name} except select #{primary_key} from temp_#{table_name})" end
return_statement(keys)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 188 def return_statement(keys) keys.map.with_index(1) do |key, i| "nv.#{key}" end.join(',') end
set_statement(keys)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 182 def set_statement(keys) keys.map.with_index(1) do |key, i| "#{key}=nv.#{key}" end.join(',') end
upsert_sql(except=except_statement)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 157 def upsert_sql(except=except_statement) "with new_values as ( select #{fields_list} from temp_#{table_name} #{except} ) ,upsert as ( UPDATE #{table_name} SET #{set_statement(fields)} FROM new_values as nv WHERE #{table_name}.#{primary_key} = nv.#{primary_key} RETURNING #{return_statement(fields)} ) INSERT INTO #{table_name} (#{fields_list}) SELECT * FROM new_values as nv WHERE NOT EXISTS (SELECT 1 FROM #{table_name} WHERE #{table_name}.#{primary_key} = nv.#{primary_key});" end
views_sql()
click to toggle source
# File lib/table_copy/pg/destination.rb, line 194 def views_sql <<-SQL select viewname, definition from pg_views where viewname in (SELECT distinct dependee.relname FROM pg_depend JOIN pg_rewrite ON pg_depend.objid = pg_rewrite.oid JOIN pg_class as dependee ON pg_rewrite.ev_class = dependee.oid JOIN pg_class as dependent ON pg_depend.refobjid = dependent.oid JOIN pg_attribute ON pg_depend.refobjid = pg_attribute.attrelid AND pg_depend.refobjsubid = pg_attribute.attnum WHERE dependent.relname = '#{table_name}' AND pg_attribute.attnum > 0) SQL end
with_conn(&block)
click to toggle source
# File lib/table_copy/pg/destination.rb, line 144 def with_conn(&block) conn_method.call(&block) end