module ActiveRecord::ConnectionAdapters::Materialize::Schema::SourceStatements

Public Instance Methods

create_source(source_name, publication:, source_type: :postgres, materialized: true, connection_params: nil) click to toggle source
# File lib/active_record/connection_adapters/materialize/schema/source_statements.rb, line 42
          def create_source(source_name, publication:, source_type: :postgres, materialized: true, connection_params: nil)
            materialized_statement = materialized ? 'MATERIALIZED ' : ''
            connection_string = select_database_config(connection_params).map { |k, v| [k, v].join("=") }.join(" ")
            case source_type
            when :postgres
              execute <<-SQL.squish
                CREATE #{materialized_statement}SOURCE #{quote_schema_name(source_name)} FROM POSTGRES
                  CONNECTION #{quote(connection_string)}
                  PUBLICATION #{quote(publication)}
              SQL
            else
              raise "Source type #{source_type} is currently unsupported for Materialized sources", NotImplementedError
            end
          end
drop_source(source_name) click to toggle source
# File lib/active_record/connection_adapters/materialize/schema/source_statements.rb, line 57
def drop_source(source_name)
end
select_database_config(connection_params) click to toggle source

“host=postgresdb port=5432 user=postgres dbname=source_database”

# File lib/active_record/connection_adapters/materialize/schema/source_statements.rb, line 32
def select_database_config(connection_params)
  {
    host: connection_params['host'],
    port: connection_params['port'],
    user: connection_params['username'],
    dbname: connection_params['database'],
    password: connection_params['password']
  }.compact
end
source_exists?(source_name) click to toggle source
# File lib/active_record/connection_adapters/materialize/schema/source_statements.rb, line 60
def source_exists?(source_name)
end
source_options(source_name) click to toggle source

Get source options from an existing source

# File lib/active_record/connection_adapters/materialize/schema/source_statements.rb, line 14
def source_options(source_name)
  name_ref, statement = query("SHOW CREATE SOURCE #{quote_table_name(source_name)}", "SCHEMA").first
  database_name, schema_name, publication_name = name_ref.split(".")
  materialized = !!/CREATE\sMATERIALIZED/.match(statement)
   _, source_type, _ = /FROM\s(POSTGRES|KAFKA\sBROKER)\sCONNECTION/.match(statement).to_s.split " "
   source_type = source_type.to_s.downcase.to_sym

  {
    database_name: database_name,
    schema_name: schema_name,
    source_name: source_name,
    source_type: source_type,
    publication: publication_name,
    materialized: materialized || source_type == :postgres
  }
end
sources() click to toggle source
# File lib/active_record/connection_adapters/materialize/schema/source_statements.rb, line 9
def sources
  query_values(data_source_sql(type: "SOURCE"), "SCHEMA")
end