module ActiveRecord::ConnectionAdapters::Spanner::DatabaseStatements

Constants

COMMENT_REGEX
DDL_REGX
DML_REGX

Public Instance Methods

begin_db_transaction() click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 153
def begin_db_transaction
  log "BEGIN" do
    @connection.begin_transaction
  end
end
begin_isolated_db_transaction(isolation) click to toggle source

Begins a transaction on the database with the specified isolation level. Cloud Spanner only supports isolation level :serializable, but also defines three additional 'isolation levels' that can be used to start specific types of Spanner transactions:

  • :read_only: Starts a read-only snapshot transaction using a strong timestamp bound.

  • :buffered_mutations: Starts a read/write transaction that will use mutations instead of DML for single-row

    inserts/updates/deletes. Mutations are buffered locally until the transaction is
    committed, and any changes during a transaction cannot be read by the application.
  • :pdml: Starts a Partitioned DML transaction. Executing multiple DML statements in one PDML transaction

    block is NOT supported A PDML transaction is not guaranteed to be atomic.
    See https://cloud.google.com/spanner/docs/dml-partitioned for more information.

In addition to the above, a Hash containing read-only snapshot options may be used to start a specific read-only snapshot:

  • { timestamp: Time } Starts a read-only snapshot at the given timestamp.

  • { staleness: Integer } Starts a read-only snapshot with the given staleness in seconds.

  • { strong: <any value>} Starts a read-only snapshot with strong timestamp bound

    (this is the same as :read_only)
    
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 177
def begin_isolated_db_transaction isolation
  if isolation.is_a? Hash
    raise "Unsupported isolation level: #{isolation}" unless \
      isolation[:timestamp] || isolation[:staleness] || isolation[:strong]
    raise "Only one option is supported. It must be one of `timestamp`, `staleness` or `strong`." \
      if isolation.count != 1
  else
    raise "Unsupported isolation level: #{isolation}" unless \
      [:serializable, :read_only, :buffered_mutations, :pdml].include? isolation
  end

  log "BEGIN #{isolation}" do
    @connection.begin_transaction isolation
  end
end
commit_db_transaction() click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 193
def commit_db_transaction
  log "COMMIT" do
    @connection.commit_transaction
  end
end
delete(arel, name = nil, binds = [])
Alias for: update
exec_delete(sql, name = "SQL", binds = [])
Alias for: exec_update
exec_mutation(mutation) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 64
def exec_mutation mutation
  @connection.current_transaction.buffer mutation
end
exec_query(sql, name = "SQL", binds = [], prepare: false result = execute sql, name, binds) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 57
def exec_query sql, name = "SQL", binds = [], prepare: false # rubocop:disable Lint/UnusedMethodArgument
  result = execute sql, name, binds
  ActiveRecord::Result.new(
    result.fields.keys.map(&:to_s), result.rows.map(&:values)
  )
end
exec_update(sql, name = "SQL", binds = []) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 82
def exec_update sql, name = "SQL", binds = []
  result = execute sql, name, binds
  # Make sure that we consume the entire result stream before trying to get the stats.
  # This is required because the ExecuteStreamingSql RPC is also used for (Partitioned) DML,
  # and this RPC can return multiple partial result sets for DML as well. Only the last partial
  # result set will contain the statistics. Although there will never be any rows, this makes
  # sure that the stream is fully consumed.
  result.rows.each { |_| }
  return result.row_count if result.row_count

  raise ActiveRecord::StatementInvalid.new(
    "DML statement is invalid.", sql: sql
  )
end
Also aliased as: exec_delete
execute(sql, name = nil, binds = []) click to toggle source

DDL, DML and DQL Statements

# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 15
def execute sql, name = nil, binds = []
  statement_type = sql_statement_type sql

  if preventing_writes? && [:dml, :ddl].include?(statement_type)
    raise ActiveRecord::ReadOnlyError(
      "Write query attempted while in readonly mode: #{sql}"
    )
  end

  if statement_type == :ddl
    execute_ddl sql
  else
    transaction_required = statement_type == :dml
    materialize_transactions

    # First process and remove any hints in the binds that indicate that
    # a different read staleness should be used than the default.
    staleness_hint = binds.find { |b| b.is_a? Arel::Visitors::StalenessHint }
    if staleness_hint
      selector = Google::Cloud::Spanner::Session.single_use_transaction staleness_hint.value
      binds.delete staleness_hint
    end

    log sql, name do
      types, params = to_types_and_params binds
      ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
        if transaction_required
          transaction do
            @connection.execute_query sql, params: params, types: types
          end
        else
          @connection.execute_query sql, params: params, types: types, single_use_selector: selector
        end
      end
    end
  end
end
execute_ddl(statements) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 110
def execute_ddl statements
  log "MIGRATION", "SCHEMA" do
    ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
      @connection.execute_ddl statements
    end
  end
rescue Google::Cloud::Error => error
  raise ActiveRecord::StatementInvalid, error
end
query(sql, name = nil) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 53
def query sql, name = nil
  exec_query sql, name
end
rollback_db_transaction() click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 199
def rollback_db_transaction
  log "ROLLBACK" do
    @connection.rollback_transaction
  end
end
transaction(requires_new: nil, isolation: nil, joinable: true) click to toggle source

Transaction

