module Upsert::MergeFunction::Postgresql

@private

Attributes

quoted_selector_names[R]
quoted_setter_names[R]

Public Class Methods

extract_version(version_string) click to toggle source

Extracted from github.com/dr-itz/activerecord-jdbc-adapter/blob/master/lib/arjdbc/postgresql/adapter.rb

# File lib/upsert/merge_function/postgresql.rb, line 123
# Converts a PostgreSQL version string into one comparable integer, using the
# same numbering format as jdbc-postgresql and libpq:
#   https://github.com/dr-itz/activerecord-jdbc-adapter/commit/fd79756374c62fa9d009995dd1914d780e6a3dbf
#   https://github.com/postgres/postgres/blob/master/src/interfaces/libpq/fe-exec.c
#
# The first dotted run of digits found in version_string is used.
# From version 10 onwards PostgreSQL versions have only two components
# (major and patch release); see
#   https://www.postgresql.org/support/versioning/
#   https://www.postgresql.org/docs/10/static/libpq-status.html -> PQserverVersion()
#
# Returns 0 when version_string contains no digits.
def self.extract_version(version_string)
  found = version_string.match(/([\d\.]*\d).*?/)
  return 0 unless found

  # Components beyond the third are ignored; absent ones come back as nil.
  major, minor, patch = found[1].split('.').map(&:to_i)
  return 0 if major.nil?

  if patch
    # Three (or more) components: major, minor and patch each get two digits.
    (major * 100 + minor) * 100 + patch
  elsif minor
    # Two components: for 10+ the second one is the patch release.
    major >= 10 ? major * 100 * 100 + minor : (major * 100 + minor) * 100
  else
    major * 100 * 100
  end
end
included(klass) click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 5
# Hook called with the class this module is being mixed into; it extends
# that class with ClassMethods.
def self.included(klass)
  klass.extend(ClassMethods)
end
new(controller, *args) click to toggle source
Calls superclass method
# File lib/upsert/merge_function/postgresql.rb, line 46
# Forwards all arguments to the superclass initializer, then stores the
# setter and selector keys quoted as identifiers by the connection.
def initialize(controller, *args)
  super
  quote = ->(key) { connection.quote_ident key }
  @quoted_setter_names = setter_keys.map(&quote)
  @quoted_selector_names = selector_keys.map(&quote)
end

Public Instance Methods

conflict_bind_placeholders(row) click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 236
# Returns one SQL value expression per setter column, in setter column order.
#
# Plain columns map to their positional bind parameter ("$1", "$2", ...).
# hstore columns map to a CASE expression that uses the incoming value when
# the stored value is NULL and otherwise concatenates the incoming value onto
# the stored one. When the row carries hstore delete keys, both arms are
# additionally passed through hstore_delete_function.
#
# The result is cached only for rows without hstore delete keys.
def conflict_bind_placeholders(row)
  deleting = !row.hstore_delete_keys.empty?
  return @conflict_bind_placeholders if !deleting && @conflict_bind_placeholders

  placeholders = setter_column_definitions.each_with_index.map do |definition, position|
    bind = "$#{position + 1}"
    next bind unless definition.hstore?

    existing = "#{quoted_table_name}.#{definition.quoted_name}"
    incoming = "CAST(#{bind} AS hstore)"
    merged = "(#{existing} || #{incoming})"
    if deleting
      incoming = hstore_delete_function(incoming, row, definition)
      merged = hstore_delete_function(merged, row, definition)
    end
    "CASE WHEN #{existing} IS NULL THEN #{incoming} ELSE #{merged} END"
  end

  @conflict_bind_placeholders = placeholders unless deleting
  placeholders
end
create!() click to toggle source

The “canonical example” from www.postgresql.org/docs/9.1/static/plpgsql-control-structures.html#PLPGSQL-UPSERT-EXAMPLE, adapted to differentiate between selector and setter columns.

