module Upsert::MergeFunction::Postgresql
@private
Attributes
quoted_selector_names[R]
quoted_setter_names[R]
Public Class Methods
extract_version(version_string)
click to toggle source
Extracted from github.com/dr-itz/activerecord-jdbc-adapter/blob/master/lib/arjdbc/postgresql/adapter.rb
# File lib/upsert/merge_function/postgresql.rb, line 123
# Converts a PostgreSQL version string into a single comparable integer,
# using the same numbering as jdbc-postgresql and libpq:
#   https://github.com/dr-itz/activerecord-jdbc-adapter/commit/fd79756374c62fa9d009995dd1914d780e6a3dbf
#   https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c
#
# Examples: "9.6.1" => 90601, "9.5" => 90500, "10.4" => 100004, "11" => 110000.
# Returns 0 when no version number can be found in +version_string+.
#
# Background on the numbering scheme:
#   https://www.postgresql.org/support/versioning/
#   https://www.postgresql.org/docs/10/static/libpq-status.html -> PQserverVersion()
def self.extract_version(version_string)
  found = version_string.match(/([\d\.]*\d).*?/)
  return 0 unless found

  parts = found[1].split('.').map(&:to_i)
  major, minor, patch = parts

  case parts.size
  when 0
    0
  when 1
    major * 100 * 100
  when 2
    # From version 10 onwards the second component is the patch release
    # ("10.x"); before that it is the second half of the major version.
    if major >= 10
      major * 100 * 100 + minor
    else
      (major * 100 + minor) * 100
    end
  else
    # Three or more components: only the first three are significant.
    (major * 100 + minor) * 100 + patch
  end
end
included(klass)
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 5
# Hook run when this module is included: extends the including class with
# ClassMethods so those methods become available on the class itself.
def self.included(klass)
  klass.extend(ClassMethods)
end
new(controller, *args)
click to toggle source
Calls superclass method
# File lib/upsert/merge_function/postgresql.rb, line 46
# Runs the superclass initializer with the same arguments, then caches the
# quoted forms of the setter and selector key names (quoted through the
# connection's quote_ident).
def initialize(controller, *args)
  super
  @quoted_setter_names = setter_keys.map do |key|
    connection.quote_ident(key)
  end
  @quoted_selector_names = selector_keys.map do |key|
    connection.quote_ident(key)
  end
end
Public Instance Methods
conflict_bind_placeholders(row)
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 236
# Builds one SQL value expression per setter column, in column order, using
# 1-based positional placeholders ($1, $2, ...).
#
# Non-hstore columns yield the bare placeholder. Hstore columns yield a CASE
# expression that uses the incoming value when the stored one is NULL and
# otherwise concatenates the stored value with the incoming one.
#
# When +row+ has no hstore delete keys the result is memoized; otherwise each
# hstore operand is passed through hstore_delete_function and the list is
# rebuilt for that row.
def conflict_bind_placeholders(row)
  render = lambda do |wrap|
    setter_column_definitions.each_with_index.map do |column_definition, position|
      placeholder = "$#{position + 1}"
      next placeholder unless column_definition.hstore?

      existing = "#{quoted_table_name}.#{column_definition.quoted_name}"
      incoming = "CAST(#{placeholder} AS hstore)"
      when_null = wrap.call(incoming, column_definition)
      when_set = wrap.call("(#{existing} || #{incoming})", column_definition)
      "CASE WHEN #{existing} IS NULL THEN #{when_null} ELSE #{when_set} END"
    end
  end

  if row.hstore_delete_keys.empty?
    @conflict_bind_placeholders ||= render.call(->(sql, _column_definition) { sql })
  else
    render.call(->(sql, column_definition) { hstore_delete_function(sql, row, column_definition) })
  end
