class ActiveRecord::ConnectionAdapters::MaterializeAdapter

The Materialize adapter works with the native C (bitbucket.org/ged/ruby-pg) driver.

Options:

Any further options are used as connection parameters to libpq. See www.postgresql.org/docs/current/static/libpq-connect.html for the list of parameters.

In addition, default connection parameters of libpq can be set per environment variables. See www.postgresql.org/docs/current/static/libpq-envars.html .

Constants

ADAPTER_NAME
CACHED_PLAN_HEURISTIC

Annoyingly, the code for prepared statements whose return value may have changed is FEATURE_NOT_SUPPORTED.

This covers various different error types so we need to do additional work to classify the exception definitively as a ActiveRecord::PreparedStatementCacheExpired

Check here for more details: git.postgresql.org/gitweb/?p=postgresql.git;a=blob;f=src/backend/utils/cache/plancache.c#l573

DEADLOCK_DETECTED
FOREIGN_KEY_VIOLATION
LOCK_NOT_AVAILABLE
NATIVE_DATABASE_TYPES
NOT_NULL_VIOLATION
NUMERIC_VALUE_OUT_OF_RANGE
QUERY_CANCELED
SERIALIZATION_FAILURE
UNIQUE_VIOLATION
VALUE_LIMIT_VIOLATION

See www.postgresql.org/docs/current/static/errcodes-appendix.html

Public Class Methods

create_unlogged_tables() click to toggle source

Materialize allows the creation of “unlogged” tables, which do not record data in the Materialize Write-Ahead Log. This can make the tables faster, but significantly increases the risk of data loss if the database crashes. As a result, this should not be used in production environments. If you would like all created tables to be unlogged in the test environment you can add the following line to your test.rb file:

ActiveRecord::ConnectionAdapters::MaterializeAdapter.create_unlogged_tables = true
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 108
class_attribute :create_unlogged_tables, default: false
database_exists?(config) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 210
def self.database_exists?(config)
  !!ActiveRecord::Base.materialize_connection(config)
rescue ActiveRecord::NoDatabaseError
  false
end
new(connection, logger, connection_parameters, config) click to toggle source

Initializes and connects a Materialize adapter.

Calls superclass method
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 192
def initialize(connection, logger, connection_parameters, config)
  super(connection, logger, config)

  @connection_parameters = connection_parameters

  # @local_tz is initialized as nil to avoid warnings when connect tries to use it
  @local_tz = nil
  @max_identifier_length = nil

  configure_connection
  add_pg_encoders
  add_pg_decoders

  @type_map = Type::HashLookupTypeMap.new
  initialize_type_map
  @local_tz = execute("SHOW TIMEZONE", "SCHEMA").first["TimeZone"]
end

Public Instance Methods

active?() click to toggle source

Is this connection alive and ready for queries?

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 217
def active?
  @lock.synchronize do
    @connection.query "SELECT 1"
  end
  true
rescue PG::Error
  false
end
disconnect!() click to toggle source

Disconnects from the database if already connected. Otherwise, this method does nothing.

Calls superclass method
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 251
def disconnect!
  @lock.synchronize do
    super
    @connection.close rescue nil
  end
end
max_identifier_length() click to toggle source

Returns the configured supported identifier length supported by Materialize

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 319
def max_identifier_length
  @max_identifier_length || 64
end
reconnect!() click to toggle source

Close then reopen the connection.

Calls superclass method
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 227
def reconnect!
  @lock.synchronize do
    super
    @connection.reset
    configure_connection
  rescue PG::ConnectionBad
    connect
  end
end
reset!() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 237
def reset!
  @lock.synchronize do
    clear_cache!
    reset_transaction
    unless @connection.transaction_status == ::PG::PQTRANS_IDLE
      @connection.query "ROLLBACK"
    end
    @connection.query "DISCARD ALL"
    configure_connection
  end
end
session_auth=(user) click to toggle source

Set the authorized user for this session

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 324
def session_auth=(user)
  clear_cache!
  execute("SET SESSION AUTHORIZATION #{user}")
