PolarDB for PostgreSQL provides the wal2json plug-in to generate logical log files in the JSON format.
Prerequisites
This extension is supported on the PolarDB for PostgreSQL clusters that run the following engines:
PostgreSQL 14 (revision version 14.5.1.0 or later)
PostgreSQL 11 (revision version 1.1.29 or later)
You can execute the following statement to view the minor version that is used by PolarDB for PostgreSQL:
PostgreSQL 14
SELECT version();
PostgreSQL 11
SHOW polar_version;
Background information
wal2json is a logic decoding plug-in that provides the following features:
Access tuples generated by executing the
INSERT
andUPDATE
statements.Access
UPDATE
andDELETE
old row versions depending on the specified duplicate identity.Consume changes by using the streaming protocol (logical replication slots) or special SQL APIs.
The wal2json plug-in produces a JSON object for each transaction. All new and old tuples are available in the JSON object. In addition, options include properties such as transaction timestamp, schema-qualified, data type, and transaction ID. For more information, see Execute SQL statements to obtain JSON objects.
Precautions
Because PolarDB for PostgreSQL uses the replication method of
REPLICA_IDENTITY_FULL
, the entire row of data is displayed during UPDATE and DELETE, instead of the columns before and after UPDATE and DELETE. To modify only the columns before and after UPDATE, you must set thepolar_create_table_with_full_replica_identity
parameter to off. This parameter cannot be modified in the console. You must Contact us to modify the parameter.The wal2json plug-in requires logical encoding and decoding. You must modify the
wal_level
parameter tological
.NoteThis parameter can be modified in the console. For more information, see Procedure. The cluster restarts after you modify the parameter. Proceed with caution.
Execute SQL statements to obtain JSON objects
You do not need to execute the create extension
statement to create the wal2json plug-in. You can create a logical copy slot to load the wal2json plug-in.
After you create a logical replication slot that contains the wal2json plug-in, execute the following statement to obtain the JSON object in WAL:
-- Create tables with and without primary keys.
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
-- Create a logical replication slot of the wal2json type.
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
-- Commit a transaction and write it to WAL.
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
DELETE FROM table2_with_pk WHERE a < 3;
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
-- Obtain the JSON object in WAL.
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');
Sample result:
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
Delete the test_slot
replication slot and return the string 'stop'
.
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
Parameters
The following table describes the parameters related to wal2json.
Parameter | Description |
change | The WAL entry for each DML, such as the WAL records for INSERT, UPDATE, DELETE, and TRUNCATE. |
changeset | A set of several changes. |
include-xids | Specifies whether to add xid to each changeset. Default value: false. Valid values:
|
include-timestamp | Specifies whether to add a timestamp to each changeset. Default value: false. Valid values:
|
include-schemas | Specifies whether to add a schema to each change. Default value: true. Valid values:
|
include-types | Specifies whether to add a type to each change. Default value: true. Valid values:
|
include-typmod | Specifies whether to add modifier to types that have it (such as varchar(20) instead of varchar). Default value: true. Valid values:
|
include-type-oids | Specifies whether to add the type oids. Default value: false. Valid values:
|
include-not-null | Specifies whether to add
|
pretty-print | Specifies whether to add spaces and indents to the JSON structure for formatting. Default value: false. Valid values:
|
write-in-chunks | Specifies whether to write after each change instead of each changeset. Default value: false. Valid values:
|
include-lsn | Specifies whether to add nextlsn to each changeset. Default value: false. Valid values:
|
filter-tables | Excludes rows from the specified tables. By default, this parameter is empty, indicating that no table is excluded. Note
|
add-tables | Includes only rows from the specified tables. By default, all tables from all schemas are parsed. For more information, see filter-tables. |
filter-msg-prefixes | Excludes rows with the specified prefix. This parameter is typically used in the |
add-msg-prefixes | Includes rows with the specified prefix. This parameter is typically used in the |
format-version | Defines which output format to use. Default value: 1. Valid values:
|
actions | Specifies which operations are sent. By default, all operations including INSERT, UPDATE, DELETE, and TRUNCATE are sent. If |
Examples
include-xids
is used in the following examples.
Create a table and a logical replication slots. Insert a row of data.
DROP TABLE IF EXISTS tbl; CREATE TABLE tbl (id int); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); INSERT INTO tbl VALUES (1);
Enter the parameters and values in the function.
SELECT count(*) = 1, count(distinct ((data::json)->'xid')::text) = 1 FROM pg_logical_slot_get_changes( 'regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '1');
How it works
For more information, see official documentation.