# File lib/postgres_upsert/writer.rb, line 73
# Primary key as reported by the destination adapter. initialize uses it as
# the default :unique_key when the caller does not pass one.
def primary_key
  destination = @destination
  destination.primary_key
end
class PostgresUpsert::Writer
Public Class Methods
new(klass, destination, source, options = {})
click to toggle source
# File lib/postgres_upsert/writer.rb, line 4
# Builds a writer that upserts rows from +source+ into +destination+.
#
# @param klass [Object] stored as @klass (not read by the methods shown here)
# @param destination [Object] adapter exposing primary_key, column_names,
#   quoted_table_name and database_connection
# @param source [Object] adapter exposing columns, gets and
#   continuous_write_enabled
# @param options [Hash] caller options; missing keys fall back to:
#   :delimiter   (',')           field delimiter used by the CSV COPY in write_continuous
#   :header      (true)          not read by the methods shown here; presumably
#                                consumed by the source adapter — TODO confirm
#   :unique_key  ([primary_key]) column(s) used to match source rows to destination rows
#   :update_only (false)         when true, unmatched rows are not inserted
def initialize(klass, destination, source, options = {})
  @klass = klass
  # @destination must be set before the defaults are built, because the
  # default :unique_key calls primary_key, which reads @destination.
  @destination = destination
  @source = source
  @options = options.reverse_merge({
    delimiter: ',',
    header: true,
    unique_key: [primary_key],
    update_only: false
  })
  # Normalize a scalar key (e.g. :id) into a one-element array so later code
  # can always iterate over it.
  @options[:unique_key] = Array.wrap(@options[:unique_key])
end
Public Instance Methods
write()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 19
# Runs the whole upsert pipeline. It validates options, stages the source
# rows in a temporary table, merges them into the destination, drops the
# staging table, and returns the row-count summary.
#
# @source.continuous_write_enabled selects between write_continuous (a single
# CSV COPY) and write_batched (a COPY per chunk yielded by the source).
#
# @return [PostgresUpsert::Result] as built by summarize_results
# @raise [RuntimeError] from validate_options or summarize_results
def write
  validate_options
  create_temp_table
  @source.continuous_write_enabled ? write_continuous : write_batched
  upsert_from_temp_table
  drop_temp_table
  summarize_results
end
Private Instance Methods
columns_string_for_copy()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 89
# Column list for a COPY statement, such as ("a","b"). Returns an empty string
# when there are no source columns, so COPY then targets every column.
def columns_string_for_copy
  quoted = get_columns_string
  return quoted if quoted.empty?

  "(#{quoted})"
end
columns_string_for_insert()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 101
# Quoted column list for the INSERT target. It contains the source columns,
# plus created_at / updated_at when those timestamps will be injected. The
# order must match select_string_for_insert.
def columns_string_for_insert
  timestamp_flags = {
    'created_at' => inject_create_timestamp?,
    'updated_at' => inject_update_timestamp?
  }
  column_list = source_columns.clone
  timestamp_flags.each { |name, wanted| column_list << name if wanted }
  get_columns_string(column_list)
end
columns_string_for_select()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 94
# Quoted column list made of the source columns, plus created_at / updated_at
# when those timestamps will be injected. It is built the same way as
# columns_string_for_insert.
def columns_string_for_select
  with_timestamps = source_columns.clone.tap do |cols|
    cols << 'created_at' if inject_create_timestamp?
    cols << 'updated_at' if inject_update_timestamp?
  end
  get_columns_string(with_timestamps)
end
create_temp_table()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 188
# Creates an empty temporary staging table. Its columns are the selected
# columns of the destination table (the source columns plus any unique-key
# columns); "WHERE 0 = 1" copies the column definitions but no rows. Any
# leftover table with the same name is dropped first.
#
# NOTE(review): SET client_min_messages applies to the rest of the database
# session; nothing in the visible code resets it afterwards.
def create_temp_table
  generate_temp_table_name
  sql = <<-SQL
    SET client_min_messages=WARNING;
    DROP TABLE IF EXISTS #{@temp_table_name};
    CREATE TEMP TABLE #{@temp_table_name}
      AS SELECT #{select_string_for_create} FROM #{quoted_table_name} WHERE 0 = 1;
  SQL
  database_connection.execute(sql)
end
database_connection()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 58
# Database connection supplied by the destination adapter. All SQL in this
# class goes through it.
def database_connection
  destination = @destination
  destination.database_connection
end
destination_columns()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 77
# Column names of the destination table, as reported by the destination
# adapter. Compared against 'created_at' / 'updated_at' to decide on
# timestamp injection.
def destination_columns
  destination = @destination
  destination.column_names