end
supports_advisory_locks?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 272
def supports_advisory_locks?
  true
end
supports_bulk_alter?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 141
def supports_bulk_alter?
  true
end
supports_common_table_expressions?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 296
def supports_common_table_expressions?
  true
end
supports_ddl_transactions?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 268
def supports_ddl_transactions?
  true
end
supports_explain?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 276
def supports_explain?
  true
end
supports_insert_returning?() click to toggle source

TODO: test insert returning

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 159
def supports_insert_returning?
  true
end
supports_json?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 154
def supports_json?
  true
end
supports_lazy_transactions?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 300
def supports_lazy_transactions?
  true
end
supports_materialized_views?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 285
def supports_materialized_views?
  true
end
supports_optimizer_hints?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 289
def supports_optimizer_hints?
  unless defined?(@has_pg_hint_plan)
    @has_pg_hint_plan = extension_available?("pg_hint_plan")
  end
  @has_pg_hint_plan
end
supports_ranges?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 280
def supports_ranges?
  true
end
supports_transaction_isolation?() click to toggle source

TODO: test transaction isolation

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 146
def supports_transaction_isolation?
  true
end
supports_views?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 150
def supports_views?
  true
end

Private Instance Methods

add_pg_decoders() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 739
        def add_pg_decoders
          @default_timezone = nil
          @timestamp_decoder = nil

          coders_by_name = {
            "int2" => PG::TextDecoder::Integer,
            "int4" => PG::TextDecoder::Integer,
            "int8" => PG::TextDecoder::Integer,
            "integer" => PG::TextDecoder::Integer,
            "bigint" => PG::TextDecoder::Integer,
            "oid" => PG::TextDecoder::Integer,
            "float" => PG::TextDecoder::Float,
            "float4" => PG::TextDecoder::Float,
            "float8" => PG::TextDecoder::Float,
            "double" => PG::TextDecoder::Float,
            "bool" => PG::TextDecoder::Boolean,
            "boolean" => PG::TextDecoder::Boolean,
          }

          if defined?(PG::TextDecoder::TimestampUtc)
            # Use native PG encoders available since pg-1.1
            coders_by_name["timestamp"] = PG::TextDecoder::TimestampUtc
            coders_by_name["timestamptz"] = PG::TextDecoder::TimestampWithTimeZone
          end

          known_coder_types = coders_by_name.keys.map { |n| quote(n) }
          query = <<~SQL % known_coder_types.join(", ")
            SELECT t.oid, t.typname
            FROM pg_type as t
            WHERE t.typname IN (%s)
          SQL
          coders = execute_and_clear(query, "SCHEMA", []) do |result|
            result
              .map { |row| construct_coder(row, coders_by_name[row["typname"]]) }
              .compact
          end

          map = PG::TypeMapByOid.new
          coders.each { |coder| map.add_coder(coder) }
          @connection.type_map_for_results = map

          # extract timestamp decoder for use in update_typemap_for_default_timezone
          @timestamp_decoder = coders.find { |coder| coder.name == "timestamp" }
          update_typemap_for_default_timezone
        end
add_pg_encoders() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 719
def add_pg_encoders
  map = PG::TypeMapByClass.new
  map[Integer] = PG::TextEncoder::Integer.new
  map[TrueClass] = PG::TextEncoder::Boolean.new
  map[FalseClass] = PG::TextEncoder::Boolean.new
  @connection.type_map_for_queries = map
end
arel_visitor() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 689
def arel_visitor
  Arel::Visitors::PostgreSQL.new(self)
end
build_statement_pool() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 693
def build_statement_pool
  StatementPool.new(@connection, self.class.type_cast_config_to_integer(@config[:statement_limit]))