end
create!()
click to toggle source
Based on the “canonical example” from www.postgresql.org/docs/9.1/static/plpgsql-control-structures.html#PLPGSQL-UPSERT-EXAMPLE, adapted to differentiate between selector and setter columns.
# File lib/upsert/merge_function/postgresql.rb, line 317
# Creates (or replaces) the PL/pgSQL merge function named +name+.
#
# The generated function first tries an UPDATE on the selector columns; if
# no row was found it attempts an INSERT, and on a unique_violation loops
# back to the UPDATE once more before giving up.
#
# If executing the CREATE statement fails with a message matching
# "tuple concurrently updated", the statement is retried exactly once; any
# other error, or a second failure, is re-raised to the caller.
#
# Returns whatever connection.execute returns.
def create!
  Upsert.logger.info "[upsert] Creating or replacing database function #{name.inspect} on table #{table_name.inspect} for selector #{selector_keys.map(&:inspect).join(', ')} and setter #{setter_keys.map(&:inspect).join(', ')}"

  # The flag lives outside the begin block on purpose: `retry` re-runs the
  # begin body only, so the flag survives and bounds the retry to a single
  # extra attempt. (Initialising it inside the retried body would reset it
  # on every retry and allow unbounded retries.)
  retried = false
  begin
    connection.execute(%{
      CREATE OR REPLACE FUNCTION #{name}(#{(selector_column_definitions.map(&:to_selector_arg) + setter_column_definitions.map(&:to_setter_arg) + hstore_delete_handlers.map(&:to_arg)).join(', ')}) RETURNS VOID AS
      $$
      DECLARE
        first_try INTEGER := 1;
      BEGIN
        LOOP
          -- first try to update the key
          UPDATE #{quoted_table_name} SET #{update_column_definitions.map(&:to_setter).join(', ')}
            WHERE #{selector_column_definitions.map(&:to_selector).join(' AND ') };
          IF found THEN
            #{hstore_delete_handlers.map(&:to_pgsql).join(' ')}
            RETURN;
          END IF;
          -- not there, so try to insert the key
          -- if someone else inserts the same key concurrently,
          -- we could get a unique-key failure
          BEGIN
            INSERT INTO #{quoted_table_name}(#{setter_column_definitions.map(&:quoted_name).join(', ')}) VALUES (#{setter_column_definitions.map(&:to_setter_value).join(', ')});
            #{hstore_delete_handlers.map(&:to_pgsql).join(' ')}
            RETURN;
          EXCEPTION WHEN unique_violation THEN
            -- seamusabshere 9/20/12 only retry once
            IF (first_try = 1) THEN
              first_try := 0;
            ELSE
              RETURN;
            END IF;
            -- Do nothing, and loop to try the UPDATE again.
          END;
        END LOOP;
      END;
      $$
      LANGUAGE plpgsql;
    })
  rescue => e
    raise if retried || e.message !~ /tuple concurrently updated/

    retried = true
    retry
  end
end
execute(row)
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 52
# Upserts +row+, dispatching to pg_native when use_pg_native? is true and
# to pg_function otherwise. Returns the chosen method's result.
def execute(row)
  if use_pg_native?
    pg_native(row)
  else
    pg_function(row)
  end
end
function_exists?()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 87
# Reports whether pg_proc contains a function whose name matches +name+
# (compared case-insensitively).
#
# Because the result is cached with ||=, only a true answer is remembered;
# a false answer causes the query to run again on the next call.
def function_exists?
  @function_exists ||= begin
    lookup = "SELECT count(*) AS cnt FROM pg_proc WHERE lower(proname) = lower('#{name}')"
    first_row = controller.connection.execute(lookup).first
    first_row["cnt"].to_i > 0
  end
end
hstore_delete_function(sql, row, column_definition)
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 201
# Wraps +sql+ in an hstore DELETE(...) call that strips the keys listed for
# this column in row.hstore_delete_keys.
#
# sql               - SQL expression (String) producing an hstore value.
# row               - object whose hstore_delete_keys maps column names to
#                     the keys to delete.
# column_definition - column whose name is looked up in that map.
#
# Returns +sql+ unchanged when the row lists no delete keys for the column;
# otherwise "DELETE( <sql> , ARRAY['k1', 'k2'])".
def hstore_delete_function(sql, row, column_definition)
  return sql unless row.hstore_delete_keys.key?(column_definition.name)

  quoted_keys = row.hstore_delete_keys[column_definition.name].map do |key|
    # Escape embedded single quotes by doubling them. A replacement of "\\'"
    # must not be used here: gsub interprets \' as the post-match
    # back-reference, which silently corrupts the key instead of escaping it.
    "'#{key.to_s.gsub("'", "''")}'"
  end

  "DELETE( #{sql} , ARRAY[#{quoted_keys.join(', ')}])"
