This topic describes how to use Realtime Compute for Apache Flink to write data to AnalyticDB for PostgreSQL.
Limits
Realtime Compute for Apache Flink cannot read data from AnalyticDB for PostgreSQL in serverless mode.
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.
Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.
Note: If you use a custom connector, perform the operations by following the instructions in Manage custom connectors.
Prerequisites
A fully managed Flink workspace is created. For more information, see Activate fully managed Flink.
An AnalyticDB for PostgreSQL instance is created. For more information, see Create an instance.
The AnalyticDB for PostgreSQL instance and the fully managed Flink workspace reside in the same virtual private cloud (VPC).
Configure an AnalyticDB for PostgreSQL instance
Log on to the AnalyticDB for PostgreSQL console.
Add the CIDR block of the fully managed Flink workspace to an IP address whitelist of the AnalyticDB for PostgreSQL instance. For more information, see Configure an IP address whitelist.
Click Log On to Database. For more information about how to connect to a database, see Client connection.
Create a table on the AnalyticDB for PostgreSQL instance. The table must contain the columns that the Flink result table writes later in this topic. Sample statement:

CREATE TABLE test_adbpg_table (
    b1 int,
    b2 int,
    b3 text,
    b4 text,
    PRIMARY KEY (b1)
);
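One way to confirm that the table was created with the expected columns is to query the standard information_schema catalog. The following check assumes that the table resides in the public schema, as in this example:

-- List the columns of test_adbpg_table and their data types.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'test_adbpg_table';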
Configure a fully managed Flink workspace
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click Connectors.
On the Connectors page, click Create Custom Connector.
Upload the JAR file of the custom connector that you want to create.
Note: For information about how to obtain the JAR file of the custom AnalyticDB for PostgreSQL connector, visit GitHub.
The version of the JAR file must match the Flink engine version of Realtime Compute for Apache Flink.
After you upload the JAR file, click Next.
The system parses the content of the JAR file that you uploaded. If file parsing is successful, proceed to the next step. If file parsing fails, check whether the code of your custom connector complies with the standards that are defined by the Apache Flink community.
Click Finish.
The custom connector that you create appears in the connector list.
Create a Flink job
Log on to the Realtime Compute for Apache Flink console. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, click SQL Editor. In the upper-left corner of the SQL Editor page, click New. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and click Next.
In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | adbpg-test |
| Location | The folder in which the code file of the draft is saved. You can also click the icon to the right of an existing folder to create a subfolder. | Draft |
| Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-6.0.7-flink-1.15 |
Click Create.
Write data to AnalyticDB for PostgreSQL
Write deployment code.
Create a source table named datagen_source and an AnalyticDB for PostgreSQL result table named test_adbpg_table, and then execute an INSERT INTO statement to write the generated data to the result table. Copy the following code to the code editor of the deployment:

CREATE TABLE datagen_source (
    f_sequence INT,
    f_random INT,
    f_random_str STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '5',
    'fields.f_sequence.kind' = 'sequence',
    'fields.f_sequence.start' = '1',
    'fields.f_sequence.end' = '1000',
    'fields.f_random.min' = '1',
    'fields.f_random.max' = '1000',
    'fields.f_random_str.length' = '10'
);

CREATE TABLE test_adbpg_table (
    `B1` bigint,
    `B2` bigint,
    `B3` VARCHAR,
    `B4` VARCHAR,
    PRIMARY KEY (B1) NOT ENFORCED
) WITH (
    'connector' = 'adbpg-nightly-1.13',
    'password' = 'xxx',
    'tablename' = 'test_adbpg_table',
    'username' = 'xxxx',
    'url' = 'jdbc:postgresql://url:5432/schema',
    'maxretrytimes' = '2',
    'batchsize' = '50000',
    'connectionmaxactive' = '5',
    'conflictmode' = 'ignore',
    'usecopy' = '0',
    'targetschema' = 'public',
    'exceptionmode' = 'ignore',
    'casesensitive' = '0',
    'writemode' = '1',
    'retrywaittime' = '200'
);

INSERT INTO test_adbpg_table
SELECT f_sequence, f_random, f_random_str, f_random_str
FROM datagen_source;
In the preceding code, retain the values of the parameters related to the datagen_source table and modify the values of the parameters related to the test_adbpg_table table based on your business requirements. The following table describes the parameters of the test_adbpg_table table.

| Parameter | Required | Description |
| --- | --- | --- |
| connector | Yes | The name of the connector, in the adbpg-nightly-<version number> format. Example: adbpg-nightly-1.13. |
| url | Yes | The Java Database Connectivity (JDBC) URL that is used to connect to the AnalyticDB for PostgreSQL instance, in the jdbc:postgresql://<Internal endpoint>:<Port number>/<Database name> format. Example: jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres. |
| tablename | Yes | The name of the AnalyticDB for PostgreSQL table. |
| username | Yes | The name of the database account that is used to connect to the AnalyticDB for PostgreSQL database. |
| password | Yes | The password of the AnalyticDB for PostgreSQL database account. |
| maxretrytimes | No | The maximum number of retries that are allowed if a write attempt fails. Default value: 3. |
| batchsize | No | The maximum number of rows that can be written to the table at a time. Default value: 50000. |
| exceptionmode | No | The policy that is used to handle exceptions during data writing. Valid values: ignore (default), which ignores the data that fails to be written when an exception occurs, and strict, which performs a failover and reports an error when an exception occurs. |
| conflictmode | No | The policy that is used to handle primary key or unique index conflicts. Valid values: ignore, which ignores the conflict and retains the existing data; strict, which performs a failover and reports an error; update, which updates the conflicting rows; and upsert (default), which performs an UPSERT operation to write the data (see the sketch that follows this table). AnalyticDB for PostgreSQL uses a combination of INSERT ON CONFLICT and COPY ON CONFLICT statements to perform UPSERT operations. If the destination table is a partitioned table, the minor engine version must be V6.3.6.1 or later. For information about how to update the minor version, see Update the minor engine version. |
| targetschema | No | The schema of the AnalyticDB for PostgreSQL database. Default value: public. |
| writemode | No | The method that is used to write data. Valid values: 0, which uses the BATCH INSERT statement; 1 (default), which uses the COPY API; and 2, which uses the BATCH UPSERT statement. |
| verbose | No | Specifies whether to display connector logs. Valid values: 0 (default), which does not display logs, and 1, which displays logs. |
| retrywaittime | No | The interval between retries when an exception occurs. Unit: milliseconds. Default value: 100. |
| batchwritetimeoutms | No | The timeout period of a batch write. After this period elapses, the buffered data is written. Unit: milliseconds. Default value: 50000. |
| connectionmaxactive | No | The maximum number of active connections that can be allocated in the connection pool of a single TaskManager at the same time. Default value: 5. |
| casesensitive | No | Specifies whether column and table names are case-sensitive. Valid values: 0 (default), which is case-insensitive, and 1, which is case-sensitive. |
Note: For more information about the supported parameters and data type mappings, see AnalyticDB for PostgreSQL connector.
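As noted in the table, the upsert conflict mode relies on INSERT ON CONFLICT on the AnalyticDB for PostgreSQL side. The following minimal sketch illustrates the resulting semantics on the test_adbpg_table table from this example; the exact statements that the connector generates may differ, and the values shown are placeholders:

-- Insert a row, or update the non-key columns if a row
-- with the same primary key value b1 = 1 already exists.
INSERT INTO test_adbpg_table (b1, b2, b3, b4)
VALUES (1, 100, 'foo', 'bar')
ON CONFLICT (b1)
DO UPDATE SET b2 = EXCLUDED.b2, b3 = EXCLUDED.b3, b4 = EXCLUDED.b4;

If a row with b1 = 1 already exists, its non-key columns are updated; otherwise, a new row is inserted.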
Start the deployment.
In the upper-right corner of the SQL Editor page, click Deploy. In the message that appears, click OK.
Note: Session clusters are suitable for development and test scenarios in non-production environments. You can debug deployments in a session cluster to improve the resource utilization of the JobManager and accelerate deployment startup. To ensure business stability, we recommend that you do not publish deployments to session clusters. For more information, see Debug a deployment.
On the Deployments page, find the deployment that you want to start and click Start in the Actions column.
In the Start Job message, click Start.
Verify the synchronization result
Connect to the AnalyticDB for PostgreSQL database. For more information, see Client connection.
Execute the following statement to query the test_adbpg_table table:

SELECT * FROM test_adbpg_table;
If the query returns the generated rows, data is written to the AnalyticDB for PostgreSQL database as expected.
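Because the datagen source in this example emits five rows per second until f_sequence reaches 1000, you can also run a count query several times to confirm that writes are progressing. A minimal check:

-- The count increases on each run until all 1,000 generated rows are written.
SELECT COUNT(*) FROM test_adbpg_table;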