end
drop_temp_table()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 211
# Drops the staging table created by create_temp_table. Without IF EXISTS,
# this raises if the table is already gone.
def drop_temp_table
  statement = <<-SQL
    DROP TABLE #{@temp_table_name}
  SQL
  database_connection.execute(statement)
end
generate_temp_table_name()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 137
# Picks a name for the staging table and memoizes it in @temp_table_name, so
# repeated calls on the same writer return the same name.
#
# @table_name is not assigned anywhere in the code shown here. Without a
# fallback, the name degenerates to "_temp_NNN". A fixed prefix keeps the name
# readable while still honouring @table_name if something else sets it.
#
# @return [String] e.g. "postgres_upsert_temp_42"
def generate_temp_table_name
  prefix = @table_name || 'postgres_upsert'
  @temp_table_name ||= "#{prefix}_temp_#{rand(1000)}"
end
get_columns_string(columns = nil)
click to toggle source
# File lib/postgres_upsert/writer.rb, line 132
# Renders columns as a comma-separated list of double-quoted SQL identifiers,
# e.g. ["a", :b] -> "a","b" (each name wrapped in double quotes).
#
# Column names can come from the source (for example an input file header),
# so any embedded double quote is doubled, per PostgreSQL identifier-quoting
# rules. Otherwise a name containing '"' would break out of the identifier
# and alter the generated SQL.
#
# @param columns [Array<String, Symbol>, nil] defaults to source_columns
# @return [String] quoted list, or '' when there are no columns
def get_columns_string(columns = nil)
  columns ||= source_columns
  return '' if columns.empty?

  columns.map { |column| "\"#{column.to_s.gsub('"', '""')}\"" }.join(',')
end
inject_create_timestamp?()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 116
# True when the destination has a created_at column that the source does not
# provide, so the writer must fill it in itself.
def inject_create_timestamp?
  return false unless destination_columns.include?('created_at')

  !source_columns.include?('created_at')
end
inject_update_timestamp?()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 120
# True when the destination has an updated_at column that the source does not
# provide, so the writer must fill it in itself.
def inject_update_timestamp?
  if destination_columns.include?('updated_at')
    !source_columns.include?('updated_at')
  else
    false
  end
end
insert_from_temp_table()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 166
# Inserts every staged row whose unique key has no matching destination row.
# The result of execute is stored in @insert_result for summarize_results.
#
# Rows with a NULL key component never satisfy "t.k = d.k", so NOT EXISTS is
# true for them and they are always inserted.
def insert_from_temp_table
  target = quoted_table_name
  sql = <<-SQL
    INSERT INTO #{target} (#{columns_string_for_insert})
    SELECT #{select_string_for_insert}
    FROM #{@temp_table_name} as t
    WHERE NOT EXISTS
      (SELECT 1 FROM #{target} as d WHERE #{unique_key_select('t', 'd')});
  SQL
  @insert_result = database_connection.execute(sql)
end
primary_key()
click to toggle source
quoted_table_name()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 81
# Destination table name, already quoted for SQL by the destination adapter.
def quoted_table_name
  destination = @destination
  destination.quoted_table_name
end
select_string_for_create()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 124
# Column list used to shape the temp table. It holds the source columns in
# their original order, followed by any unique-key column not already among
# them (each key added once, in key order).
def select_string_for_create
  columns = source_columns.map(&:to_sym)
  missing_keys = @options[:unique_key].map(&:to_sym).uniq - columns
  get_columns_string(columns + missing_keys)
end
select_string_for_insert()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 108
# SELECT list for the INSERT. It contains the quoted source columns, followed
# by literal timestamps for created_at / updated_at when those are injected.
# The order matches columns_string_for_insert.
#
# A single timestamp is taken once and reused for both columns. Calling
# DateTime.now twice could give a freshly inserted row different created_at
# and updated_at values.
def select_string_for_insert
  # Unary + yields a mutable copy if get_columns_string ever returns a frozen
  # string, because timestamps are appended below.
  str = +get_columns_string(source_columns)
  inject_create = inject_create_timestamp?
  inject_update = inject_update_timestamp?
  timestamp = DateTime.now.utc if inject_create || inject_update
  str << ",'#{timestamp}'" if inject_create
  str << ",'#{timestamp}'" if inject_update
  str
end
source_columns()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 85
# Column names supplied by the source adapter.
def source_columns
  source = @source
  source.columns
end
summarize_results()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 62
# Wraps the insert, update and copy results in a PostgresUpsert::Result and
# sanity-checks the counts.
#
# In update-only mode the number of changed rows must equal the number of
# updated rows. Otherwise it must equal the number of rows copied into the
# temp table. A mismatch usually means the unique key matched more than one
# destination row.
#
# @return [PostgresUpsert::Result]
# @raise [RuntimeError] when the counts disagree
def summarize_results
  result = PostgresUpsert::Result.new(@insert_result, @update_result, @copy_result)
  expected_rows =
    if @options[:update_only]
      result.updated_rows
    else
      result.copied_rows
    end
  return result if result.changed_rows == expected_rows

  raise "#{expected_rows} rows were copied, but #{result.changed_rows} were upserted to destination table. Check to make sure your key is unique."
end
unique_key_present(source)
click to toggle source
# File lib/postgres_upsert/writer.rb, line 184
# SQL condition requiring every unique-key column of the given table alias to
# be non-NULL, e.g. "d.id IS NOT NULL AND d.org IS NOT NULL".
#
# @param source [String] table alias to qualify the key columns with
def unique_key_present(source)
  conditions = @options[:unique_key].map do |field|
    "#{source}.#{field} IS NOT NULL"
  end
  conditions.join(' AND ')
end
unique_key_select(source, dest)
click to toggle source
# File lib/postgres_upsert/writer.rb, line 180
# SQL join condition equating each unique-key column between two table
# aliases, e.g. "t.id = d.id AND t.org = d.org".
#
# @param source [String] alias of the staged (temp) table
# @param dest [String] alias of the destination table
def unique_key_select(source, dest)
  @options[:unique_key]
    .map { |field| format('%<src>s.%<col>s = %<dst>s.%<col>s', src: source, dst: dest, col: field) }
    .join(' AND ')
end
update_from_temp_table()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 146
# Updates every destination row whose unique key matches a staged row, using
# the SET clause from update_set_clause. The result of execute is stored in
# @update_result for summarize_results.
def update_from_temp_table
  match_condition = "#{unique_key_select('t', 'd')} AND #{unique_key_present('d')}"
  @update_result = database_connection.execute <<-SQL
    UPDATE #{quoted_table_name} AS d
      #{update_set_clause}
      FROM #{@temp_table_name} as t
      WHERE #{match_condition}
  SQL
end
update_set_clause()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 156
# Builds the SET clause of the UPDATE. Every source column is copied from the
# staged row (alias t). If the destination has updated_at and the source does
# not supply it, updated_at is set to the current UTC time.
#
# Column names come from the source, so they are quoted with embedded double
# quotes doubled (PostgreSQL identifier rules). This keeps a name containing
# '"' from altering the generated SQL.
#
# @return [String] e.g. SET "a" = t."a","updated_at" = '...'
def update_set_clause
  quote = ->(name) { "\"#{name.to_s.gsub('"', '""')}\"" }
  assignments = source_columns.map { |col| "#{quote.call(col)} = t.#{quote.call(col)}" }
  if !source_columns.include?('updated_at') && destination_columns.include?('updated_at')
    assignments << "\"updated_at\" = '#{DateTime.now.utc}'"
  end
  "SET #{assignments.join(',')}"
end
upsert_from_temp_table()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 141
# Merges the staged rows into the destination. Matching rows are always
# updated; unmatched rows are inserted unless :update_only is set.
#
# @return the insert result, or nil in update-only mode
def upsert_from_temp_table
  update_from_temp_table
  return if @options[:update_only]

  insert_from_temp_table
end
validate_options()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 199
# Checks that the source has at least one column and contains every
# unique-key column (compared as strings).
#
# @raise [RuntimeError] when no source columns are known or a key column is
#   missing from the source
def validate_options
  raise 'Either the :columns option or :header => true are required' if source_columns.empty?

  @options[:unique_key].each do |key_component|
    next if source_columns.include?(key_component.to_s)

    raise "Expected column '#{key_component}' was not found in source"
  end
end
write_batched()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 50 def write_batched @source.gets do |line| @copy_result = database_connection.raw_connection.copy_data %{COPY #{@temp_table_name} #{columns_string_for_copy} FROM STDIN} do database_connection.raw_connection.put_copy_data line end end end
write_continuous()
click to toggle source
# File lib/postgres_upsert/writer.rb, line 39
# Streams the whole source into the temp table through a single CSV COPY,
# using the configured :delimiter. Lines are read with @source.gets until it
# returns nil or false; lines that are blank after strip are skipped. The COPY
# result is kept in @copy_result for summarize_results.
def write_continuous
  csv_options = "DELIMITER '#{@options[:delimiter]}' CSV"
  copy_sql = %{COPY #{@temp_table_name} #{columns_string_for_copy} FROM STDIN #{csv_options}}
  raw = database_connection.raw_connection
  @copy_result = raw.copy_data(copy_sql) do
    loop do
      line = @source.gets
      break unless line
      next if line.strip.empty?

      raw.put_copy_data line
    end
  end
end