# File lib/upsert/merge_function/postgresql.rb, line 317
# Creates (or replaces) the PL/pgSQL merge function for this table.
#
# The generated function first tries an UPDATE matching the selector columns;
# if no row is found it tries an INSERT, and on a unique violation it loops
# back to the UPDATE one more time before giving up.
#
# If executing the CREATE statement fails with "tuple concurrently updated",
# the statement is re-executed exactly once. Any other error, or a second
# failure, is re-raised to the caller.
#
# Returns whatever connection.execute returns.
def create!
  Upsert.logger.info "[upsert] Creating or replacing database function #{name.inspect} on table #{table_name.inspect} for selector #{selector_keys.map(&:inspect).join(', ')} and setter #{setter_keys.map(&:inspect).join(', ')}"
  function_sql = %{
    CREATE OR REPLACE FUNCTION #{name}(#{(selector_column_definitions.map(&:to_selector_arg) + setter_column_definitions.map(&:to_setter_arg) + hstore_delete_handlers.map(&:to_arg)).join(', ')}) RETURNS VOID AS
    $$
    DECLARE
      first_try INTEGER := 1;
    BEGIN
      LOOP
        -- first try to update the key
        UPDATE #{quoted_table_name} SET #{update_column_definitions.map(&:to_setter).join(', ')}
          WHERE #{selector_column_definitions.map(&:to_selector).join(' AND ') };
        IF found THEN
          #{hstore_delete_handlers.map(&:to_pgsql).join(' ')}
          RETURN;
        END IF;
        -- not there, so try to insert the key
        -- if someone else inserts the same key concurrently,
        -- we could get a unique-key failure
        BEGIN
          INSERT INTO #{quoted_table_name}(#{setter_column_definitions.map(&:quoted_name).join(', ')}) VALUES (#{setter_column_definitions.map(&:to_setter_value).join(', ')});
          #{hstore_delete_handlers.map(&:to_pgsql).join(' ')}
          RETURN;
        EXCEPTION WHEN unique_violation THEN
          -- seamusabshere 9/20/12 only retry once
          IF (first_try = 1) THEN
            first_try := 0;
          ELSE
            RETURN;
          END IF;
          -- Do nothing, and loop to try the UPDATE again.
        END;
      END LOOP;
    END;
    $$
    LANGUAGE plpgsql;
  }

  # The flag must live OUTSIDE the begin block: `retry` re-runs everything
  # inside the block, so a flag initialised there would be reset to true on
  # every attempt and the retry would never be bounded.
  first_try = true
  begin
    connection.execute(function_sql)
  rescue => error
    raise unless first_try && error.message =~ /tuple concurrently updated/

    first_try = false
    retry
  end
end
execute(row) click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 52
# Upserts row, dispatching to pg_native when use_pg_native? is truthy and to
# pg_function otherwise. Returns the chosen method's result.
def execute(row)
  return pg_native(row) if use_pg_native?

  pg_function(row)
end
function_exists?() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 87
# Reports whether pg_proc contains a function whose name matches `name`
# case-insensitively.
#
# Only a positive answer is cached; while the answer is false the catalog is
# queried again on every call.
def function_exists?
  return @function_exists if @function_exists

  lookup = "SELECT count(*) AS cnt FROM pg_proc WHERE lower(proname) = lower('#{name}')"
  count_row = controller.connection.execute(lookup).first
  @function_exists = count_row["cnt"].to_i > 0
end
hstore_delete_function(sql, row, column_definition) click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 201
# Wraps an hstore SQL expression in DELETE(expr, ARRAY[...]) when the row has
# hstore delete keys registered for the given column.
#
# sql               - SQL expression (String) yielding an hstore value.
# row               - object whose hstore_delete_keys maps column names to
#                     the keys to remove.
# column_definition - object whose name is looked up in hstore_delete_keys.
#
# Returns sql unchanged (as a new String) when the column has no delete keys,
# otherwise the DELETE(...) expression with each key rendered as a SQL string
# literal.
def hstore_delete_function(sql, row, column_definition)
  parts = [sql]
  if row.hstore_delete_keys.key?(column_definition.name)
    keys = row.hstore_delete_keys[column_definition.name].map do |key|
      # Escape embedded single quotes by doubling them. A replacement of
      # "\\'" must not be used here: in String#gsub, \' is the back-reference
      # for "text after the match", so it would corrupt the key rather than
      # escape it.
      "'#{key.to_s.gsub("'", "''")}'"
    end
    parts.unshift("DELETE(")
    parts << ", ARRAY[#{keys.join(', ')}])"
  end

  parts.join(" ")
