This topic describes how to use Alibaba Cloud Realtime Compute for Apache Flink to read data from and write data to AnalyticDB for PostgreSQL.
Background information
AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehouse that provides online analysis services for large amounts of data. Realtime Compute for Apache Flink is a real-time big data analytics platform that is built on Apache Flink. It provides various upstream and downstream connectors to meet the requirements of different business scenarios and delivers efficient, flexible real-time computing services. Realtime Compute for Apache Flink can read data from and write data to AnalyticDB for PostgreSQL. This allows you to fully utilize the advantages of AnalyticDB for PostgreSQL and improve the efficiency and accuracy of data analytics.
Limits
Realtime Compute for Apache Flink cannot read data from AnalyticDB for PostgreSQL in serverless mode.
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.
Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.
Note: If you use a custom connector, perform operations by following the instructions that are described in Manage custom connectors.
Prerequisites
The AnalyticDB for PostgreSQL instance that you create and the fully managed Flink workspace that you create reside in the same virtual private cloud (VPC).
Note: If the AnalyticDB for PostgreSQL instance and the fully managed Flink workspace do not reside in the same VPC, handle the issue by following the instructions provided in How does fully managed Flink access a service across VPCs?
A fully managed Flink workspace is created. For more information, see Activate fully managed Flink.
An AnalyticDB for PostgreSQL instance and an account are created. For more information, see Create an instance and Create a privileged account.
Step 1: Configure a whitelist and prepare data
- Log on to the AnalyticDB for PostgreSQL console.
Add the CIDR block of the fully managed Flink workspace to the whitelist of the AnalyticDB for PostgreSQL instance.
View the CIDR block of the vSwitch to which the fully managed Flink workspace belongs. For more information, see How do I configure a whitelist?
Add the CIDR block of the fully managed Flink workspace to the whitelist of the destination AnalyticDB for PostgreSQL instance. For more information, see Procedure.
Note: If you access the AnalyticDB for PostgreSQL instance over the Internet, add your public IP address to the whitelist.
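The whitelist match is a plain CIDR containment check: access is allowed only if the client IP address falls inside one of the whitelisted blocks. The following minimal Python sketch illustrates the check; the vSwitch CIDR block and IP addresses are made-up examples, not values from your environment.

```python
import ipaddress

def is_whitelisted(client_ip: str, whitelist_cidrs: list[str]) -> bool:
    """Return True if client_ip falls inside any whitelisted CIDR block."""
    ip = ipaddress.ip_address(client_ip)
    return any(ip in ipaddress.ip_network(cidr) for cidr in whitelist_cidrs)

# Example: a hypothetical vSwitch CIDR block of the Flink workspace.
whitelist = ["192.168.10.0/24"]
print(is_whitelisted("192.168.10.37", whitelist))  # inside the vSwitch CIDR: True
print(is_whitelisted("10.0.0.5", whitelist))       # outside the whitelist: False
```

If the check fails for the workspace CIDR block, connections from Flink to the instance are rejected before authentication takes place.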
In the upper-right corner of the instance details page, click Log On to Database and enter the username and password. For more information about how to access a database, see Use client tools to connect to an instance.
Create a table named adbpg_dim_table in the destination database of the instance and insert 50 rows of data into the table.
Sample statements:
-- Create a dimension table named adbpg_dim_table.
CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
);
-- Insert 50 rows of data into the adbpg_dim_table table. The value of the id field is an integer from 1 to 50, and the value of the username field is the string 'username' followed by the row number.
INSERT INTO adbpg_dim_table(id, username)
SELECT i, 'username'||i::text
FROM generate_series(1, 50) AS t(i);
You can execute the following statement to view the inserted data:
SELECT * FROM adbpg_dim_table ORDER BY id;
Create a result table named adbpg_sink_table to which Realtime Compute for Apache Flink writes result data.
CREATE TABLE adbpg_sink_table(
    id int,
    username text,
    score int
);
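The generate_series INSERT above produces the rows (1, 'username1') through (50, 'username50'). A small Python sketch of the same data can help you confirm what the dimension table should contain before wiring up the Flink draft:

```python
# Reproduce the rows that the generate_series INSERT creates in adbpg_dim_table:
# id from 1 to 50, username = 'username' followed by the id.
dim_rows = [(i, f"username{i}") for i in range(1, 51)]

print(len(dim_rows))   # 50 rows
print(dim_rows[0])     # (1, 'username1')
print(dim_rows[-1])    # (50, 'username50')
```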
Step 2: Create a Realtime Compute for Apache Flink draft
Log on to the Realtime Compute for Apache Flink console. Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, go to the SQL Editor page. In the upper-left corner of the SQL Editor page, click New. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and click Next.
In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | adbpg-test |
| Location | The folder in which the code file of the draft is saved. You can also click the icon to the right of an existing folder to create a subfolder. | Draft |
| Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-8.0.1-flink-1.17 |
Click Create.
Step 3: Write draft code and deploy the draft
Copy the following code of the draft to the code editor.
-- Create a Datagen source table. In this example, you do not need to modify the parameters in the WITH clause.
CREATE TEMPORARY TABLE datagen_source (
    id INT,
    score INT
) WITH (
    'connector' = 'datagen',
    'fields.id.kind' = 'sequence',
    'fields.id.start' = '1',
    'fields.id.end' = '50',
    'fields.score.kind' = 'random',
    'fields.score.min' = '70',
    'fields.score.max' = '100'
);

-- Create an AnalyticDB for PostgreSQL dimension table. Modify the parameters in the WITH clause based on your business requirements.
CREATE TEMPORARY TABLE dim_adbpg (
    id int,
    username varchar,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'adbpg',
    'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
    'tablename' = 'adbpg_dim_table',
    'username' = 'flinktest',
    'password' = '${secret_values.adb_password}',
    'maxRetryTimes' = '2', -- Specify the maximum number of retries after an operation fails.
    'cache' = 'lru',       -- Specify the cache policy.
    'cacheSize' = '100'    -- Specify the cache size.
);

-- Create an AnalyticDB for PostgreSQL result table. Modify the parameters in the WITH clause based on your business requirements.
CREATE TEMPORARY TABLE sink_adbpg (
    id int,
    username varchar,
    score int
) WITH (
    'connector' = 'adbpg',
    'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
    'tablename' = 'adbpg_sink_table',
    'username' = 'flinktest',
    'password' = '${secret_values.adb_password}',
    'maxRetryTimes' = '2',
    'conflictMode' = 'ignore', -- Specify the policy that is used when a primary key conflict or index conflict occurs during data insertion.
    'retryWaitTime' = '200'    -- Specify the interval between retries.
);

-- Insert the result of joining the dimension table with the source table into the AnalyticDB for PostgreSQL result table.
INSERT INTO sink_adbpg
SELECT ts.id, ts.username, ds.score
FROM datagen_source AS ds
JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
ON ds.id = ts.id;
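With 'cache' = 'lru' and 'cacheSize' = '100', the dimension table keeps the most recently used lookup keys in memory instead of querying AnalyticDB for PostgreSQL on every probe from the source stream. The following rough Python sketch illustrates that caching behavior with functools.lru_cache; the lookup function and the query counter are stand-ins for the connector's real JDBC lookup, not its actual implementation.

```python
from functools import lru_cache

DB_QUERIES = 0  # counts simulated round trips to AnalyticDB for PostgreSQL

@lru_cache(maxsize=100)  # mirrors 'cacheSize' = '100'
def lookup_username(key: int) -> str:
    """Stand-in for the connector's JDBC lookup against adbpg_dim_table."""
    global DB_QUERIES
    DB_QUERIES += 1
    return f"username{key}"

# Two probes for the same join key hit the database only once;
# the second probe is served from the LRU cache.
lookup_username(7)
lookup_username(7)
print(DB_QUERIES)  # 1
```

Once the cache holds more than 100 distinct keys, the least recently used entries are evicted, so hot keys stay cheap while cold keys still fall back to the database.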
Modify the parameters based on your business requirements.
In this example, you do not need to modify the Datagen source table. You must modify the parameters of the AnalyticDB for PostgreSQL dimension table and AnalyticDB for PostgreSQL result table based on your business requirements. The following table describes the parameters. For more information about the parameters and data type mappings of related connectors, see References.
| Parameter | Required | Description |
| --- | --- | --- |
| url | Yes | The JDBC URL that is used to connect to the AnalyticDB for PostgreSQL instance, in the jdbc:postgresql://<Internal endpoint>:<Port number>/<Database name> format. You can log on to the AnalyticDB for PostgreSQL console and view the URL on the Database Connection page of the instance. |
| tablename | Yes | The name of the table in the AnalyticDB for PostgreSQL database. |
| username | Yes | The username that is used to access the AnalyticDB for PostgreSQL database. |
| password | Yes | The password of the database account that is used to connect to the AnalyticDB for PostgreSQL database. |
| targetSchema | No | The name of the schema. Default value: public. If you use another schema in the database, specify this parameter. |
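A malformed url value is a common cause of connection failures, so it can be worth sanity-checking that the string follows the jdbc:postgresql://<Internal endpoint>:<Port number>/<Database name> shape before deploying the draft. The following Python sketch splits such a URL into its parts; the pattern and the endpoint in the example are illustrative only, and the endpoint is a made-up placeholder rather than a real instance.

```python
import re

# Matches jdbc:postgresql://<endpoint>:<port>/<database>
JDBC_PATTERN = re.compile(r"^jdbc:postgresql://([^:/]+):(\d+)/(\w+)$")

def parse_jdbc_url(url: str) -> dict:
    """Split a jdbc:postgresql URL into endpoint, port, and database name."""
    m = JDBC_PATTERN.match(url)
    if not m:
        raise ValueError(f"not a valid jdbc:postgresql URL: {url}")
    return {"endpoint": m.group(1), "port": int(m.group(2)), "database": m.group(3)}

parts = parse_jdbc_url("jdbc:postgresql://gp-example-master.gpdb.rds.aliyuncs.com:5432/flinktest")
print(parts["port"], parts["database"])  # 5432 flinktest
```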
In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.
In the upper-right corner of the SQL Editor page, click Deploy.
On the Deployments page, find the desired deployment and click Start in the Actions column.
Step 4: View the data that Realtime Compute for Apache Flink writes to the result table
- Log on to the AnalyticDB for PostgreSQL console.
Click Log On to Database. For more information about how to connect to a database, see Client connection.
Execute the following statement to view the data that Realtime Compute for Apache Flink writes to the result table.
SELECT * FROM adbpg_sink_table ORDER BY id;
The following figure shows the result.