All Products
Search
Document Center

PolarDB:DDL logical replication

Last Updated:May 15, 2023

Logical replication can be used in PolarDB for PostgreSQL to replicate data definition language (DDL) statements.

Background information

Native PostgreSQL supports only table data synchronization. You must manually create tables with the same definitions on the publisher and subscriber nodes to ensure correct table data synchronization.

PolarDB for PostgreSQL provides the extended logical replication feature to replicate DDL statements. You can use the pub/sub method to replicate the CREATE/ALTER/DROP statements on database objects to the subscriber node.

Prerequisites

To use the DDL logical replication feature, you must set the wal_level parameter to logical. For more information about how to modify the parameter, see Configure cluster parameters.

Syntax

  1. CREATE PUBLICATION

    CREATE PUBLICATION name
        [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
          | FOR ALL TABLES ]
        [ WITH ( publication_parameter [= value] [, ... ] ) ]
    publication_parameter:
    	...
    	pubddl = '(none | table | all)'

    The pubddl parameter is added to publication_parameter. Default value: none. Valid values: none, table, and all.

    • none: does not replicate DDL statements.

    • table: replicates only table-related DDL statements.

      • CREATE TABLE

      • ALTER TABLE

      • DROP TABLE

      • CREATE TABLE AS

    • all: replicates all DDL statements. The following DDL statements are supported:

      • ALTER INDEX

      • ALTER SEQUENCE

      • ALTER TABLE

      • ALTER TYPE

      • CREATE INDEX

      • CREATE SCHEMA

      • CREATE SEQUENCE

      • CREATE TABLE

      • CREATE TABLE AS

      • CREATE TYPE

      • CREATE TYPE HEADER

      • CREATE TYPE BODY

      • DROP INDEX

      • DROP SCHEMA

      • DROP SEQUENCE

      • DROP TABLE

      • DROP TYPE

        Note

        If you specify pubddl = 'all', you must add FOR ALL TABLES. Global statements can be executed on all databases but cannot be replicated. Global statements include ROLE, DATABASE, TableSpace, and GrantStmt or RevokeStmt (for global objects).

  2. CREATE SUBSCRIPTION

    CREATE SUBSCRIPTION subscription_name
        CONNECTION 'conninfo'
        PUBLICATION publication_name [, ...]
        [ WITH ( subscription_parameter [= value] [, ... ] ) ]
    subscription_parameter: 
    	...
      dump_schema = false/true

    The dump_schema parameter is added to subscription_parameter to dump existing object definitions from the publisher node to the subscriber node when you create a subscription. Default value: false. Valid values:

    • false: does not dump existing object definitions from the publisher node to the subscriber node when you create a subscription.

    • true: dumps existing object definitions from the publisher node to the subscriber node when you create a subscription.

Note

The dump_schema parameter uses the pg_dump or pg_restore tool. You must ensure that the cluster supports the host='127.0.0.1' connection. Otherwise, the cluster fails to be restored. Dumped files are stored in the pg_logical/schemadumps directory of the cluster and are deleted after the restoration or an error.

Parameters

Parameter

Description

polar_enable_ddl_replication

Specifies whether to enable the DDL logical replication feature. Default values: true. Valid values:

  • true

  • false

polar_enable_debug_ddl_replication

Specifies whether to enable debug DDL replicaiton to print more logs. Default values: false. Valid values:

true

false

