class PG::LogicalReplication::Client
Attributes
Public Class Methods
@param connection [PG::Connection] Database Connection
# File lib/pg/logical_replication/client.rb, line 18 def initialize(connection) @connection = connection @command_builder = PG::LogicalReplication::CommandBuilder.new(connection) end
# File lib/pg/logical_replication/client.rb, line 9 def self.type_map_for_queries(connection) @type_map_for_queries ||= PG::BasicTypeMapForQueries.new(connection) end
# File lib/pg/logical_replication/client.rb, line 13 def self.type_map_for_results(connection) @type_map_for_results ||= PG::BasicTypeMapForResults.new(connection) end
Public Instance Methods
Adds tables to a publication
@param name [String] publication name @param tables [Array<String>] table names to add
# File lib/pg/logical_replication/client.rb, line 284 def add_tables_to_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} ADD TABLE #{safe_list(tables)}") end
Alters parameters originally set by CREATE PUBLICATION
@param name [String] publication name @param options [Hash] parameters to set
# File lib/pg/logical_replication/client.rb, line 308 def alter_publication_options(name, options) base_command = "ALTER PUBLICATION #{connection.quote_ident(name)}" typed_exec(command_builder.command_with_options(base_command, "SET", options)) end
Alters parameters originally set by CREATE SUBSCRIPTION
@param name [String] subscription name @param options [Hash] parameters to set
# File lib/pg/logical_replication/client.rb, line 146 def alter_subscription_options(name, options) base_command = "ALTER SUBSCRIPTION #{connection.quote_ident(name)}" typed_exec(command_builder.command_with_options(base_command, "SET", options)) end
Creates a logical replication slot
@param name [String] logical replication slot name
# File lib/pg/logical_replication/client.rb, line 84 def create_logical_replication_slot(name) typed_exec("SELECT pg_create_logical_replication_slot(#{connection.escape_literal(name)}, 'pgoutput')") end
Creates a new publication
@param name [String] publication name @param all_tables [Boolean] replicate changes for all tables, including ones created in the future @param tables [Array<String>] tables to be added to the publication, ignored if all_tables is true @param options [Hash] optional parameters
# File lib/pg/logical_replication/client.rb, line 270 def create_publication(name, all_tables = false, tables = [], options = {}) base_command = "CREATE PUBLICATION #{connection.quote_ident(name)}" if all_tables base_command << " FOR ALL TABLES" elsif !tables.empty? base_command << " FOR TABLE #{safe_list(tables)}" end typed_exec(@command_builder.command_with_options(base_command, "WITH", options)) end
Creates a subscription to a publisher node
@param name [String] subscription name @param conninfo_hash [Hash] publisher node connection info @param publications [Array<String>] publication names to subscribe to @param options [Hash] optional parameters for CREATE SUBSCRIPTION
# File lib/pg/logical_replication/client.rb, line 61 def create_subscription(name, conninfo_hash, publications, options = {}) options[:slot_name] = name if !options.key?(:slot_name) && !options.key?("slot_name") && (options['create_slot'] == false || options[:create_slot] == false) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) base_command = <<-SQL CREATE SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}' PUBLICATION #{safe_list(publications)} SQL typed_exec(command_builder.command_with_options(base_command, "WITH", options)) end
Disables the running subscription
@param name [String] subscription name
# File lib/pg/logical_replication/client.rb, line 138 def disable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} DISABLE") end
Remove a publication
@param name [String] publication name @param ifexists [Boolean] if true an error is not thrown when the publication does not exist
# File lib/pg/logical_replication/client.rb, line 333 def drop_publication(name, ifexists = false) typed_exec("DROP PUBLICATION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end
Drops the physical or logical replication slot. Note, you must be on the same database a logical slot was created.
@param name [String] replication slot name
# File lib/pg/logical_replication/client.rb, line 91 def drop_replication_slot(name) typed_exec("SELECT pg_drop_replication_slot(#{connection.escape_literal(name)})") end
Disconnects the subscription and removes it
@param name [String] subscription name @param ifexists [Boolean] if true an error is not thrown when the subscription does not exist
# File lib/pg/logical_replication/client.rb, line 77 def drop_subscription(name, ifexists = false) typed_exec("DROP SUBSCRIPTION#{" IF EXISTS" if ifexists} #{connection.quote_ident(name)}") end
Enables the previously disabled subscription
@param name [String] subscription name
# File lib/pg/logical_replication/client.rb, line 131 def enable_subscription(name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} ENABLE") end
Reports on replication lag from publisher to subscriber nodes This method must be run on the publisher node
@return [Array<Hash<String,String>>] List of returned lag and application names,
one for each replication process
# File lib/pg/logical_replication/client.rb, line 28 def lag_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), flush_lsn) AS lag_bytes, application_name FROM pg_stat_replication SQL end
Lists the current publications
@return [Array<String>] publication names
# File lib/pg/logical_replication/client.rb, line 245 def publications typed_exec(<<-SQL) SELECT pubname::TEXT AS name, usename::TEXT AS owner, puballtables, pubinsert, pubupdate, pubdelete FROM pg_publication JOIN pg_user ON pubowner = usesysid SQL end
# File lib/pg/logical_replication/client.rb, line 260 def publishes?(publication_name) publications.any? { |p| p["name"] == publication_name } end
Removes tables from a publication
@param name [String] publication name @param tables [Array<String>] table names to remove
# File lib/pg/logical_replication/client.rb, line 300 def remove_tables_from_publication(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} DROP TABLE #{safe_list(tables)}") end
Renames a publication
@param name [String] current publication name @param new_name [String] new publication name
# File lib/pg/logical_replication/client.rb, line 325 def rename_publication(name, new_name) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end
Renames the subscription
@param name [String] current subscription name @param new_name [String] new subscription name
# File lib/pg/logical_replication/client.rb, line 163 def rename_subscription(name, new_name) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} RENAME TO #{connection.quote_ident(new_name)}") end
Lists the current replication slots
@return [Array<String>] replication slots
# File lib/pg/logical_replication/client.rb, line 229 def replication_slots typed_exec(<<-SQL) SELECT slot_name::TEXT, plugin::TEXT, slot_type::TEXT, database::TEXT, temporary, active FROM pg_replication_slots SQL end
Sets the owner of a publication
@param name [String] publication name @param owner [String] new owner user name
# File lib/pg/logical_replication/client.rb, line 317 def set_publication_owner(name, owner) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end
Sets the tables included in a publication
@param name [String] publication name @param tables [Array<String>] table names
# File lib/pg/logical_replication/client.rb, line 292 def set_publication_tables(name, tables) typed_exec("ALTER PUBLICATION #{connection.quote_ident(name)} SET TABLE #{safe_list(tables)}") end
Updates a subscription connection string
@param name [String] subscription name @param conninfo_hash [Hash] new external connection hash to the publisher node
# File lib/pg/logical_replication/client.rb, line 99 def set_subscription_conninfo(name, conninfo_hash) connection_string = connection.escape_string(PG::Connection.parse_connect_args(conninfo_hash)) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} CONNECTION '#{connection_string}'") end
Sets the owner of the subscription
@param name [String] subscription name @param owner [String] new owner user name
# File lib/pg/logical_replication/client.rb, line 155 def set_subscription_owner(name, owner) typed_exec("ALTER SUBSCRIPTION #{connection.quote_ident(name)} OWNER TO #{connection.quote_ident(owner)}") end
Changes list of subscribed publications
@param name [String] subscription name @param publications [Array<String>] publication names to subscribe to @param options [Hash] optional parameters
# File lib/pg/logical_replication/client.rb, line 109 def set_subscription_publications(name, publications, options = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} SET PUBLICATION #{safe_list(publications)} SQL typed_exec(@command_builder.command_with_options(base_command, "WITH", options)) end
Returns if this database is subscribing to any publications
@return [Boolean] true if there are any subscriptions, false otherwise
# File lib/pg/logical_replication/client.rb, line 222 def subscriber?(dbname = nil) subscriptions(dbname).any? end
Shows status and basic information about all subscriptions
@return [Array<Hash>] a list of subscriptions
keys: subscription_name database_name owner worker_count enabled subscription_dsn slot_name publications remote_replication_lsn local_replication_lsn
# File lib/pg/logical_replication/client.rb, line 181 def subscriptions(dbname = nil) subscriptions = typed_exec(<<-SQL).to_a SELECT sub.subname::TEXT AS subscription_name, pg_database.datname::TEXT AS database_name, pg_user.usename::TEXT AS owner, COUNT(sub_stat.pid) AS worker_count, sub.subenabled AS enabled, sub.subconninfo AS subscription_dsn, sub.subslotname::TEXT AS slot_name, sub.subpublications AS publications, stat.remote_lsn::TEXT AS remote_replication_lsn, stat.local_lsn::TEXT AS local_replication_lsn FROM pg_subscription AS sub JOIN pg_user ON sub.subowner = usesysid JOIN pg_database ON sub.subdbid = pg_database.oid LEFT JOIN pg_replication_origin_status stat ON concat('pg_', sub.oid) = stat.external_id LEFT JOIN pg_stat_subscription sub_stat ON sub_stat.subid = sub.oid AND sub_stat.pid IS NOT NULL GROUP BY sub.subname, pg_database.datname, pg_user.usename, sub.subenabled, sub.subconninfo, sub.subslotname, sub.subpublications, stat.remote_lsn, stat.local_lsn SQL dbname ? subscriptions.select { |s| s["database_name"] == dbname } : subscriptions end
Fetch missing table information from publisher
@param name [String] subscription name @param options [Hash] optional parameters
# File lib/pg/logical_replication/client.rb, line 121 def sync_subscription(name, options = {}) base_command = <<-SQL ALTER SUBSCRIPTION #{connection.quote_ident(name)} REFRESH PUBLICATION SQL typed_exec(@command_builder.command_with_options(base_command, "WITH", options)) end
Lists the tables currently in the publication
@param set_name [String] publication name @return [Array<String>] table names
# File lib/pg/logical_replication/client.rb, line 341 def tables_in_publication(name) typed_exec(<<-SQL, name).values.flatten SELECT tablename::TEXT FROM pg_publication_tables WHERE pubname = $1 SQL end
Reports on replication bytes of WAL being retained for each replication slot This method must be run on the publisher node
@return [Array<Hash<String,String>>] List of returned WAL bytes and replication slot names,
one for each replication process
# File lib/pg/logical_replication/client.rb, line 43 def wal_retained_bytes typed_exec(<<-SQL).to_a SELECT pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn) AS retained_bytes, slot_name::TEXT FROM pg_replication_slots WHERE plugin = 'pgoutput' SQL end
Private Instance Methods
# File lib/pg/logical_replication/client.rb, line 351 def safe_list(list) list.map { |e| connection.quote_ident(e) }.join(", ") end
# File lib/pg/logical_replication/client.rb, line 355 def typed_exec(sql, *params) result = connection.async_exec(sql, params, nil, self.class.type_map_for_queries(connection)) result.map_types!(self.class.type_map_for_results(connection)) end