Realtime Compute for Apache Flink:Use Realtime Compute for Apache Flink to read data from and write data to AnalyticDB for PostgreSQL

Last Updated: Sep 12, 2024

This topic describes how to use Alibaba Cloud Realtime Compute for Apache Flink to read data from and write data to AnalyticDB for PostgreSQL.

Background information

AnalyticDB for PostgreSQL is a massively parallel processing (MPP) data warehouse that provides online analysis services for large amounts of data. Realtime Compute for Apache Flink is a real-time big data analytics platform built on Apache Flink. It provides a variety of upstream and downstream connectors to meet the requirements of different business scenarios and delivers efficient, flexible real-time computing services. Realtime Compute for Apache Flink can read data from and write data to AnalyticDB for PostgreSQL, which allows you to take full advantage of AnalyticDB for PostgreSQL and improve the efficiency and accuracy of data analytics.

Limits

  • Realtime Compute for Apache Flink cannot read data from AnalyticDB for PostgreSQL in serverless mode.

  • Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.0 or later supports the AnalyticDB for PostgreSQL connector.

  • Only Realtime Compute for Apache Flink that uses VVR 8.0.1 or later supports AnalyticDB for PostgreSQL V7.0.

    Note

    If you use a custom connector, perform operations by following the instructions that are described in Manage custom connectors.

Prerequisites

  • An AnalyticDB for PostgreSQL instance is created.

  • A fully managed Flink workspace is created.

Step 1: Configure a whitelist and prepare data

  1. Log on to the AnalyticDB for PostgreSQL console.
  2. Add the CIDR block of the fully managed Flink workspace to the whitelist of the AnalyticDB for PostgreSQL instance.

    1. View the CIDR block of the vSwitch to which the fully managed Flink workspace belongs. For more information, see How do I configure a whitelist?

    2. Add the CIDR block of the fully managed Flink workspace to the whitelist of the destination AnalyticDB for PostgreSQL instance. For more information, see Procedure.

      Note

      If you access the AnalyticDB for PostgreSQL instance over the Internet, add the public IP address to the whitelist.

  3. In the upper-right corner of the instance details page, click Log On to Database and enter the username and password. For more information about how to access a database, see Use client tools to connect to an instance.

  4. Create a table named adbpg_dim_table in the destination database of the instance and insert 50 rows of data into the table.

    Sample statements:

    -- Create a dimension table named adbpg_dim_table.
    CREATE TABLE adbpg_dim_table(
      id int,
      username text,
      PRIMARY KEY(id)
    );
    
    -- Insert 50 rows into the adbpg_dim_table table. The id field is an integer from 1 to 50, and the username field is the string 'username' followed by the value of id (for example, username1).
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);

    You can execute the SELECT * FROM adbpg_dim_table ORDER BY id; statement to view the inserted data.
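
    If you query the table from a psql-style client, the output is similar to the following. The listing is truncated here; the full result contains 50 rows.

     id | username
    ----+-----------
      1 | username1
      2 | username2
      3 | username3
    ...
    (50 rows)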

  5. Create a result table named adbpg_sink_table to which Realtime Compute for Apache Flink writes result data.

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );
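
    Note that this result table is created without a primary key, so the conflictMode setting that the Flink draft specifies in Step 3 never has a conflict to resolve. If your deployment can replay data and write duplicate id values, a variant with a primary key, sketched below, makes that setting effective:

    -- Hypothetical variant: a primary key lets the connector's conflictMode
    -- setting resolve duplicate-key inserts. Use it only if it fits your schema.
    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int,
      PRIMARY KEY(id)
    );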

Step 2: Create a Realtime Compute for Apache Flink draft

  1. Log on to the Realtime Compute for Apache Flink console. Find the workspace that you want to manage and click Console in the Actions column.

  2. In the left-side navigation pane, click Development > ETL. In the upper-left corner of the SQL Editor page, click New. In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and click Next.

  3. In the New Draft dialog box, configure the parameters of the draft. The following table describes the parameters.

    Parameter | Description | Example
    Name | The name of the draft that you want to create. Note: The draft name must be unique in the current project. | adbpg-test
    Location | The folder in which the code file of the draft is saved. You can also click the Create Folder icon to the right of an existing folder to create a subfolder. | Draft
    Engine Version | The engine version of Flink that is used by the draft. For more information about engine versions, version mappings, and important time points in the lifecycle of each version, see Engine version. | vvr-8.0.1-flink-1.17

  4. Click Create.

Step 3: Write draft code and deploy the draft

  1. Copy the following code of the draft to the code editor.

    -- Create a Datagen source table. In this example, you do not need to modify the parameters in the WITH clause.
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen', 
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    -- Create an AnalyticDB for PostgreSQL dimension table. You need to modify the parameters in the WITH clause based on your business requirements. 
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) NOT ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flinktest',
     'password' = '${secret_values.adb_password}',
     'maxRetryTimes'='2', -- Specify the maximum number of write retries after data fails to be written to the table. 
     'cache'='lru',  -- Specify the cache policy. 
     'cacheSize'='100'  -- Specify the cache size.
    );
    
    -- Create an AnalyticDB for PostgreSQL result table. You need to modify the parameters in the WITH clause based on your business requirements. 
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flinktest',
      'password' = '${secret_values.adb_password}',
      'maxRetryTimes' = '2',
      'conflictMode' = 'ignore', -- Specify the policy that is used when a primary key conflict or index conflict occurs during data insertion.
      'retryWaitTime' = '200'  -- Specify the interval between retries. 
    );
    
    -- Insert the result that is obtained after the dimension table and source table are joined into the AnalyticDB for PostgreSQL result table. 
    INSERT INTO sink_adbpg
    SELECT ts.id, ts.username, ds.score
    FROM datagen_source AS ds
    JOIN dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    ON ds.id = ts.id;
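
    In this draft, the FOR SYSTEM_TIME AS OF PROCTIME() clause turns the JOIN into a lookup join: for each row that the Datagen source emits, Flink queries the AnalyticDB for PostgreSQL dimension table (through the configured LRU cache) for the username whose id matches. For example, a source row (id=7, score=85) is joined with the dimension row (7, 'username7') and produces the result row (7, 'username7', 85); the score value here is illustrative because scores are randomly generated between 70 and 100.
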
  2. Modify the parameters based on your business requirements.

    In this example, you do not need to modify the Datagen source table. You must modify the parameters of the AnalyticDB for PostgreSQL dimension table and AnalyticDB for PostgreSQL result table based on your business requirements. The following table describes the parameters. For more information about the parameters and data type mappings of related connectors, see References.

    Parameter | Required | Description
    url | Yes | The JDBC URL that is used to connect to the AnalyticDB for PostgreSQL instance, in the format jdbc:postgresql://<Internal endpoint>:<Port number>/<Database name>. You can log on to the AnalyticDB for PostgreSQL console and view the URL on the Database Connection page of the instance.
    tablename | Yes | The name of the table in the AnalyticDB for PostgreSQL database.
    username | Yes | The username that is used to access the AnalyticDB for PostgreSQL database.
    password | Yes | The password of the database account that is used to connect to the AnalyticDB for PostgreSQL database.
    targetSchema | No | The name of the schema. Default value: public. If your tables are in a schema other than public, specify this parameter.
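
    For example, if the adbpg_dim_table and adbpg_sink_table tables were created in a schema other than public (here a hypothetical schema named flink_demo), you would add the following line to the WITH clause of the corresponding table declarations:

    'targetSchema' = 'flink_demo' -- hypothetical schema name; replace it with your own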

  3. In the upper-right corner of the SQL Editor page, click Validate to perform a syntax check.

  4. In the upper-right corner of the SQL Editor page, click Deploy.

  5. On the O&M > Deployments page, find the desired deployment and click Start in the Actions column.

Step 4: View the data that Realtime Compute for Apache Flink writes to the result table

  1. Log on to the AnalyticDB for PostgreSQL console.
  2. Click Log On to Database. For more information about how to connect to a database, see Client connection.

  3. Execute the following statement to view the data that Realtime Compute for Apache Flink writes to the result table.

    SELECT * FROM adbpg_sink_table ORDER BY id;

    Each of the 50 rows in the result table contains the id and username values from the dimension table and the score value generated by the Datagen source.
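
    The exact score values vary between runs because they are randomly generated. In a psql-style client, the result has the following shape; the scores shown are illustrative:

     id | username  | score
    ----+-----------+-------
      1 | username1 |    87
      2 | username2 |    75
      3 | username3 |    94
    ...
    (50 rows)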

References