end
hstore_delete_handlers() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 295
# Returns one HstoreDeleteHandler per hstore setter column, in setter column
# order. The list is built on first use and cached.
def hstore_delete_handlers
  return @hstore_delete_handlers if @hstore_delete_handlers

  hstore_columns = setter_column_definitions.select(&:hstore?)
  @hstore_delete_handlers = hstore_columns.map { |column| HstoreDeleteHandler.new(self, column) }
end
insert_bind_placeholders(row) click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 215
# Returns one SQL value expression per setter column, in setter column order.
#
# Plain columns map to their positional bind parameter ("$1", "$2", ...);
# hstore columns map to the parameter cast to hstore, additionally passed
# through hstore_delete_function when the row carries hstore delete keys.
#
# The result is cached only for rows without hstore delete keys.
def insert_bind_placeholders(row)
  deleting = !row.hstore_delete_keys.empty?
  return @insert_bind_placeholders if !deleting && @insert_bind_placeholders

  placeholders = setter_column_definitions.each_with_index.map do |definition, position|
    bind = "$#{position + 1}"
    if !definition.hstore?
      bind
    elsif deleting
      hstore_delete_function("CAST(#{bind} AS hstore)", row, definition)
    else
      "CAST(#{bind} AS hstore)"
    end
  end

  @insert_bind_placeholders = placeholders unless deleting
  placeholders