end
hstore_delete_handlers()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 295
# Returns one HstoreDeleteHandler per hstore setter column, in setter column
# order. The list is built on first use and memoized.
def hstore_delete_handlers
  @hstore_delete_handlers ||= begin
    hstore_columns = setter_column_definitions.select(&:hstore?)
    hstore_columns.map { |column| HstoreDeleteHandler.new(self, column) }
  end
end
insert_bind_placeholders(row)
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 215
# Builds the VALUES expressions for an INSERT: one per setter column, in
# column order, using 1-based positional placeholders.
#
# Hstore columns are cast ("CAST($n AS hstore)"); other columns use the bare
# placeholder. When +row+ carries hstore delete keys, each hstore cast is
# additionally passed through hstore_delete_function and nothing is cached;
# otherwise the list is memoized.
def insert_bind_placeholders(row)
  unless row.hstore_delete_keys.empty?
    return setter_column_definitions.map.with_index(1) do |column_definition, position|
      if column_definition.hstore?
        hstore_delete_function("CAST($#{position} AS hstore)", row, column_definition)
      else
        "$#{position}"
      end
    end
  end

  @insert_bind_placeholders ||= setter_column_definitions.map.with_index(1) do |column_definition, position|
    column_definition.hstore? ? "CAST($#{position} AS hstore)" : "$#{position}"
  end