Examples

  1. Create a publication that supports DDL statements.

    CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');

    Sample result:

    CREATE PUBLICATION
  2. Create a subscription.

    CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;

    Sample result:

    NOTICE:  created replication slot "mysub" on publisher
    CREATE SUBSCRIPTION
  3. Execute SQL statements on the publisher node.

    # Create a table.
    CREATE TABLE t1(id int ,val char(3));
    
    # Insert data.
    INSERT INTO t1 values (1,'a');
    INSERT INTO t1 values (2,'b');
    INSERT INTO t1 values (3,'c');
    
    # Modify the table.
    ALTER TABLE t1 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY,
      ALTER COLUMN c SET GENERATED ALWAYS;
      
    # View the table.
    SELECT * FROM t1;
     id | val | c 
    ----+-----+---
      1 | a   | 1
      2 | b   | 2
      3 | c   | 3
    (3 rows)
    
    # View comments of the table.
    \d+ t1
                                                             Table "public.t1"
     Column |     Type     | Collation | Nullable |           Default            | Storage  | Compression | Stats target | Description 
    --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+-------------
     id     | integer      |           |          |                              | plain    |             |              | 
     val    | character(3) |           |          |                              | extended |             |              | 
     c      | integer      |           | not null | generated always as identity | plain    |             |              | 
    Publications:
        "mypub"
    Replica Identity: FULL
    Access method: heap
  4. View the replication information on the subscriber node.

    # View the table.
    SELECT * FROM t1;
     id | val | c 
    ----+-----+---
      1 | a   | 1
      2 | b   | 2
      3 | c   | 3
    (3 rows)
    
    # View comments of the table.
    \d+ t1
                                                             Table "public.t1"
     Column |     Type     | Collation | Nullable |           Default            | Storage  | Compression | Stats target | Description 
    --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+-------------
     id     | integer      |           |          |                              | plain    |             |              | 
     val    | character(3) |           |          |                              | extended |             |              | 
     c      | integer      |           | not null | generated always as identity | plain    |             |              | 
    Replica Identity: FULL
    Access method: heap
  5. Delete the table on the publisher node.

    DROP TABLE t1;
  6. View the replication information on the subscriber node.

    SELECT * FROM t1;

    Sample result:

    ERROR:  relation "t1" does not exist
    LINE 1: SELECT * FROM t1;

Decoding extension

The following callback interfaces are added in the decoding extension.

/*
 * Output plugin callbacks
 */
typedef struct OutputPluginCallbacks
{
  ...
	LogicalDecodeDDLMessageCB ddl_cb;


	/* streaming of changes */
	...
	LogicalDecodeStreamDDLMessageCB stream_ddl_cb;

} OutputPluginCallbacks;

/*
 * Called for the logical decoding DDL messages.
 */
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
										   ReorderBufferTXN *txn,
										   XLogRecPtr message_lsn,
										   const char *prefix,
										   Oid relid,
										   DeparsedCommandType cmdtype,
										   Size message_size,
										   const char *message);

/*
 * Callback for streaming logical decoding DDL messages from in-progress
 * transactions.
 */
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
												 ReorderBufferTXN *txn,
												 XLogRecPtr message_lsn,
												 const char *prefix,
												 Oid relid,
												 DeparsedCommandType cmdtype,
												 Size message_size,
												 const char *message);

The DDL message method is implemented in the test_decoding extension. You can use the test decoding extension in the following ways:

CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
create table t3(id int);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn     | xid |                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                     
                                                                        data                                                                                                                                                                         
                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                     
                                                                                                                                  
------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------
 0/C001BF10 | 783 | BEGIN 783
 0/C001EBC0 | 783 | message: prefix: deparse, relid: 16418, cmdtype: Simple, sz: 1505 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s
 %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": fal
se, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "
present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{co
llation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": 
false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_
type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}
 0/C001EE98 | 783 | COMMIT 783

select polar_catalog.ddl_deparse_expand_command('{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}');
                       ddl_deparse_expand_command                        
-------------------------------------------------------------------------
 CREATE  TABLE  public.t3 (id pg_catalog.int4 STORAGE plain      )      
(1 row)

Add system tables and system functions

  • polar_catalog.ddl_deparse_to_json

    • Syntax: ddl_deparse_to_json(IN pg_ddl_command) RETURN text

    • Usage: Parses an internal parsetree into a JSON string.

    • Parameters: The pg_ddl_command that you enter is of the PARSETREE type, and a JSON string of the TEXT type is returned.

  • polar_catalog.ddl_deparse_expand_command

    • Syntax: ddl_deparse_expand_command(IN text) RETURN text

    • Usage: Parses a JSON string into a SQL string.

    • Parameters: The JSON string that you enter is of the TEXT type, and a SQL string of the TEXT type is returned.

  • polar_catalog.polar_publication

    Syntax:

    TABLE polar_publication
    (
        puboid Oid primary key,  -- publication oid
        pubddl "char", -- Specifies whether the publication supports DDL objects.
        pubglobal "char", -- Specifies whether the publication supports global objects (supported soon).
        pubflags int -- The reserved flag bits.
    );