end
pg_function(row) click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 56
# Upserts row by calling the generated database function.
#
# Bind values are the selector values, then the setter values, then one list
# of hstore delete keys per hstore delete handler (an empty list when the row
# has none for that handler).
#
# Unless @assume_function_exists is set, the function is created up front
# when the connection is inside a transaction and the function is missing.
# If the call fails because the function does not exist, the function is
# recreated and the call retried once; a second such failure, or any other
# ERROR_CLASS error, is re-raised.
def pg_function(row)
  values = row.selector.values + row.setter.values
  hstore_delete_handlers.each do |handler|
    values << row.hstore_delete_keys.fetch(handler.name, [])
  end
  Upsert.logger.debug { %{[upsert]\n\tSelector: #{row.selector.inspect}\n\tSetter: #{row.setter.inspect}} }

  # Declared outside the begin block so that it survives `retry`.
  recreated = false
  begin
    unless @assume_function_exists
      create! if connection.in_transaction? && !function_exists?
    end
    execute_parameterized(sql, values.map { |value| connection.bind_value value })
  rescue self.class::ERROR_CLASS => pg_error
    raise pg_error unless pg_error.message =~ /function #{name}.* does not exist/i

    if recreated
      Upsert.logger.info %{[upsert] Failed to create function #{name.inspect} for some reason}
      raise pg_error
    end

    Upsert.logger.info %{[upsert] Function #{name.inspect} went missing, trying to recreate}
    recreated = true
    create!
    retry
  end
end
pg_native(row) click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 186
# Upserts row with a single INSERT ... ON CONFLICT ... DO UPDATE statement.
#
# The statement inserts the setter columns using insert_bind_placeholders,
# names the selector columns as the conflict target, and on conflict assigns
# each setter column the matching expression from conflict_bind_placeholders.
# The row's setter values, converted with connection.bind_value, are the bind
# parameters.
#
# Returns the result of execute_parameterized.
def pg_native(row)
  # The former `row_syntax` local was computed here but never used in the
  # statement; it has been dropped together with the server_version lookup it
  # required.
  bind_setter_values = row.setter.values.map { |v| connection.bind_value v }

  upsert_sql = %{
    INSERT INTO #{quoted_table_name} (#{quoted_setter_names.join(',')})
    VALUES (#{insert_bind_placeholders(row).join(', ')})
    ON CONFLICT(#{quoted_selector_names.join(', ')})
    DO UPDATE SET #{quoted_setter_names.zip(conflict_bind_placeholders(row)).map { |n, v| "#{n} = #{v}" }.join(', ')}
  }

  execute_parameterized(upsert_sql, bind_setter_values)
end
selector_column_definitions() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 303
# Returns the column definitions whose name appears in selector_keys, in
# column_definitions order.
def selector_column_definitions
  column_definitions.select do |definition|
    selector_keys.include?(definition.name)
  end
end
server_version() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 116
# Returns the server version as converted by
# Upsert::MergeFunction::Postgresql.extract_version from the result of
# "SHOW server_version". The value is cached after the first call.
def server_version
  return @server_version if @server_version

  parser = Upsert::MergeFunction::Postgresql
  reported = controller.connection.execute("SHOW server_version").first["server_version"]
  @server_version = parser.extract_version(reported)
end
setter_column_definitions() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 307
# Returns the column definitions whose name appears in setter_keys, in
# column_definitions order.
def setter_column_definitions
  column_definitions.select do |definition|
    setter_keys.include?(definition.name)
  end
end
sql() click to toggle source

Strangely, `?` can't be used as a placeholder here, so numbered placeholders (`$1`, `$2`, …) are used instead.

# File lib/upsert/merge_function/postgresql.rb, line 92
# Returns the SELECT statement that calls the merge function, cached after
# the first call.
#
# It has one numbered placeholder per selector key and per setter key,
# followed by one placeholder cast to text[] per hstore delete handler.
def sql
  @sql ||= begin
    scalar_count = selector_keys.length + setter_keys.length
    scalar_params = (1..scalar_count).map { |position| "$#{position}" }
    array_params = (1..hstore_delete_handlers.length).map do |offset|
      "$#{scalar_count + offset}::text[]"
    end
    %{SELECT #{name}(#{(scalar_params + array_params).join(', ')})}
  end
end
unique_index_columns() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 156
# Queries the catalog for the unique indexes on this table and returns the
# result of execute_parameterized; each result row carries an index_columns
# array holding the index's column definitions in key order.
#
# When table_name is an Array with more than one element, its elements are
# passed reversed so that the last one binds to $1 (table) and the one before
# it to $2 (schema). Otherwise only $1 is bound and the schema is matched
# against current_schemas(true).
def unique_index_columns
  schema_argument =
    if table_name.is_a?(Array) && table_name.length > 1
      '$2'
    else
      'ANY(current_schemas(true)::text[])'
    end

  # Reverse a copy: an in-place reverse! would flip the array returned by
  # table_name itself and leak into every later use of it.
  table_name_arguments = [*table_name].reverse

  execute_parameterized(
    %{
      SELECT
      ARRAY(
        SELECT pg_get_indexdef(pg_index.indexrelid, k + 1, TRUE)
        FROM
          generate_subscripts(pg_index.indkey, 1) AS k
        ORDER BY k
      ) AS index_columns
        FROM pg_index
        JOIN pg_class AS idx ON idx.oid = pg_index.indexrelid
        JOIN pg_class AS tbl ON tbl.oid = pg_index.indrelid
        JOIN pg_namespace ON pg_namespace.oid = idx.relnamespace
      WHERE pg_index.indisunique IS TRUE AND pg_namespace.nspname = #{schema_argument} AND tbl.relname = $1
    },
    table_name_arguments
  )
end
update_column_definitions() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 311
# Returns the setter column definitions whose name does not match
# CREATED_COL_REGEX.
def update_column_definitions
  setter_column_definitions.reject do |definition|
    definition.name =~ CREATED_COL_REGEX
  end
end
use_pg_native?() click to toggle source
# File lib/upsert/merge_function/postgresql.rb, line 108
# Decides, once, whether upserts go through native INSERT ... ON CONFLICT:
# server_version must be at least 90500 and unique_index_on_selector? must
# hold. A warning is logged when the native path is not used.
#
# The decision is cached in @use_pg_native, including a negative one.
def use_pg_native?
  unless instance_variable_defined?(:@use_pg_native)
    @use_pg_native = server_version >= 90500 && unique_index_on_selector?
    Upsert.logger.warn "[upsert] WARNING: Not using native PG CONFLICT / UPDATE" unless @use_pg_native
  end

  @use_pg_native
end