Calls superclass method
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 122
def transaction requires_new: nil, isolation: nil, joinable: true
  if !requires_new && current_transaction.joinable?
    return super
  end

  backoff = 0.2
  begin
    super
  rescue ActiveRecord::StatementInvalid => err
    if err.cause.is_a? Google::Cloud::AbortedError
      sleep(delay_from_aborted(err) || backoff *= 1.3)
      retry
    end
    raise
  end
end
transaction_isolation_levels() click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 139
def transaction_isolation_levels
  {
    read_uncommitted:   "READ UNCOMMITTED",
    read_committed:     "READ COMMITTED",
    repeatable_read:    "REPEATABLE READ",
    serializable:       "SERIALIZABLE",

    # These are not really isolation levels, but it is the only (best) way to pass in additional
    # transaction options to the connection.
    read_only:          "READ_ONLY",
    buffered_mutations: "BUFFERED_MUTATIONS"
  }
end
truncate(table_name, name = nil) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 98
def truncate table_name, name = nil
  Array(table_name).each do |t|
    log "TRUNCATE #{t}", name do
      @connection.truncate t
    end
  end
end
update(arel, name = nil, binds = []) click to toggle source
Calls superclass method
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 68
def update arel, name = nil, binds = []
  # Add a `WHERE TRUE` if it is an update_all or delete_all call that uses DML.
  if !should_use_mutation(arel) && arel.respond_to?(:ast) && arel.ast.wheres.empty?
    arel.ast.wheres << Arel::Nodes::SqlLiteral.new("TRUE")
  end
  return super unless should_use_mutation arel

  raise "Unsupported update for use with mutations: #{arel}" unless arel.is_a? Arel::DeleteManager

  exec_mutation create_delete_all_mutation arel if arel.is_a? Arel::DeleteManager
  0 # Affected rows (unknown)
end
Also aliased as: delete
write_query?(sql) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 106
def write_query? sql
  sql_statement_type(sql) == :dml
end

Private Instance Methods

can_use_mutation(arel) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 240
def can_use_mutation arel
  return true if arel.is_a?(Arel::DeleteManager) && arel.respond_to?(:ast) && arel.ast.wheres.empty?
  false
end
create_delete_all_mutation(arel) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 245
def create_delete_all_mutation arel
  unless arel.is_a? Arel::DeleteManager
    raise "A delete mutation can only be created from a DeleteManager"
  end
  # Check if it is a delete_all operation.
  unless arel.ast.wheres.empty?
    raise "A delete mutation can only be created without a WHERE clause"
  end
  table_name = arel.ast.relation.name if arel.ast.relation.is_a? Arel::Table
  table_name = arel.ast.relation.left.name if arel.ast.relation.is_a? Arel::Nodes::JoinSource
  unless table_name
    raise "Could not find table for delete mutation"
  end

  Google::Cloud::Spanner::V1::Mutation.new(
    delete: Google::Cloud::Spanner::V1::Mutation::Delete.new(
      table: table_name,
      key_set: { all: true }
    )
  )
end
delay_from_aborted(err) click to toggle source

Retrieves the delay value from Google::Cloud::AbortedError or GRPC::Aborted

# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 295
def delay_from_aborted err
  return nil if err.nil?
  if err.respond_to?(:metadata) && err.metadata["google.rpc.retryinfo-bin"]
    retry_info = Google::Rpc::RetryInfo.decode err.metadata["google.rpc.retryinfo-bin"]
    seconds = retry_info["retry_delay"].seconds
    nanos = retry_info["retry_delay"].nanos
    return seconds if nanos.zero?
    return seconds + (nanos / 1_000_000_000.0)
  end
  # No metadata? Try the inner error
  delay_from_aborted err.cause
rescue StandardError
  # Any error indicates the backoff should be handled elsewhere
  nil
end
should_use_mutation(arel) click to toggle source

An insert/update/delete statement could use mutations in some specific circumstances. This method returns an indication whether a specific operation should use mutations instead of DML based on the operation itself, and the current transaction.

# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 234
def should_use_mutation arel
  !@connection.current_transaction.nil? \
    && @connection.current_transaction.isolation == :buffered_mutations \
    && can_use_mutation(arel) \
end
sql_statement_type(sql) click to toggle source
# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 281
def sql_statement_type sql
  case sql
  when DDL_REGX
    :ddl
  when DML_REGX
    :dml
  else
    :dql
  end
end
to_types_and_params(binds) click to toggle source

Translates binds to Spanner types and params.

# File lib/active_record/connection_adapters/spanner/database_statements.rb, line 208
def to_types_and_params binds
  types = binds.enum_for(:each_with_index).map do |bind, i|
    type = :INT64
    if bind.respond_to? :type
      type = ActiveRecord::Type::Spanner::SpannerActiveRecordConverter
             .convert_active_model_type_to_spanner(bind.type)
    end
    [
      # Generates binds for named parameters in the format `@p1, @p2, ...`
      "p#{i + 1}", type
    ]
  end.to_h
  params = binds.enum_for(:each_with_index).map do |bind, i|
    type = bind.respond_to?(:type) ? bind.type : ActiveModel::Type::Integer
    value = bind
    value = type.serialize bind.value, :dml if type.respond_to?(:serialize) && type.method(:serialize).arity < 0
    value = type.serialize bind.value if type.respond_to?(:serialize) && type.method(:serialize).arity >= 0

    ["p#{i + 1}", value]
  end.to_h
  [types, params]
end