end
can_perform_case_insensitive_comparison_for?(column) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 697
        def can_perform_case_insensitive_comparison_for?(column)
          @case_insensitive_cache ||= {}
          @case_insensitive_cache[column.sql_type] ||= begin
            sql = <<~SQL
              SELECT exists(
                SELECT * FROM pg_proc
                WHERE proname = 'lower'
                  AND proargtypes = ARRAY[#{quote column.sql_type}::regtype]::oidvector
              ) OR exists(
                SELECT * FROM pg_proc
                INNER JOIN pg_cast
                  ON ARRAY[casttarget]::oidvector = proargtypes
                WHERE proname = 'lower'
                  AND castsource = #{quote column.sql_type}::regtype
              )
            SQL
            execute_and_clear(sql, "SCHEMA", []) do |result|
              result.getvalue(0, 0)
            end
          end
        end
column_definitions(table_name) click to toggle source

Returns the list of a table's column names, data types, and default values.

The underlying query is roughly:

SELECT column.name, column.type, default.value, column.comment
  FROM column LEFT JOIN default
    ON column.table_id = default.table_id
   AND column.num = default.column_num
 WHERE column.table_id = get_table_id('table_name')
   AND column.num > 0
   AND NOT column.is_dropped
 ORDER BY column.num

If the table name is not prefixed with a schema, the database will take the first match from the schema search path.

Query implementation notes:

- format_type includes the column size constraint, e.g. varchar(50)
- ::regclass is a function that gives the id for a table name
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 665
        def column_definitions(table_name)
          query(<<~SQL, "SCHEMA")
          SELECT
            c.name,
            c.type AS format_type,
            c.nullable,
            NULL AS default,
            NULL AS comment
          FROM mz_columns c
          LEFT JOIN mz_tables t ON c.id = t.id
          LEFT JOIN mz_schemas s ON t.schema_id = s.id
          LEFT JOIN mz_databases d ON s.database_id = d.id
          WHERE
            t.name = #{quote(table_name)}
            AND d.name = #{quote(current_database)}
            ORDER BY c.position
          SQL
        end
configure_connection() click to toggle source

Configures the encoding, verbosity, schema search path, and time zone of the connection. This is called by connect and should not be called manually.

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 618
def configure_connection
  if @config[:encoding]
    @connection.set_client_encoding(@config[:encoding])
  end

  variables = @config.fetch(:variables, {}).stringify_keys

  # If using Active Record's time zone support configure the connection to return
  # TIMESTAMP WITH ZONE types in UTC.
  unless variables["timezone"]
    if ActiveRecord::Base.default_timezone == :utc
      variables["timezone"] = "UTC"
    elsif @local_tz
      variables["timezone"] = @local_tz
    end
  end

  # SET statements from :variables config hash
  # https://www.postgresql.org/docs/current/static/sql-set.html
  variables.map do |k, v|
    if v == ":default" || v == :default
      # Sets the value to the global or compile default
      execute("SET SESSION #{k} TO DEFAULT", "SCHEMA")
    elsif !v.nil?
      execute("SET SESSION #{k} TO #{quote(v)}", "SCHEMA")
    end
  end
end
connect() click to toggle source

Connects to a Materialize server and sets up the adapter depending on the connected server's characteristics.

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 609
def connect
  @connection = PG.connect(@connection_parameters)
  configure_connection
  add_pg_encoders
  add_pg_decoders
end
construct_coder(row, coder_class) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 785
def construct_coder(row, coder_class)
  return unless coder_class
  coder_class.new(oid: row["oid"].to_i, name: row["typname"])
end
exec_no_cache(sql, name, binds) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 531
def exec_no_cache(sql, name, binds)
  materialize_transactions

  # make sure we carry over any changes to ActiveRecord::Base.default_timezone that have been
  # made since we established the connection
  update_typemap_for_default_timezone

  typed_binds = type_casted_binds(binds)
  log(sql, name, binds, typed_binds) do
    ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
      @connection.exec_params(prepare_statement(sql, typed_binds), [])
    end
  end
rescue ActiveRecord::StatementInvalid,
       PG::InternalError => error
  if error.message.include? "At least one input has no complete timestamps yet"
    raise ::Materialize::Errors::IncompleteInput, error.message
  else
    raise
  end
end
execute_and_clear(sql, name, binds, prepare: false) { |result| ... } click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 516
def execute_and_clear(sql, name, binds, prepare: false)
  if preventing_writes? && write_query?(sql)
    raise ActiveRecord::ReadOnlyError, "Write query attempted while in readonly mode: #{sql}"
  end

  if without_prepared_statement?(binds)
    result = exec_no_cache(sql, name, [])
  else
    result = exec_no_cache(sql, name, binds)
  end
  ret = yield result
  result.clear
  ret
end
extract_default_function(default_value, default) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 506
def extract_default_function(default_value, default)
  default if has_default_function?(default_value, default)
end
extract_table_ref_from_insert_sql(sql) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 684
def extract_table_ref_from_insert_sql(sql)
  sql[/into\s("[A-Za-z0-9_."\[\]\s]+"|[A-Za-z0-9_."\[\]]+)\s*/im]
  $1.strip if $1
