DataWorks provides PostgreSQL Reader and PostgreSQL Writer for you to read data from and write data to PostgreSQL data sources. You can use the codeless user interface (UI) or code editor to configure synchronization tasks for PostgreSQL data sources. This topic describes the capabilities of synchronizing data from or to PostgreSQL data sources.
Supported PostgreSQL versions
The following versions are supported: PostgreSQL 10
, PostgreSQL 11
, PostgreSQL 12
, PostgreSQL 13
, PostgreSQL 14
, and PostgreSQL 15
. You can execute the following statement to view the version of a PostgreSQL database:
SHOW SERVER_VERSION;
Limits
Batch data read and write
Data of views can be read.
PostgreSQL data sources support password authentication and SCRAM-SHA-256 authentication. If the password and authentication method of the PostgreSQL database that you add to DataWorks as a data source are changed, you must update the configurations of the data source and test the network connectivity between the data source and your resource group again. In addition, you must also manually run the related data synchronization task again to ensure that the configurations of the task meet your business requirements.
If the name of a PostgreSQL table or the name of a field in a PostgreSQL table starts with a digit or the name contains a letter or a hyphen (-), use double quotation marks (") to escape the name. If you do not escape the name, an error occurs when you use PostgreSQL Reader or PostgreSQL Writer to read data from or write data to the PostgreSQL database. For PostgreSQL Reader and PostgreSQL Writer, double quotation marks (") are keywords in JSON code. Therefore, you must use backslashes (\) to escape the double quotation marks (") that you use. For example, if the name of a PostgreSQL table is 123Test
, the name becomes \"123Test\"
after it is escaped.
Each double quotation mark (") must be escaped by using a backslash (\).
You can use only the code editor to escape characters.
The following code provides an example on how to use the code editor to escape characters:
"parameter": {
"datasource": "abc",
"column": [
"id",
"\"123Test\"", // Add escape characters.
],
"where": "",
"splitPk": "id",
"table": "public.wpw_test"
},
Real-time data read
The real-time synchronization tasks of Data Integration have the following limits:
Data Integration supports
ADD COLUMN
statements.An
ADD COLUMN
statement cannot be combined with other DDL statements, such asDROP COLUMN
.ImportantIf you execute an
ADD COLUMN
statement together with anALTER COLUMN
statement, such as aDROP COLUMN or RNAME COLUMN
statement, on a data synchronization task, the task cannot work as expected.DDL statements other than
ADD COLUMN
cannot be identified.
ALTER TABLE and CREATE TABLE
statements are not supported.TEMPORARY tables, UNLOGGED tables, and Hyper tables cannot be replicated. The PostgreSQL database does not provide a log parsing and subscription mechanism for the preceding types of tables.
Sequences that are created by using the
SERIAL, BIGSERIAL, or IDENTITY
data type cannot be replicated.TRUNCATE statements are not supported.
Large objects of the BYTEA type cannot be replicated.
Views, materialized views, and external tables cannot be replicated.
Data type mappings
Batch data read and write
Most PostgreSQL data types are supported. Make sure that the data types of your database are supported.
The following table lists the data type mappings based on which PostgreSQL Reader or PostgreSQL Writer converts data types.
Category | PostgreSQL data type |
Integer | BIGINT, BIGSERIAL, INTEGER, SMALLINT, and SERIAL |
Floating point | DOUBLE PRECISION, MONEY, NUMERIC, and REAL |
String | VARCHAR, CHAR, TEXT, BIT, and INET |
Date and time | DATE, TIME, and TIMESTAMP |
Boolean | BOOLEAN |
Binary | BYTEA |
Data types that are not listed in the preceding table are not supported.
For PostgreSQL Reader, you can convert the MONEY, INET, and BIT data types by using syntax such as
a_inet::varchar
.
Prepare a PostgreSQL environment before data synchronization
Before you use DataWorks to synchronize data from or to a PostgreSQL data source, you must prepare a PostgreSQL environment. This ensures that a data synchronization task can be configured and can synchronize data from or to the PostgreSQL data source as expected. The following information describes how to prepare a PostgreSQL environment for data synchronization from or to a PostgreSQL data source.
Preparation 1: Create an account and grant the required permissions to the account
You must create an account that is used to log on to the PostgreSQL database for subsequent operations. The account must have the REPLICATION and LOGIN
permissions on the PostgreSQL database.
Real-time synchronization supports only the logical replication mechanism. Logical replication uses a publish and subscribe model in which one or more subscribers subscribe to one or more publications on the node of the publisher. The subscribers extract data from the publications to which they subscribe.
In most cases, the logical replication of a table starts with creating a snapshot for the data in the database of the publisher and replicating the snapshot to the subscriber. After logical replication is complete, modifications to the publisher are sent to the subscriber in real time.
Create an account.
For more information, see Create a database and an account.
Grant the required permissions to the account.
Execute the following statement to check whether the account has the
REPLICATION
permission on the PostgreSQL database:select userepl from pg_user where usename='xxx'
The expected return value is True. If False is returned, the account does not have the REPLICATION permission on the PostgreSQL database. You can execute the following statement to grant the REPLICATION permission to the account:
ALTER USER <user> REPLICATION;
Preparation 2: Check whether the PostgreSQL database is a secondary database
SELECT pg_is_in_recovery()
Real-time synchronization supports only primary databases. The expected return value is False. If True is returned, the PostgreSQL database is a secondary database. Real-time synchronization does not support secondary databases. In this case, you must change the database that is configured in the PostgreSQL data source to the primary database. For more information, see Add a PostgreSQL data source.
Preparation 3: Check whether the value of the wal_level
parameter is logical
show wal_level
The wal_level
parameter specifies the wal_log
level. The expected return value is logical. If logical is not returned, the logical replication mechanism is not supported.
Preparation 4: Check whether the wal_sender
process can be started
-- Query the value of the max_wal_senders parameter.
show max_wal_senders;
-- Query the number of wal_sender processes in the pg_stat_replication view.
select count(*) from pg_stat_replication
If the max_wal_senders
parameter is not empty and the value of the max_wal_senders
parameter is greater than the number of wal_sender processes in the pg_stat_replication
view, idle wal_sender
processes exist. The PostgreSQL database starts a wal_sender
process for the data synchronization program to send logs to subscribers.
Execute the ALTER TABLE [tableName] REPLICA IDENTITY FULL
statement to enable logical replication for the table that you want to synchronize. If you do not execute the statement, an error may be reported for the real-time synchronization task.
After a real-time synchronization task is started, the system creates a slot and a publication in the PostgreSQL database. The slot is named in the following format: di_slot_ + Task ID
. The publication is named in the following format: di_pub_ + Task ID
. After the task is stopped or undeployed, you must manually delete the slot. Otherwise, the number of write-ahead logging (WAL) files in the PostgreSQL database will constantly increase.
Develop a data synchronization task
For information about the entry point for and the procedure of configuring a data synchronization task, see the following sections. For information about the parameter settings, view the infotip of each parameter on the configuration tab of the task.
Add a data source
Before you configure a data synchronization task to synchronize data from or to a specific data source, you must add the data source to DataWorks. For more information, see Add and manage data sources.
If SSL-based authentication is enabled for your PostgreSQL database, you must also enable SSL-based authentication when you add your PostgreSQL database to DataWorks as a data source. For more information, see Appendix 2: Enable SSL-based authentication for a PostgreSQL data source.
Configure a batch synchronization task to synchronize data of a single table
For more information about the configuration procedure, see Configure a batch synchronization task by using the codeless UI and Configure a batch synchronization task by using the code editor.
For information about all parameters that are configured and the code that is run when you use the code editor to configure a batch synchronization task, see Appendix 1: Code and parameters.
Configure synchronization settings to implement batch synchronization of all data in a database or real-time synchronization of full and incremental data in a single table or a database
For more information about the configuration procedure, see Configure a synchronization task in Data Integration.
Additional information
Data synchronization between primary and secondary databases
A secondary PostgreSQL database can be deployed for disaster recovery. The secondary database continuously synchronizes data from the primary database. Data latency between the primary and secondary databases cannot be prevented. This may result in data inconsistency.
Data consistency control
PostgreSQL is a relational database management system (RDBMS) that supports strong consistency for data queries. A database snapshot is created before a synchronization task starts. PostgreSQL Reader reads data from the database snapshot. Therefore, if new data is written to the database during data synchronization, PostgreSQL Reader cannot obtain the new data.
Data consistency cannot be ensured if you enable PostgreSQL Reader to use parallel threads to read data in a synchronization task.
PostgreSQL Reader shards the source table based on the value of the splitPk parameter and uses parallel threads to read data. These parallel threads belong to different transactions and read data at different points in time. Therefore, the parallel threads observe different snapshots.
Data inconsistencies cannot be prevented if parallel threads are used for a synchronization task. The following workarounds can be used:
Enable PostgreSQL Reader to use a single thread to read data in a synchronization task. This indicates that you do not need to specify a shard key for PostgreSQL Reader. This way, data consistency is ensured, but data is synchronized at low efficiency.
Make sure that no data is written to the source table during data synchronization. This ensures that the data in the source table remains unchanged during data synchronization. For example, you can lock the source table or disable data synchronization between primary and secondary databases. This way, data is efficiently synchronized, but your ongoing services may be interrupted.
Character encoding
A PostgreSQL database supports only the EUC_CN and UTF-8 encoding formats for simplified Chinese characters. PostgreSQL Reader uses JDBC to read data. This enables PostgreSQL Reader to automatically convert the encoding formats of characters. Therefore, you do not need to specify the encoding format.
If you specify the encoding format for a PostgreSQL database but data is written to the PostgreSQL database in a different encoding format, PostgreSQL Reader cannot identify this inconsistency and may export garbled characters.
Incremental data synchronization
PostgreSQL Reader uses JDBC to connect to a database and uses a SELECT statement with a
WHERE
clause to read incremental data.For batch data, incremental add, update, and delete operations (including logically delete operations) are distinguished by timestamps. Specify the WHERE clause based on a specific timestamp. The time indicated by the timestamp must be later than the time indicated by the latest timestamp in the previous synchronization.
For streaming data, specify the WHERE clause based on the ID of a specific record. The ID must be greater than the maximum ID involved in the previous synchronization.
If the data that is added or modified cannot be distinguished, PostgreSQL Reader can read only full data.
Syntax validation
PostgreSQL Reader allows you to configure custom SELECT statements by using the querySql parameter but does not verify the syntax of these statements.
Appendix 1: Code and parameters
Appendix: Configure a batch synchronization task by using the code editor
If you use the code editor to configure a batch synchronization task, you must configure parameters for the reader and writer of the related data source based on the format requirements in the code editor. For more information about the format requirements, see Configure a batch synchronization task by using the code editor. The following information describes the configuration details of parameters for the reader and writer in the code editor.
Code for PostgreSQL Reader
In the following code, a synchronization task is configured to read data from a PostgreSQL database. For more information about how to configure a synchronization task by using the code editor, see Configure a batch synchronization task by using the code editor.
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"postgresql",// The plug-in name.
"parameter":{
"datasource":"",// The name of the data source.
"column":[// The names of the columns.
"col1",
"col2"
],
"where":"",// The WHERE clause.
"splitPk":"",// The shard key based on which the table is sharded. Data Integration runs parallel threads to synchronize data.
"table":""// The name of the table.
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true, // Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1, // The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameters in code for PostgreSQL Reader
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table from which you want to read data. | Yes | No default value |
column | The names of the columns from which you want to read data. Specify the names in a JSON array. The default value is [ * ], which indicates all the columns in the source table.
| Yes | No default value |
splitPk | The field that is used for data sharding when PostgreSQL Reader reads data. If you configure this parameter, the source table is sharded based on the value of this parameter. Data Integration then runs parallel threads to read data. This way, data can be synchronized more efficiently.
| No | No default value |
where | The WHERE clause. PostgreSQL Reader generates an SQL statement based on the settings of the table, column, and where parameters and uses the generated statement to read data. For example, you can set this parameter to
| No | No default value |
querySql (advanced parameter, which is available only in the code editor) | The SQL statement that is used for refined data filtering. If you configure this parameter, Data Integration filters data based on the value of this parameter. For example, if you want to join multiple tables for data synchronization, set this parameter to | No | No default value |
fetchSize | The number of data records to read at a time. This parameter determines the number of interactions between Data Integration and the database and affects read efficiency. Note If you set this parameter to a value greater than 2048, an out of memory (OOM) error may occur during data synchronization. | No | 512 |
Code for PostgreSQL Writer
In the following code, a synchronization task is configured to write data to a PostgreSQL database. For more information about the parameters, see the parameter description.
{
"type":"job",
"version":"2.0",// The version number.
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"postgresql",// The plug-in name.
"parameter":{
"datasource":"",// The name of the data source.
"column":[// The names of the columns.
"col1",
"col2"
],
"table":"",// The name of the table.
"preSql":[],// The SQL statement that you want to execute before the synchronization task is run.
"postSql":[],// The SQL statement that you want to execute after the synchronization task is run.
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"// The maximum number of dirty data records allowed.
},
"speed":{
"throttle":true,// Specifies whether to enable throttling. The value false indicates that throttling is disabled, and the value true indicates that throttling is enabled. The mbps parameter takes effect only when the throttle parameter is set to true.
"concurrent":1, // The maximum number of parallel threads.
"mbps":"12"// The maximum transmission rate. Unit: MB/s.
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Parameters in code for PostgreSQL Writer
Parameter | Description | Required | Default value |
datasource | The name of the data source. It must be the same as the name of the added data source. You can add data sources by using the code editor. | Yes | No default value |
table | The name of the table to which you want to write data. | Yes | No default value |
writeMode | The write mode. Valid values:
| No | insert |
column | The names of the columns to which you want to write data. Separate the names with commas (,), such as | Yes | No default value |
preSql | The SQL statement that you want to execute before the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to delete outdated data. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor. | No | No default value |
postSql | The SQL statement that you want to execute after the synchronization task is run. For example, you can set this parameter to the SQL statement that is used to add a timestamp. You can execute only one SQL statement on the codeless UI and multiple SQL statements in the code editor. | No | No default value |
batchSize | The number of data records to write at a time. Set this parameter to an appropriate value based on your business requirements. This greatly reduces the interactions between Data Integration and PostgreSQL and increases throughput. If you set this parameter to an excessively large value, an OOM error may occur during data synchronization. | No | 1,024 |
pgType | The PostgreSQL configuration for converting data types. Valid values: bigint[], double[], text[], jsonb, and json. Example:
| No | No default value |
Appendix 2: Enable SSL-based authentication for a PostgreSQL data source
Description of SSL-based authentication files for PostgreSQL
You can configure SSL-based authentication when you add or modify a PostgreSQL data source in DataWorks. The following table describes the configuration items that are relevant to SSL-based authentication.
PostgreSQL database | Configuration of a PostgreSQL data source in DataWorks | |||
SSL encryption | Client-side encryption | ACL configuration | Parameter | Description |
Enabled | Not enabled | N/A. | Truststore Certificate File | Optional. The client uses the certificate specified by this parameter to verify the identity of the server.
|
Enabled | The prefer authentication method is selected. |
| The Keystore Certificate File and Private Key File parameters are optional. If you select the prefer authentication method for your PostgreSQL database, the server does not forcefully verify the identity of the client.
| |
The verify-ca authentication method is selected. |
|
If the prefer authentication method is selected in ACL configuration, the client content is not forcefully verified.
If no file is configured during SSL-based authentication, a normal link is used for authentication.
If authentication files are added during SSL-based authentication, refer to the preceding table for authentication.
If the verify-ca authentication method is selected in ACL configuration, you must configure the Keystore Certificate File, Private Key File, and Private Key Password parameters when you add a PostgreSQL data source.
Obtain SSL-based authentication files
In this example, an ApsaraDB RDS for PostgreSQL instance is used.
Obtain the desired truststore certificate file.
For information about how to obtain a truststore certificate file, see Configure a cloud certificate to enable the SSL encryption feature.
Go to the Instances page in the ApsaraDB RDS console. Find the desired instance and click the instance ID in the Instance ID/Name column to go to the details page of the instance.
Select the endpoint that you want to protect. The following figure shows how to select the desired endpoint.
NoteIf you have applied for a public endpoint, the Select Protected Endpoint dialog box displays both the internal endpoint and public endpoint of the instance. However, each cloud certificate can protect only one endpoint. The internal endpoint is more secure than the public endpoint. Therefore, we recommend that you protect the public endpoint. For more information about how to view the internal endpoint and the public endpoint, see View and change the endpoints and port numbers.
For more information about how to protect the internal endpoint and the public endpoint at the same time, see Configure a custom certificate to enable the SSL encryption feature.
After a cloud certificate is configured, the status of the instance changes from Running to Modifying SSL. After approximately three minutes, the status changes back to Running. Then, you can perform subsequent operations.
c. Click Download CA Certificate to obtain the desired truststore certificate file.
The certificate that you downloaded is a package that contains three files. When you add a PostgreSQL data source in DataWorks, upload the file whose name is suffixed with
.pem
or.p7b
to configure the Truststore Certificate File parameter.Obtain the desired keystore certificate file and private key file and configure the private key password.
Prerequisites: SSL encryption is configured, or a custom certificate is configured. For more information, see Configure a cloud certificate to enable the SSL encryption feature or Configure a custom certificate to enable the SSL encryption feature. OpenSSL is installed.
NoteLinux operating systems are provided with OpenSSL. If you use a Linux operating system, you do not need to install OpenSSL. If you use a Windows operating system, you must download the OpenSSL software package and install OpenSSL. For more information, visit the Win32/Win64 OpenSSL page.
For information about how to obtain a keystore certificate file and a private key file and configure a private key password, see Configure a client CA certificate.
Create a self-signed certificate and a private key for the self-signed certificate. The self-signed certificate is saved in a file named ca1.crt. The private key is saved in a file named ca1.key
openssl req -new -x509 -days 3650 -nodes -out ca1.crt -keyout ca1.key -subj "/CN=root-ca1"
Create a certificate signing request (CSR) and a private key for the client certificate. The CSR is used to request a client certificate and is saved in a file named client.csr. The private key is saved in a file named client.key.
openssl req -new -nodes -text -out client.csr -keyout client.key -subj "/CN=<Username that is used for logons from the client>"
In the preceding command, the CN parameter follows the
-subj
parameter. You must set the CN parameter to the username of the account that is used by the client to connect to the instance.Create a client certificate. The client certificate is saved in a file named client.crt.
openssl x509 -req -in client.csr -text -days 365 -CA ca1.crt -CAkey ca1.key -CAcreateserial -out client.crt
If your ApsaraDB RDS for PostgreSQL server needs to verify the client CA certificate, open the ca1.crt file, and copy the content of the ca1.crt file to the Public Key field in the Enter Public Key from Client Certificate Authority dialog box as a client CA certificate.
After the client CA certificate is configured in the ApsaraDB RDS console, convert the client.key private key file into the client.pk8 file, and upload the client.pk8 file to configure the Private Key File parameter on the Add PostgreSQL Data Source page in the DataWorks console when you add a PostgreSQL data source in DataWorks.
cp client.key client.pk8
Configure a private key password.
openssl pkcs8 -topk8 -inform PEM -in client.key -outform der -out client.pk8 -v1 PBE-MD5-DES
NoteWhen you run the command that is used to configure a private key password, you must enter a password. If you specify a password, use the password when you configure the Private Key Password parameter on the Add PostgreSQL Data Source page in the DataWorks console.
Configure SSL-based authentication files
When you upload the obtained certificate files to the Add PostgreSQL Data Source page in the DataWorks console to add a PostgreSQL data source, perform the following operations in the DataWorks console:
Truststore Certificate File: Upload the file whose name is suffixed with
.pem
or.p7b
. The file is obtained in the step of obtaining a truststore certificate file.Keystore Certificate File: Upload the client.crt client certificate file that is obtained in the step of creating a client certificate.
Private Key File: Upload the client.pk8 file that is converted from the client.key private key file. The client.key private key file is obtained in the step of converting a private key file.
Private Key Password: Use the password that is obtained in the step of configuring a private key password.
Configure ACL: Go to the Instances page in the ApsaraDB RDS console. Find the desired instance and click the instance ID in the Instance ID/Name column to go to the details page of the instance. In the left-side navigation pane, click Data Security. On the SSL tab, click Modify to the right of Configure ACL in the Configure ACL (to Prevent Client Disguise) section to select an SSL-based authentication method based on your business requirements. For more information, see Forcefully enable the client to use SSL connections.
If you select the prefer authentication method, the PostgreSQL server does not forcefully verify the identity of the client.
If you select the verify-ca authentication method, you must upload valid client certificate files when you add a PostgreSQL data source in DataWorks to ensure that the server can verify the identity of the client.