The PostgreSQL change data capture (CDC) connector reads existing data and subsequent changes from a PostgreSQL database. Reading proceeds in two phases: a full scan phase that reads the existing data, followed by an incremental capture phase that reads changes. If a failure occurs, the connector recovers with exactly-once semantics, so no data is duplicated or lost. This topic describes how to use the PostgreSQL CDC connector.
Background information
The following table describes the capabilities of the PostgreSQL CDC connector.
Item | Description |
Table type | Source table. Note: You can use the Java Database Connectivity (JDBC) connector to create a sink or dimension table. |
Running mode | Streaming mode |
Data format | N/A |
Monitoring metrics |
Note
|
API type | SQL API |
Data updates or deletion in a sink table | N/A |
Features
The PostgreSQL CDC connector supports the incremental snapshot reading feature in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.6 or later. When the connector runs, it first performs the full scan phase to read a snapshot of the PostgreSQL database. After the full scan is complete, the connector automatically switches to the incremental capture phase and reads the write-ahead log (WAL) files of the database. If a failure occurs, the connector recovers data in an exactly-once manner to prevent data duplication or loss.
You can use the connector to create a source table, which provides the following advantages:
Supports reading full and incremental data to facilitate unified stream and batch processing.
Supports parallel reading in the full scan phase, which improves reading performance.
Supports automatic and seamless switching from the full scan phase to the incremental capture phase to reduce resource consumption.
Supports data resumption in the full scan phase to enhance stability.
Supports lock-free reading in the full scan phase to ensure normal business operation.
Prerequisites
The PostgreSQL CDC connector uses the logical replication of PostgreSQL to read data changes from ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL databases.
Related configurations may be different in ApsaraDB RDS for PostgreSQL, Amazon RDS for PostgreSQL, and self-managed PostgreSQL databases. Make sure that you complete the required configurations. For more information, see Configure a PostgreSQL database.
After you complete the related configurations, make sure that the following conditions are met:
The wal_level parameter is set to logical, which specifies that the information required for logical decoding is included in WAL files.
The REPLICA IDENTITY parameter of the subscribed table is set to FULL to ensure data consistency.
Note: REPLICA IDENTITY is a PostgreSQL-specific table-level parameter. It specifies whether the previous values of the involved table columns are included in the UPDATE and DELETE events that are generated by the logical decoding plug-in. For more information, see REPLICA IDENTITY.
The max_wal_senders and max_replication_slots parameters are each set to a value greater than the sum of the number of replication slots already occupied in the monitored database and the number of replication slots required by the Flink deployment.
Your database account is assigned a superuser role or granted the LOGIN and REPLICATION permissions. Your database account is also granted the SELECT permission on the subscribed table for full data query.
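The conditions above can be checked and applied with standard PostgreSQL statements. The following is a minimal sketch rather than a definitive setup: the role name flink_cdc, the schema public, and the table shipments are hypothetical placeholders. On managed services such as ApsaraDB RDS for PostgreSQL or Amazon RDS for PostgreSQL, server parameters and replication permissions are typically configured through the service itself, so some statements may not apply as written.

```sql
-- Check the server-level settings. wal_level should return 'logical'.
SHOW wal_level;
SHOW max_wal_senders;
SHOW max_replication_slots;

-- Count the replication slots that already exist in the database cluster.
SELECT count(*) AS existing_slots FROM pg_replication_slots;

-- Include the previous column values in change events for the subscribed table.
-- public.shipments is a hypothetical table name.
ALTER TABLE public.shipments REPLICA IDENTITY FULL;

-- Verify the setting: relreplident is 'f' when REPLICA IDENTITY is FULL.
SELECT relreplident FROM pg_class WHERE oid = 'public.shipments'::regclass;

-- Create a dedicated account (hypothetical name) with LOGIN and REPLICATION.
CREATE ROLE flink_cdc WITH LOGIN REPLICATION PASSWORD '<yourPassWord>';

-- Allow the account to query the subscribed table during the full scan.
GRANT USAGE ON SCHEMA public TO flink_cdc;
GRANT SELECT ON public.shipments TO flink_cdc;
```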
Limits
The incremental snapshot reading feature of the PostgreSQL CDC connector requires Realtime Compute for Apache Flink that uses VVR 8.0.6 or later.
You must manually manage replication slots to avoid disk space waste.
Replication slots are used to retain WAL segments. If a Flink job is restarted, the job can resume from the most recent checkpoint based on the WAL segments, thereby preventing data loss. If you confirm that a Flink job no longer requires a restart, we recommend that you manually delete the corresponding replication slots to free up the occupied resources. In addition, if the progress marker of a replication slot remains stagnant for a long period of time, PostgreSQL retains the WAL records after the marked position. The retention of unnecessary WAL records may result in a significant increase in disk space usage.
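As an illustration of manual slot management, the following PostgreSQL queries list the existing slots together with the amount of WAL each one retains, and then drop a slot that is no longer needed. This is a sketch that assumes PostgreSQL 10 or later (for pg_current_wal_lsn and pg_wal_lsn_diff) and a connection to the primary instance. The slot name flink_slot is a hypothetical example.

```sql
-- List replication slots, whether a consumer is attached,
-- and how much WAL each slot currently holds back.
SELECT slot_name,
       active,
       restart_lsn,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

-- Drop a slot that belongs to a Flink job that will not be restarted.
-- The slot must be inactive, otherwise the call fails.
SELECT pg_drop_replication_slot('flink_slot');
```

A slot whose retained_wal value keeps growing while active is false is a likely candidate for cleanup.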
If you enable the incremental snapshot reading feature of the PostgreSQL CDC connector, you must enable checkpointing for the connector and define a primary key in the source table. When the incremental snapshot reading feature is enabled, multiple temporary replication slots are created to achieve parallel reading in the full scan phase.
The number of required replication slots in the full scan phase is equal to Number of sources × Parallelism + 1. After the PostgreSQL CDC connector switches from the full scan phase to the incremental capture phase, the system automatically removes temporary replication slots and retains only one replication slot. If you want to limit the number of replication slots used in the full scan phase, you can reduce the parallelism. Take note that this also reduces the reading speed. If the downstream operator or storage supports idempotence, you can use the following configuration to skip log reading in the full scan phase: scan.incremental.snapshot.backfill.skip = true. In this case, the connector provides only at-least-once guarantees. If you use SQL to perform operations such as aggregation and association, we recommend that you do not skip log reading in the full scan phase.
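The requirements for incremental snapshot reading can be sketched as a source table definition. This example is illustrative only: the table name shipments_source, the choice of shipment_id as the primary key, and the slot name flink_shipments_slot are assumptions. Checkpointing must also be enabled for the deployment, which is not part of the DDL.

```sql
CREATE TABLE shipments_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN,
  -- A primary key is required when incremental snapshot reading is enabled.
  PRIMARY KEY (shipment_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>',
  -- Hypothetical slot name.
  'slot.name' = 'flink_shipments_slot',
  'scan.incremental.snapshot.enabled' = 'true',
  -- Keep log reading in the full scan phase (the default) to retain exactly-once
  -- semantics. Set this to 'true' only if the downstream is idempotent.
  'scan.incremental.snapshot.backfill.skip' = 'false'
);
```

As a worked example of the slot formula, a job with 2 such sources and a parallelism of 4 needs 2 × 4 + 1 = 9 replication slots during the full scan phase, so max_replication_slots must leave room for at least that many in addition to the slots already in use.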
If you disable the incremental snapshot reading feature, the PostgreSQL CDC connector cannot perform checkpoints during the full scan phase.
If the feature is disabled and checkpoints are triggered during the full scan phase, the deployment may perform a failover due to a checkpoint timeout. To prevent the issue, we recommend that you add the following configurations to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see Management and operations for workspaces and namespaces.
execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
The following table describes the parameters in the preceding configurations.
Parameter | Description | Notes |
execution.checkpointing.interval | The interval at which checkpoints are triggered. | Data type: DURATION. Examples: 10min and 30s. |
execution.checkpointing.tolerable-failed-checkpoints | The maximum number of checkpoint failures that are allowed. | The product of the value of this parameter and the checkpoint scheduling interval is the maximum duration of the full scan phase. Note: If the scanned table is excessively large, we recommend that you specify a sufficiently large value. |
restart-strategy | The restart policy. | Valid values: fixed-delay (the job is restarted with a fixed delay), failure-rate (the job is restarted based on the failure rate), and exponential-delay (the job is restarted with an exponential delay). For more information, see Restart Strategies. |
restart-strategy.fixed-delay.attempts | The maximum number of restart attempts when you set the restart-strategy parameter to fixed-delay. | N/A. |
Syntax
CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);
Parameters in the WITH clause
Parameter | Description | Data type | Required | Default value | Notes |
connector | The type of the connector. | STRING | Yes | No default value | Set the value to postgres-cdc. |
hostname | The IP address or hostname of the PostgreSQL database. | STRING | Yes | No default value | N/A. |
username | The username that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
password | The password that is used to access the PostgreSQL database service. | STRING | Yes | No default value | N/A. |
database-name | The name of the PostgreSQL database. | STRING | Yes | No default value | The name of the PostgreSQL database. |
schema-name | The schema name of the PostgreSQL database. | STRING | Yes | No default value | If you want to read data from multiple schemas, set this parameter to a regular expression. |
table-name | The name of the table in the PostgreSQL database. | STRING | Yes | No default value | If you want to read data from multiple tables, you can set this parameter to a regular expression. |
port | The port that is used to access the PostgreSQL database service. | INTEGER | No | 5432 | N/A. |
decoding.plugin.name | The name of the logical decoding plug-in that is installed in the PostgreSQL database service. | STRING | No | decoderbufs | Valid values:
|
slot.name | The name of the logical decoding slot. | STRING | Yes (applies to VVR 8.0.1 or later) | flink (applies to VVR versions earlier than 8.0.1) or N/A (applies to VVR 8.0.1 or later) | We recommend that you configure the |
debezium.* | The Debezium properties. | STRING | No | No default value | This parameter is used to achieve fine-grained control over the Debezium client. Example: |
scan.incremental.snapshot.enabled | Specifies whether to enable the incremental snapshot reading feature. | BOOLEAN | No | false | Valid values:
Note
|
scan.startup.mode | The startup mode that is used to consume data. | STRING | No | initial | Valid values:
|
changelog-mode | The changelog mode that is used to encode streaming changes. | STRING | No | all | Valid values:
|
heartbeat.interval.ms | The interval at which heartbeat packets are sent. | DURATION | No | 30s | Unit: millisecond. The PostgreSQL CDC connector sends heartbeat packets to the monitored database to synchronize the progress offset of replication slots. If table changes are not frequent, configure this parameter to remove unnecessary WAL files at the earliest opportunity. |
scan.incremental.snapshot.chunk.key-column | The column that is used to split tables into chunks in the full scan phase. | STRING | No | No default value | By default, the first column of the primary key is used. |
scan.incremental.close-idle-reader.enabled | Specifies whether to close the idle readers at the end of the full scan phase. | BOOLEAN | No | false | This parameter is valid only if you set the |
scan.incremental.snapshot.backfill.skip | Specifies whether to skip log reading in the full scan phase. | BOOLEAN | No | false | Valid values:
|
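To show how several of these parameters combine, the following sketch reads from multiple tables through a regular expression and sets an explicit slot name and heartbeat interval. The table name orders_all, its columns, the schema public, the pattern orders_.*, and the slot name flink_orders_slot are all hypothetical. The sketch also assumes that every table matched by the pattern has columns compatible with the declared schema.

```sql
CREATE TABLE orders_all (
  order_id INT,
  customer STRING,
  amount DECIMAL(10, 2)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = 'public',
  -- Regular expression that matches several tables, such as orders_2023 and orders_2024.
  'table-name' = 'orders_.*',
  -- Explicit slot name (required in VVR 8.0.1 or later).
  'slot.name' = 'flink_orders_slot',
  -- Send heartbeats more often than the 30s default so that the slot offset
  -- advances even when the tables change infrequently.
  'heartbeat.interval.ms' = '10s'
);
```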
Data type mappings
The following table describes the mapping between data types in PostgreSQL and Realtime Compute for Apache Flink.
Data type in PostgreSQL | Data type in Realtime Compute for Apache Flink |
SMALLINT, INT2, SMALLSERIAL, SERIAL2 | SMALLINT |
INTEGER, SERIAL | INT |
BIGINT, BIGSERIAL | BIGINT |
REAL, FLOAT4 | FLOAT |
FLOAT8, DOUBLE PRECISION | DOUBLE |
NUMERIC(p, s), DECIMAL(p, s) | DECIMAL(p, s) |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n), CHARACTER(n), VARCHAR(n), CHARACTER VARYING(n), TEXT | STRING |
BYTEA | BYTES |
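The mappings can be applied column by column when you declare a source table. The following sketch assumes a hypothetical PostgreSQL table named products. The comment on each column gives the assumed PostgreSQL type, and the declared Flink type follows the table above.

```sql
CREATE TABLE products_source (
  id INT NOT NULL,          -- PostgreSQL: SERIAL
  sku STRING,               -- PostgreSQL: VARCHAR(32)
  description STRING,       -- PostgreSQL: TEXT
  weight DECIMAL(10, 3),    -- PostgreSQL: NUMERIC(10, 3)
  rating FLOAT,             -- PostgreSQL: REAL
  price DOUBLE,             -- PostgreSQL: DOUBLE PRECISION
  in_stock BOOLEAN,         -- PostgreSQL: BOOLEAN
  released DATE,            -- PostgreSQL: DATE
  updated_at TIMESTAMP(6),  -- PostgreSQL: TIMESTAMP(6)
  thumbnail BYTES           -- PostgreSQL: BYTEA
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = 'products'
);
```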
Sample code
CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);
SELECT * FROM source;
References
For information about the connectors that are supported by Realtime Compute for Apache Flink, see Supported connectors.
For more information about how to write data to a PolarDB for Oracle 1.0 result table, see PolarDB for Oracle 1.0 connector.
You can use the MySQL connector to read data from or write data to ApsaraDB RDS for MySQL, PolarDB for MySQL, and self-managed MySQL databases. For more information, see MySQL connector.