end
extract_value_from_default(default) click to toggle source

Extracts the value from a Materialize column default definition.

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 480
def extract_value_from_default(default)
  case default
    # Quoted types
  when /\A[\(B]?'(.*)'.*::"?([\w. ]+)"?(?:\[\])?\z/m
    # The default 'now'::date is CURRENT_DATE
    if $1 == "now" && $2 == "date"
      nil
    else
      $1.gsub("''", "'")
    end
    # Boolean types
  when "true", "false"
    default
    # Numeric types
  when /\A\(?(-?\d+(\.\d*)?)\)?(::bigint)?\z/
    $1
    # Object identifier types
  when /\A-?\d+\z/
    $1
  else
    # Anything else is blank, some user type, or some function
    # and we can't know the value of that, so return nil.
    nil
  end
end
get_oid_type(oid, fmod, column_name, sql_type = "") click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 408
def get_oid_type(oid, fmod, column_name, sql_type = "")
  if !type_map.key?(oid)
    load_additional_types([oid])
  end

  type_map.fetch(oid, fmod, sql_type) {
    warn "unknown OID #{oid}: failed to recognize type of '#{column_name}'. It will be treated as String."
    Type.default_value.tap do |cast_type|
      type_map.register_type(oid, cast_type)
    end
  }
end
has_default_function?(default_value, default) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 510
def has_default_function?(default_value, default)
  !default_value && %r{\w+\(.*\)|\(.*\)::\w+|CURRENT_DATE|CURRENT_TIMESTAMP}.match?(default)
end
in_transaction?() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 571
def in_transaction?
  open_transactions > 0
end
initialize_type_map(m = type_map) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 444
def initialize_type_map(m = type_map)
  m.register_type "int2", Type::Integer.new(limit: 2)
  m.register_type "int4", Type::Integer.new(limit: 4)
  m.register_type "int8", Type::Integer.new(limit: 8)
  m.alias_type "int", "int4"
  m.alias_type "integer", "int4"
  m.alias_type "bigint", "int8"
  m.register_type "oid", OID::Oid.new
  m.register_type "float4", Type::Float.new
  m.alias_type "float8", "float4"
  m.alias_type "double", "float4"
  m.register_type "text", Type::Text.new
  m.alias_type "varchar", "text"

  m.register_type "boolean", Type::Boolean.new
  register_class_with_limit m, "bytea", OID::BitVarying

  m.alias_type "timestamptz", "timestamp"
  m.register_type "date", OID::Date.new

  m.register_type "bytea", OID::Bytea.new
  m.register_type "jsonb", OID::Jsonb.new
  m.alias_type "json", "jsonb"

  m.register_type "interval" do |_, _, sql_type|
    precision = extract_precision(sql_type)
    OID::SpecializedString.new(:interval, precision: precision)
  end

  register_class_with_precision m, "time", Type::Time
  register_class_with_precision m, "timestamp", OID::DateTime

  load_additional_types
end
is_cached_plan_failure?(e) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 563
def is_cached_plan_failure?(e)
  pgerror = e.cause
  code = pgerror.result.result_error_field(PG::PG_DIAG_SQLSTATE)
  code == FEATURE_NOT_SUPPORTED && pgerror.message.include?(CACHED_PLAN_HEURISTIC)
rescue
  false
end
load_additional_types(oids = nil) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 421
        def load_additional_types(oids = nil)
          initializer = OID::TypeMapInitializer.new(type_map)

          query = <<~SQL
            SELECT t.id, t.oid, t.schema_id, t.name
            FROM mz_types as t
            LEFT JOIN mz_schemas s ON t.schema_id = s.id
            LEFT JOIN mz_databases d ON s.database_id = d.id
            WHERE
              (d.name = #{quote(current_database)} OR s.name = 'pg_catalog')
          SQL

          if oids
            query += "AND t.oid IN (%s)" % oids.join(", ")
          else
            query += initializer.query_conditions_for_initial_load
          end

          execute_and_clear(query, "SCHEMA", []) do |records|
            initializer.run(records)
          end
        end
prepare_statement(sql, binds) click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 581
def prepare_statement(sql, binds)
  parametrized_statement = sql.split(/\s+/)
  binds.each_with_index do |n, i|
    index = i + 1
    part = \
      case n
      when String
        "'#{quote_string(n)}'"
      when Numeric
        n.to_s
      end

    parametrized_statement = parametrized_statement.map do |token|
      token
        .gsub("$#{index}", part)
        .gsub("($#{index}", "(#{part}")
        .gsub("$#{index})", "#{part})")
        .gsub("($#{index})", "(#{part})")
        .gsub(",$#{index}", ",#{part}")
        .gsub("$#{index},", "#{part},")
        .gsub(",$#{index},", ",#{part},")
    end
  end
  parametrized_statement.join ' '
end
sql_key(sql) click to toggle source

Returns the statement identifier for the client side cache of statements

# File lib/active_record/connection_adapters/materialize_adapter.rb, line 577
def sql_key(sql)
  "#{sql}"
end
translate_exception(exception, message:, sql:, binds:) click to toggle source
Calls superclass method
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 381
def translate_exception(exception, message:, sql:, binds:)
  return exception unless exception.respond_to?(:result)

  case exception.result.try(:error_field, PG::PG_DIAG_SQLSTATE)
  when UNIQUE_VIOLATION
    RecordNotUnique.new(message, sql: sql, binds: binds)
  when FOREIGN_KEY_VIOLATION
    InvalidForeignKey.new(message, sql: sql, binds: binds)
  when VALUE_LIMIT_VIOLATION
    ValueTooLong.new(message, sql: sql, binds: binds)
  when NUMERIC_VALUE_OUT_OF_RANGE
    RangeError.new(message, sql: sql, binds: binds)
  when NOT_NULL_VIOLATION
    NotNullViolation.new(message, sql: sql, binds: binds)
  when SERIALIZATION_FAILURE
    SerializationFailure.new(message, sql: sql, binds: binds)
  when DEADLOCK_DETECTED
    Deadlocked.new(message, sql: sql, binds: binds)
  when LOCK_NOT_AVAILABLE
    LockWaitTimeout.new(message, sql: sql, binds: binds)
  when QUERY_CANCELED
    QueryCanceled.new(message, sql: sql, binds: binds)
  else
    super
  end
end
update_typemap_for_default_timezone() click to toggle source
# File lib/active_record/connection_adapters/materialize_adapter.rb, line 727
def update_typemap_for_default_timezone
  if @default_timezone != ActiveRecord::Base.default_timezone && @timestamp_decoder
    decoder_class = ActiveRecord::Base.default_timezone == :utc ?
      PG::TextDecoder::TimestampUtc :
      PG::TextDecoder::TimestampWithoutTimeZone

    @timestamp_decoder = decoder_class.new(@timestamp_decoder.to_h)
    @connection.type_map_for_results.add_coder(@timestamp_decoder)
    @default_timezone = ActiveRecord::Base.default_timezone
  end
end