end
pg_function(row)
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 56
# Upserts +row+ by calling the generated merge function.
#
# Bind values are, in order: the selector values, the setter values, then
# for each hstore delete handler the row's delete keys for that handler
# (an empty array when the row has none).
#
# If the call fails with an ERROR_CLASS error whose message says the
# function does not exist, the function is recreated and the call retried
# once; any other failure, or a second one, is re-raised.
def pg_function(row)
  values = row.selector.values + row.setter.values
  values.concat(
    hstore_delete_handlers.map { |handler| row.hstore_delete_keys.fetch(handler.name, []) }
  )
  Upsert.logger.debug do
    %{[upsert]\n\tSelector: #{row.selector.inspect}\n\tSetter: #{row.setter.inspect}}
  end

  first_try = true
  begin
    # Create the function up front only when inside a transaction and it is
    # missing, unless we were told to assume it already exists.
    create! unless @assume_function_exists || !connection.in_transaction? || function_exists?
    execute_parameterized(sql, values.map { |value| connection.bind_value(value) })
  rescue self.class::ERROR_CLASS => pg_error
    raise pg_error unless pg_error.message =~ /function #{name}.* does not exist/i

    unless first_try
      Upsert.logger.info %{[upsert] Failed to create function #{name.inspect} for some reason}
      raise pg_error
    end

    Upsert.logger.info %{[upsert] Function #{name.inspect} went missing, trying to recreate}
    first_try = false
    create!
    retry
  end
end
pg_native(row)
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 186
# Upserts +row+ with a single INSERT ... ON CONFLICT ... DO UPDATE statement.
#
# The setter values are bound positionally; the conflict target is the list
# of quoted selector column names, and every setter column is assigned from
# the matching conflict_bind_placeholders expression.
#
# Returns whatever execute_parameterized returns.
def pg_native(row)
  bind_setter_values = row.setter.values.map { |value| connection.bind_value(value) }

  # NOTE: an unused `row_syntax` local (derived from server_version) was
  # removed here; it was never interpolated into the statement.
  assignments = quoted_setter_names.zip(conflict_bind_placeholders(row)).map do |column, expression|
    "#{column} = #{expression}"
  end

  upsert_sql = %{
    INSERT INTO #{quoted_table_name} (#{quoted_setter_names.join(',')})
    VALUES (#{insert_bind_placeholders(row).join(', ')})
    ON CONFLICT(#{quoted_selector_names.join(', ')})
    DO UPDATE SET #{assignments.join(', ')}
  }

  execute_parameterized(upsert_sql, bind_setter_values)
end
selector_column_definitions()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 303
# Returns the column definitions whose name is one of the selector keys,
# in the order they appear in column_definitions.
def selector_column_definitions
  column_definitions.select do |definition|
    selector_keys.include?(definition.name)
  end
end
server_version()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 116
# Returns the server version as the integer produced by
# Upsert::MergeFunction::Postgresql.extract_version, read from
# "SHOW server_version". The value is memoized.
def server_version
  @server_version ||= begin
    version_row = controller.connection.execute("SHOW server_version").first
    Upsert::MergeFunction::Postgresql.extract_version(version_row["server_version"])
  end
end
setter_column_definitions()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 307
# Returns the column definitions whose name is one of the setter keys,
# in the order they appear in column_definitions.
def setter_column_definitions
  column_definitions.select do |definition|
    setter_keys.include?(definition.name)
  end
end
sql()
click to toggle source
Strangely, “?” can't be used as a placeholder here, so numbered placeholders ($1, $2, …) are used instead.
# File lib/upsert/merge_function/postgresql.rb, line 92
# Returns the memoized "SELECT <name>(...)" statement that calls the merge
# function.
#
# There is one numbered placeholder per selector key and per setter key,
# followed by one "$n::text[]" placeholder per hstore delete handler;
# numbering is continuous and starts at 1.
def sql
  @sql ||= begin
    scalar_count = selector_keys.length + setter_keys.length
    scalar_params = (1..scalar_count).map { |position| "$#{position}" }
    array_params = (1..hstore_delete_handlers.length).map do |offset|
      "$#{scalar_count + offset}::text[]"
    end
    %{SELECT #{name}(#{(scalar_params + array_params).join(', ')})}
  end
end
unique_index_columns()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 156
# Queries the catalog for every unique index on the target table and
# returns, per index, the array of its column definitions (as produced by
# pg_get_indexdef), via execute_parameterized.
#
# When table_name is an Array with more than one element, the reversed
# array is bound as the parameters ($1 is then the last element, used as
# the relation name, and $2 the one before it, used as the schema name).
# Otherwise the table is looked up in any schema on the current search path.
def unique_index_columns
  if table_name.is_a?(Array) && table_name.length > 1
    schema_argument = '$2'
    # Non-mutating reverse: reverse! here would flip the array returned by
    # table_name in place, corrupting it for any caller sharing that array.
    table_name_arguments = table_name.reverse
  else
    schema_argument = 'ANY(current_schemas(true)::text[])'
    table_name_arguments = [*table_name].reverse
  end

  execute_parameterized(
    %{
      SELECT
        ARRAY(
          SELECT pg_get_indexdef(pg_index.indexrelid, k + 1, TRUE)
          FROM generate_subscripts(pg_index.indkey, 1) AS k
          ORDER BY k
        ) AS index_columns
      FROM pg_index
      JOIN pg_class AS idx ON idx.oid = pg_index.indexrelid
      JOIN pg_class AS tbl ON tbl.oid = pg_index.indrelid
      JOIN pg_namespace ON pg_namespace.oid = idx.relnamespace
      WHERE pg_index.indisunique IS TRUE AND pg_namespace.nspname = #{schema_argument} AND tbl.relname = $1
    },
    table_name_arguments
  )
end
update_column_definitions()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 311
# Returns the setter column definitions, leaving out any whose name matches
# CREATED_COL_REGEX.
def update_column_definitions
  setter_column_definitions.reject do |definition|
    definition.name =~ CREATED_COL_REGEX
  end
end
use_pg_native?()
click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 108
# Decides whether the native INSERT ... ON CONFLICT path should be used:
# the server version must be at least 90500 and unique_index_on_selector?
# must hold. The decision is computed once and cached, including a false
# result; a warning is logged when the native path is not used.
def use_pg_native?
  unless defined?(@use_pg_native)
    @use_pg_native = server_version >= 90500 && unique_index_on_selector?
    if !@use_pg_native
      Upsert.logger.warn "[upsert] WARNING: Not using native PG CONFLICT / UPDATE"
    end
  end
  @use_pg_native
end