This topic describes how to use Apache Flink to synchronize data from Kafka to AnalyticDB for PostgreSQL.
Prerequisites
- The IP address of the Flink client is added to the whitelist of an AnalyticDB for PostgreSQL instance. For more information about how to configure a whitelist, see Configure an IP address whitelist.
- Kafka connector dependencies are deployed in the $FLINK_HOME/lib path of the Flink client. For information about the Table API Kafka connector provided by Apache Flink, see Apache Kafka SQL Connector. In this topic, the following dependency is used:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>
- AnalyticDB for PostgreSQL connector dependencies are deployed in the $FLINK_HOME/lib path of the Flink client. For more information about how to obtain the JAR package of the AnalyticDB for PostgreSQL connector, see AnalyticDB PostgreSQL Connector.
In this topic, AnalyticDB for PostgreSQL connector 1.13 is used. We recommend that you select a version that is close to the Flink version.
Usage notes
The use of Alibaba Cloud Realtime Compute for Apache Flink is similar to that of Apache Flink. For more information, see What is Alibaba Cloud Realtime Compute for Apache Flink?
Procedure
1. On the Flink client, create a source table that is used to read data from the Kafka topic.
CREATE TABLE KafkaTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `event_time` TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    `shard` BIGINT METADATA FROM 'partition' VIRTUAL,
    `meta_offset` BIGINT METADATA FROM 'offset' VIRTUAL
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'debezium-json'
);
The following table describes the Kafka connector parameters.
| Parameter | Required | Description |
| --- | --- | --- |
| connector | Yes | The name of the connector. Set the value to kafka. |
| topic | Yes | The name of the Kafka topic. |
| properties.bootstrap.servers | Yes | The endpoint and port number that are used to connect to the Kafka brokers. |
| properties.group.id | Yes | The ID of the Kafka consumer group. |
| scan.startup.mode | No | The starting offset of the consumer. For more information, see Start Reading Position. |
| value.format | Yes | The format that is used to serialize and deserialize the value part of Kafka messages. For more information about the supported formats and configurations, see Formats. |

For more information about Kafka connector parameters, see Connector Options.
Each Kafka message carries metadata such as its timestamp, offset, and partition. The event_time, meta_offset, and shard columns of the test table store this metadata. For more information about available metadata, see Available Metadata.
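Optionally, you can check that the source table reads messages as expected before you create the destination table. The following query is a minimal sketch that assumes the KafkaTable definition above; in the Flink SQL client it runs as a continuous streaming query and keeps printing rows until you cancel it.

-- Preview rows from the Kafka source table in the Flink SQL client.
-- Cancel the query after you have confirmed that the metadata columns
-- (event_time, shard, meta_offset) are populated as expected.
SELECT user_id, item_id, behavior, event_time, shard, meta_offset
FROM KafkaTable;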
2. In the AnalyticDB for PostgreSQL instance, create a destination table.
CREATE TABLE ADBPGTargetTable (
    user_id BIGINT PRIMARY KEY,
    item_id BIGINT,
    behavior VARCHAR,
    event_time TIMESTAMP,
    shard BIGINT,       -- The partition column of the Kafka table is replaced with the shard column because partition is a reserved keyword in AnalyticDB for PostgreSQL.
    meta_offset BIGINT  -- The offset column of the Kafka table is replaced with the meta_offset column because offset is a reserved keyword in AnalyticDB for PostgreSQL.
);
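If you do not specify a distribution key, AnalyticDB for PostgreSQL chooses one for you. To control data distribution explicitly, you can add a DISTRIBUTED BY clause, as in the following optional sketch. It assumes that user_id is a suitable distribution key; the distribution key columns must be contained in the primary key.

-- Optional variant of the destination table with an explicit distribution key.
CREATE TABLE ADBPGTargetTable (
    user_id BIGINT PRIMARY KEY,
    item_id BIGINT,
    behavior VARCHAR,
    event_time TIMESTAMP,
    shard BIGINT,
    meta_offset BIGINT
) DISTRIBUTED BY (user_id);  -- Distribute rows across segments by user_id.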
3. On the Flink client, create a table that is used to synchronize data to the AnalyticDB for PostgreSQL instance. We recommend that you use the same schema as that of the source table created in Step 1.
CREATE TABLE ADBPGTargetTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    `event_time` TIMESTAMP(3),
    `shard` BIGINT,        -- The partition column of the Kafka table is replaced with the shard column because partition is a reserved keyword in AnalyticDB for PostgreSQL.
    `meta_offset` BIGINT,  -- The offset column of the Kafka table is replaced with the meta_offset column because offset is a reserved keyword in AnalyticDB for PostgreSQL.
    PRIMARY KEY (`user_id`) NOT ENFORCED  -- Flink requires primary keys on connector tables to be declared NOT ENFORCED.
) WITH (
    'connector' = 'adbpg-nightly-1.13',
    'password' = 'Password01',
    'tablename' = 'ADBPGTargetTable',
    'username' = 'user01',
    'url' = 'jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres',
    'maxretrytimes' = '2',
    'batchsize' = '50000',
    'connectionmaxactive' = '5',
    'conflictmode' = 'ignore',
    'usecopy' = '0',
    'targetschema' = 'public',
    'exceptionmode' = 'ignore',
    'casesensitive' = '0',
    'writemode' = '1',
    'retrywaittime' = '200'
);
The following table describes the AnalyticDB for PostgreSQL connector parameters.
| Parameter | Required | Description |
| --- | --- | --- |
| connector | Yes | The name of the connector, in the adbpg-nightly-<Version number> format. For example, if AnalyticDB for PostgreSQL connector 1.13 is used, the connector name is adbpg-nightly-1.13. |
| url | Yes | The JDBC URL that is used to connect to the AnalyticDB for PostgreSQL instance, in the jdbc:postgresql://<Endpoint>:<Port>/<Database name> format. Example: jdbc:postgresql://gp-bp15s3b9kn00j****-master.gpdb.rds.aliyuncs.com:5432/postgres. |
| tablename | Yes | The name of the destination table that is created in the AnalyticDB for PostgreSQL instance. |
| username | Yes | The name of the database account that is used to connect to the AnalyticDB for PostgreSQL instance. |
| password | Yes | The password of the database account that is used to connect to the AnalyticDB for PostgreSQL instance. |
| maxretrytimes | No | The maximum number of retries that are allowed after a statement fails to be executed. Default value: 3. |
| batchsize | No | The maximum number of rows that can be written at a time. Default value: 50000. |
| exceptionmode | No | The policy that is used to handle exceptions during data writing. Default value: ignore. Valid values: ignore (the error data is ignored) and strict (a failover is performed and an error is reported). |
| conflictmode | No | The policy that is used to handle primary key or unique index conflicts. Default value: upsert. Valid values: ignore (the conflict is ignored and the existing data is retained), strict (a failover is performed and an error is reported), update (the conflicting data is updated), and upsert (an UPSERT operation is performed to write the data; see the example after this table). AnalyticDB for PostgreSQL uses INSERT ON CONFLICT and COPY ON CONFLICT to implement UPSERT operations. If the destination table is a partitioned table, the instance must run minor version 6.3.6.1 or later. For more information about how to update the minor engine version of your instance, see Update the minor engine version. |
| targetschema | No | The schema of the AnalyticDB for PostgreSQL instance. Default value: public. |
| writemode | No | The method that is used to write data. Default value: 1. Valid values: 0 (the BATCH INSERT statement is used), 1 (the COPY API is used), and 2 (the BATCH UPSERT statement is used). |
| verbose | No | Specifies whether to display connector logs. Default value: 0. Valid values: 0 (logs are not displayed) and 1 (logs are displayed). |
| retrywaittime | No | The interval between retries after an exception occurs. Default value: 100. Unit: milliseconds. |
| batchwritetimeoutms | No | The timeout period for batch data writing. After this period ends, the buffered data is written. Default value: 50000. Unit: milliseconds. |
| connectionmaxactive | No | The maximum number of active connections that can be allocated in the connection pool of a single task manager. Default value: 5. |
| casesensitive | No | Specifies whether column and table names are case-sensitive. Default value: 0. Valid values: 0 (case-insensitive) and 1 (case-sensitive). |
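For reference, the upsert behavior described for the conflictmode parameter corresponds roughly to the following native statement in AnalyticDB for PostgreSQL. This is only an illustration of the conflict-handling semantics with hypothetical values; it is not a statement that the connector executes verbatim.

-- Illustration of UPSERT semantics: if a row with the same user_id already exists,
-- its non-key columns are overwritten instead of a duplicate-key error being raised.
INSERT INTO ADBPGTargetTable (user_id, item_id, behavior, event_time, shard, meta_offset)
VALUES (1001, 2001, 'buy', '2023-01-01 00:00:00', 0, 42)
ON CONFLICT (user_id)
DO UPDATE SET
    item_id     = EXCLUDED.item_id,
    behavior    = EXCLUDED.behavior,
    event_time  = EXCLUDED.event_time,
    shard       = EXCLUDED.shard,
    meta_offset = EXCLUDED.meta_offset;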
4. On the Flink client, execute the INSERT INTO statement to synchronize data from the Kafka table to the AnalyticDB for PostgreSQL table.
INSERT INTO ADBPGTargetTable SELECT * FROM KafkaTable;
The execution status of the job is displayed in the console of the Flink client.
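To confirm that data arrives in the destination table, query it in the AnalyticDB for PostgreSQL instance, for example by using psql. The following checks are a minimal sketch; the results depend on the data in the Kafka topic.

-- Run against the AnalyticDB for PostgreSQL instance to verify synchronization.
SELECT COUNT(*) FROM ADBPGTargetTable;
SELECT * FROM ADBPGTargetTable ORDER BY event_time DESC LIMIT 10;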