Logical replication can be used in PolarDB for PostgreSQL to replicate data definition language (DDL) statements.
Background information
Native PostgreSQL supports only table data synchronization. You must manually create tables with the same definitions on the publisher and subscriber nodes to ensure correct table data synchronization.
PolarDB for PostgreSQL provides the extended logical replication feature to replicate DDL statements. You can use the pub/sub method to replicate the CREATE/ALTER/DROP
statements on database objects to the subscriber node.
Prerequisites
To use the DDL logical replication feature, you must set the wal_level
parameter to logical
. For more information about how to modify the parameter, see Configure cluster parameters.
Syntax
CREATE PUBLICATION
CREATE PUBLICATION name [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] | FOR ALL TABLES ] [ WITH ( publication_parameter [= value] [, ... ] ) ] publication_parameter: ... pubddl = '(none | table | all)'
The
pubddl
parameter is added topublication_parameter
. Default value: none. Valid values:none
,table
, andall
.none: does not replicate DDL statements.
table: replicates only
table
-related DDL statements.CREATE TABLE
ALTER TABLE
DROP TABLE
CREATE TABLE AS
all: replicates all DDL statements. The following DDL statements are supported:
ALTER INDEX
ALTER SEQUENCE
ALTER TABLE
ALTER TYPE
CREATE INDEX
CREATE SCHEMA
CREATE SEQUENCE
CREATE TABLE
CREATE TABLE AS
CREATE TYPE
CREATE TYPE HEADER
CREATE TYPE BODY
DROP INDEX
DROP SCHEMA
DROP SEQUENCE
DROP TABLE
DROP TYPE
NoteIf you specify
pubddl = 'all'
, you must addFOR ALL TABLES
. Global statements can be executed on all databases but cannot be replicated. Global statements includeROLE
,DATABASE
,TableSpace
, andGrantStmt
orRevokeStmt
(for global objects).
CREATE SUBSCRIPTION
CREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION publication_name [, ...] [ WITH ( subscription_parameter [= value] [, ... ] ) ] subscription_parameter: ... dump_schema = false/true
The
dump_schema
parameter is added tosubscription_parameter
todump
existing object definitions from the publisher node to the subscriber node when you create a subscription. Default value: false. Valid values:false: does not
dump
existing object definitions from the publisher node to the subscriber node when you create a subscription.true:
dumps
existing object definitions from the publisher node to the subscriber node when you create a subscription.
The dump_schema
parameter uses the pg_dump or pg_restore tool. You must ensure that the cluster supports the host='127.0.0.1'
connection. Otherwise, the cluster fails to be restored. Dumped files are stored in the pg_logical/schemadumps
directory of the cluster and are deleted after the restoration or an error.
Parameters
Parameter | Description |
polar_enable_ddl_replication | Specifies whether to enable the DDL logical replication feature. Default values: true. Valid values:
|
polar_enable_debug_ddl_replication | Specifies whether to enable debug DDL replicaiton to print more logs. Default values: false. Valid values: true false |
Examples
Create a publication that supports DDL statements.
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
Sample result:
CREATE PUBLICATION
Create a subscription.
CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;
Sample result:
NOTICE: created replication slot "mysub" on publisher CREATE SUBSCRIPTION
Execute SQL statements on the publisher node.
# Create a table. CREATE TABLE t1(id int ,val char(3)); # Insert data. INSERT INTO t1 values (1,'a'); INSERT INTO t1 values (2,'b'); INSERT INTO t1 values (3,'c'); # Modify the table. ALTER TABLE t1 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY, ALTER COLUMN c SET GENERATED ALWAYS; # View the table. SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # View comments of the table. \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Publications: "mypub" Replica Identity: FULL Access method: heap
View the replication information on the subscriber node.
# View the table. SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # View comments of the table. \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Replica Identity: FULL Access method: heap
Delete the table on the publisher node.
DROP TABLE t1;
View the replication information on the subscriber node.
SELECT * FROM t1;
Sample result:
ERROR: relation "t1" does not exist LINE 1: SELECT * FROM t1;
Decoding extension
The following callback interfaces are added in the decoding extension.
/*
* Output plugin callbacks
*/
typedef struct OutputPluginCallbacks
{
...
LogicalDecodeDDLMessageCB ddl_cb;
/* streaming of changes */
...
LogicalDecodeStreamDDLMessageCB stream_ddl_cb;
} OutputPluginCallbacks;
/*
* Called for the logical decoding DDL messages.
*/
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);
/*
* Callback for streaming logical decoding DDL messages from in-progress
* transactions.
*/
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);
The DDL message method is implemented in the test_decoding
extension. You can use the test decoding
extension in the following ways:
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
create table t3(id int);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid |
data
------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------
0/C001BF10 | 783 | BEGIN 783
0/C001EBC0 | 783 | message: prefix: deparse, relid: 16418, cmdtype: Simple, sz: 1505 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s
%{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": fal
se, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "
present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{co
llation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present":
false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_
type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}
0/C001EE98 | 783 | COMMIT 783
select polar_catalog.ddl_deparse_expand_command('{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}');
ddl_deparse_expand_command
-------------------------------------------------------------------------
CREATE TABLE public.t3 (id pg_catalog.int4 STORAGE plain )
(1 row)
Add system tables and system functions
polar_catalog.ddl_deparse_to_json
Syntax:
ddl_deparse_to_json(IN pg_ddl_command) RETURN text
Usage: Parses an internal parsetree into a JSON string.
Parameters: The pg_ddl_command that you enter is of the PARSETREE type, and a JSON string of the TEXT type is returned.
polar_catalog.ddl_deparse_expand_command
Syntax:
ddl_deparse_expand_command(IN text) RETURN text
Usage: Parses a JSON string into a SQL string.
Parameters: The JSON string that you enter is of the TEXT type, and a SQL string of the TEXT type is returned.
polar_catalog.polar_publication
Syntax:
TABLE polar_publication ( puboid Oid primary key, -- publication oid pubddl "char", -- Specifies whether the publication supports DDL objects. pubglobal "char", -- Specifies whether the publication supports global objects (supported soon). pubflags int -- The reserved flag bits. );