Realtime Compute for Apache Flink allows you to ingest log data into data warehouses in real time. This topic describes how to create a draft that synchronizes data from an ApsaraMQ for Kafka instance to a Hologres instance in the development console of Realtime Compute for Apache Flink.
Background information
For example, a topic named users is created for an ApsaraMQ for Kafka instance and this topic contains 100 JSON data records. These JSON data records are the log data that is written to ApsaraMQ for Kafka by using a log file collection tool or application. The following figure shows the data distribution.
If you want to create a draft to synchronize all log data in the topic from the ApsaraMQ for Kafka instance to a Hologres instance, you can perform the following steps:
In this topic, the CREATE TABLE AS statement provided by Realtime Compute for Apache Flink is used to synchronize log data with one click and synchronize table schema changes in real time.
Prerequisites
If you want to access the development console of Realtime Compute for Apache Flink as a Resource Access Management (RAM) user or by assuming a RAM role, the required permissions are granted to the RAM user or RAM role. For more information, see Permission management.
A Realtime Compute for Apache Flink workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
Upstream and downstream storage instances are created.
An ApsaraMQ for Kafka instance is created. For more information, see Step 3: Create resources.
A Hologres instance is created. For more information, see Purchase a Hologres instance.
Note: The ApsaraMQ for Kafka instance and the Hologres instance must reside in the same virtual private cloud (VPC) as the Realtime Compute for Apache Flink workspace. If the instances do not reside in the same VPC as the workspace, you must establish network connections between the ApsaraMQ for Kafka instance and the workspace and between the Hologres instance and the workspace. For more information, see the How does Realtime Compute for Apache Flink access a service across VPCs? and How does Realtime Compute for Apache Flink access the Internet? sections of the "Reference" topic.
Step 1: Configure IP address whitelists
To allow Realtime Compute for Apache Flink to access the ApsaraMQ for Kafka instance and Hologres instance, you must add the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs to the whitelists of the ApsaraMQ for Kafka instance and Hologres instance.
Obtain the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs.
Log on to the management console of Realtime Compute for Apache Flink.
On the Fully Managed Flink tab, find your workspace and choose More > Workspace Details in the Actions column.
In the Workspace Details dialog box, view the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs.
Add the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs to the IP address whitelist of the ApsaraMQ for Kafka instance.
For more information, see Configure whitelists.
Add the CIDR block of the vSwitch to which the Realtime Compute for Apache Flink workspace belongs to the IP address whitelist of the Hologres instance.
For more information, see Configure an IP address whitelist.
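Conceptually, the whitelist check that both services perform is a CIDR membership test: a client IP address is allowed if it falls within a whitelisted block. The following Python sketch illustrates this with the standard-library ipaddress module. The CIDR block shown is a hypothetical example, not an actual workspace value.

```python
import ipaddress

def is_whitelisted(client_ip: str, whitelist_cidrs: list[str]) -> bool:
    """Return True if client_ip falls within any whitelisted CIDR block."""
    ip = ipaddress.ip_address(client_ip)
    return any(ip in ipaddress.ip_network(cidr) for cidr in whitelist_cidrs)

# Hypothetical vSwitch CIDR block of the Flink workspace.
whitelist = ["192.168.0.0/24"]

print(is_whitelisted("192.168.0.10", whitelist))  # inside the vSwitch CIDR
print(is_whitelisted("10.0.0.5", whitelist))      # outside the CIDR
```

If the check for an address inside the vSwitch CIDR block fails in practice, the whitelist entry on the instance side is usually missing or too narrow.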
Step 2: Prepare test data of the ApsaraMQ for Kafka instance
Use a Faker source table of Realtime Compute for Apache Flink as a data generator and write the generated data to the ApsaraMQ for Kafka instance. Perform the following steps in the development console of Realtime Compute for Apache Flink.
Create a topic named users in the ApsaraMQ for Kafka console.
For more information, see the "Step 1: Create a topic" section of the Step 3: Create resources topic.
Create a draft that writes data to a specific ApsaraMQ for Kafka instance.
Log on to the management console of Realtime Compute for Apache Flink.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, choose Development > ETL. On the page that appears, click New.
In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and then click Next. Configure the parameters of the draft. The following table describes the parameters.
Parameter
Example
Description
Name
kafka-data-input
The name of the draft that you want to create.
Note: The draft name must be unique in the current project.
Location
Development
The folder in which the code file of the draft is stored. By default, the code file of a draft is stored in the Development folder.
You can also click the icon to the right of an existing folder to create a subfolder.
Engine Version
vvr-8.0.5-flink-1.17
Select the engine version of the draft from the Engine Version drop-down list.
Click Create.
Copy the following code of a draft to the code editor.
CREATE TEMPORARY TABLE source (
  id INT,
  first_name STRING,
  last_name STRING,
  `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
  event_time TIMESTAMP
) WITH (
  'connector' = 'faker',
  'number-of-rows' = '100',
  'rows-per-second' = '10',
  'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
  'fields.first_name.expression' = '#{name.firstName}',
  'fields.last_name.expression' = '#{name.lastName}',
  'fields.address.country.expression' = '#{Address.country}',
  'fields.address.state.expression' = '#{Address.state}',
  'fields.address.city.expression' = '#{Address.city}',
  'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
);

CREATE TEMPORARY TABLE sink (
  id INT,
  first_name STRING,
  last_name STRING,
  `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
  `timestamp` TIMESTAMP METADATA
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
  'topic' = 'users',
  'format' = 'json'
);

INSERT INTO sink SELECT * FROM source;
Modify the following parameter configurations based on your business requirements.
Parameter
Example
Description
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000
The IP addresses or endpoints of Kafka brokers.
Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).
topic
users
The name of the Kafka topic.
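The properties.bootstrap.servers value is a plain comma-separated list of host:port pairs. The following Python sketch shows how such a string breaks down into individual broker addresses; the hostnames are the placeholders from the example above, not real endpoints.

```python
def parse_bootstrap_servers(servers: str) -> list[tuple[str, int]]:
    """Split a comma-separated host:port list into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        # rpartition tolerates hostnames that themselves contain no colon.
        host, _, port = entry.strip().rpartition(":")
        pairs.append((host, int(port)))
    return pairs

servers = "alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000"
print(parse_bootstrap_servers(servers))
```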
Start the deployment for the draft.
In the upper-right corner of the SQL editor page, click Deploy.
In the Deploy draft dialog box, click Confirm.
On the Deployments page, configure resources for the deployment of the draft. For more information, see Configure resources for a deployment.
In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find the deployment that you want to manage and click Start in the Actions column. For more information about the parameters that you must configure when you start your deployment, see Start a deployment.
On the Deployments page, view the status and information about the deployment.
The Faker data source provides bounded streams. Therefore, the deployment finishes about one minute after it enters the RUNNING state. When the deployment is complete, the data is written to the users topic of the ApsaraMQ for Kafka instance. The following sample code shows the format of the JSON data that is written to the ApsaraMQ for Kafka instance.
{
  "id": 765,
  "first_name": "Barry",
  "last_name": "Pollich",
  "address": {
    "country": "United Arab Emirates",
    "state": "Nevada",
    "city": "Powlowskifurt"
  }
}
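If you prefer to produce test records outside of Flink, records with the same shape can be generated in plain Python. This is a hypothetical sketch using only standard-library modules; the value pools stand in for the Faker expressions in the source table and are not part of any connector.

```python
import json
import random

# Hypothetical value pools standing in for the Faker field expressions.
FIRST_NAMES = ["Barry", "Dennise", "Alice"]
LAST_NAMES = ["Pollich", "Schuppe", "Smith"]
COUNTRIES = ["United Arab Emirates", "Isle of Man"]
STATES = ["Nevada", "Montana"]
CITIES = ["Powlowskifurt", "East Coleburgh"]

def make_user_record() -> dict:
    """Build one JSON record with the same shape as the Faker source table."""
    return {
        "id": random.randint(0, 999),  # mirrors number.numberBetween 0..1000
        "first_name": random.choice(FIRST_NAMES),
        "last_name": random.choice(LAST_NAMES),
        "address": {
            "country": random.choice(COUNTRIES),
            "state": random.choice(STATES),
            "city": random.choice(CITIES),
        },
    }

print(json.dumps(make_user_record()))
```

Such records can then be sent to the users topic with any Kafka producer client.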
Step 3: Create a Hologres catalog
If you want to perform single-table synchronization, you must create a destination table in a destination catalog. You can create the destination catalog in the development console of Realtime Compute for Apache Flink. In this topic, a Hologres catalog is used as the destination catalog. This section describes how to create a Hologres catalog.
Create a Hologres catalog named holo.
For more information, see the "Create a Hologres catalog" section of the Manage Hologres catalogs topic.
Important: You must make sure that a database named flink_test_db is created in the instance to which you want to synchronize data. Otherwise, an error is returned when you create a catalog.
On the Schemas tab, verify that the catalog named holo is created.
Step 4: Create a data synchronization draft and start a data synchronization deployment
Log on to the development console of Realtime Compute for Apache Flink and create a data synchronization draft.
Log on to the management console of Realtime Compute for Apache Flink.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane, choose Development > ETL. On the page that appears, click New.
In the New Draft dialog box, click Blank Stream Draft on the SQL Scripts tab and then click Next. Configure the parameters of the draft. The following table describes the parameters.
Parameter
Example
Description
Name
flink-quickstart-test
The name of the draft that you want to create.
Note: The draft name must be unique in the current project.
Location
Development
The folder in which the code file of the draft is stored. By default, the code file of a draft is stored in the Development folder.
You can also click the icon to the right of an existing folder to create a subfolder.
Engine Version
vvr-8.0.5-flink-1.17
Select the engine version of the draft from the Engine Version drop-down list.
Click Create.
Copy the following code of a draft to the code editor.
You can use one of the following methods to synchronize data of the users topic from the ApsaraMQ for Kafka instance to the sync_kafka_users table of the flink_test_db database in Hologres. The methods differ in how the data types of the input and output values are specified:
Use the CREATE TABLE AS statement
If you execute the CREATE TABLE AS statement to synchronize data, you do not need to manually create the table in Hologres or configure the types of the columns to which data is written as JSON or JSONB.
CREATE TEMPORARY TABLE kafka_users (
  `id` INT NOT NULL,
  `address` STRING,
  `offset` BIGINT NOT NULL METADATA,
  `partition` BIGINT NOT NULL METADATA,
  `timestamp` TIMESTAMP METADATA,
  `date` AS CAST(`timestamp` AS DATE),
  `country` AS JSON_VALUE(`address`, '$.country'),
  PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
  'topic' = 'users',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Automatically expand nested columns.
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
WITH (
  'connector' = 'hologres'
) AS TABLE kafka_users;
Note: To prevent duplicate data from being written to Hologres after a deployment failover, you can declare a primary key on the table to uniquely identify data. If data is retransmitted, Hologres ensures that only one copy of the data that has the same values of the partition and offset fields is retained.
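The deduplication behavior described in the note relies on upsert semantics: with (partition, offset) as the primary key, a retransmitted record overwrites the earlier copy instead of creating a duplicate row. The following Python sketch only illustrates that keying logic; it is not Hologres code.

```python
def deduplicate(records: list[dict]) -> list[dict]:
    """Keep one copy per (partition, offset) key. Later copies of the same
    key overwrite earlier ones, mirroring primary-key upsert semantics."""
    by_key = {}
    for rec in records:
        by_key[(rec["partition"], rec["offset"])] = rec
    return list(by_key.values())

# A retransmitted batch: offset 7 on partition 0 arrives twice.
batch = [
    {"partition": 0, "offset": 7, "id": 765},
    {"partition": 0, "offset": 8, "id": 766},
    {"partition": 0, "offset": 7, "id": 765},  # duplicate after a failover
]
print(len(deduplicate(batch)))  # 2
```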
Use the INSERT INTO statement
Hologres is optimized for JSON and JSONB data. Therefore, you can use the INSERT INTO statement to write nested JSON data directly to a JSON or JSONB column in Hologres.
If you use the INSERT INTO statement to synchronize data, you must manually create a table in Hologres and configure the types of the columns to which data is written as JSON or JSONB. Then, you can execute the INSERT INTO statement to write the address data to the column of the JSON type in Hologres.
CREATE TEMPORARY TABLE kafka_users (
  `id` INT NOT NULL,
  `address` STRING, -- The data in this column is nested JSON data.
  `offset` BIGINT NOT NULL METADATA,
  `partition` BIGINT NOT NULL METADATA,
  `timestamp` TIMESTAMP METADATA,
  `date` AS CAST(`timestamp` AS DATE),
  `country` AS JSON_VALUE(`address`, '$.country')
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
  'topic' = 'users',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true', -- Automatically expand nested columns.
  'scan.startup.mode' = 'earliest-offset'
);

CREATE TEMPORARY TABLE holo (
  `id` INT NOT NULL,
  `address` STRING,
  `offset` BIGINT,
  `partition` BIGINT,
  `timestamp` TIMESTAMP,
  `date` DATE,
  `country` STRING
) WITH (
  'connector' = 'hologres',
  'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
  'username' = 'LTAI5tE572UJ44Xwhx6i****',
  'password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****',
  'dbname' = 'flink_test_db',
  'tablename' = 'sync_kafka_users'
);

INSERT INTO holo SELECT * FROM kafka_users;
The following table describes the parameters that you can configure in the code.
Parameter
Example
Description
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000
The IP addresses or endpoints of Kafka brokers.
Format: host:port,host:port,host:port. Separate multiple host:port pairs with commas (,).
topic
users
The name of the Kafka topic.
endpoint
hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
The endpoint of the Hologres instance.
Format: <ip>:<port>.
username
LTAI5tE572UJ44Xwhx6i****
The username that is used to access the Hologres database. You must enter the AccessKey ID of your Alibaba Cloud account.
password
KtyIXK3HIDKA9VzKX4tpct9xTm****
The password that is used to access the Hologres database. You must enter the AccessKey secret of your Alibaba Cloud account.
dbname
flink_test_db
The name of the Hologres database that you want to access.
tablename
sync_kafka_users
The name of the Hologres table.
Note: If you use the INSERT INTO statement to synchronize data, you must create the sync_kafka_users table and define the required fields in the database of the destination instance.
If the public schema is not used, you must set tablename to schema.tableName.
Click Save.
In the upper-right corner of the SQL editor page, click Deploy.
In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, find the deployment that you want to manage and click Start in the Actions column. For more information about the parameters that you must configure when you start your deployment, see Start a deployment.
In the Start Job dialog box, click Start.
You can view the status and information of the deployment on the Deployments page after the deployment is started.
Step 5: View the result of full data synchronization
Log on to the Hologres console.
On the Instances page, click the name of the instance that you want to manage.
In the upper-right corner of the page, click Connect to Instance.
On the Metadata Management tab, view the schema and data of the sync_kafka_users table to which data is synchronized in the flink_test_db database.
The following figures show the schema and data of the sync_kafka_users table after full data synchronization.
Table schema
Double-click the name of the sync_kafka_users table to view the table schema.
Note: During data synchronization, we recommend that you declare the partition and offset fields of Kafka as the primary key for the Hologres table. This way, if data is retransmitted due to a deployment failover, only one copy of the data that has the same values of the partition and offset fields is stored.
Table data
In the upper-right corner of the page for the sync_kafka_users table, click Query table. In the SQL editor, enter the following statement and click Run.
SELECT * FROM public.sync_kafka_users ORDER BY partition, "offset";
The following figure shows the data of the sync_kafka_users table.
Step 6: Check whether table schema changes are automatically synchronized
Manually send a message that contains a new column in the ApsaraMQ for Kafka console.
Log on to the ApsaraMQ for Kafka console.
On the Instances page, click the name of the instance that you want to manage.
In the left-side navigation pane of the page that appears, click Topics. On the page that appears, find the topic named users.
Choose More > Quick Start in the Actions column.
In the Start to Send and Consume Message panel, configure the parameters and enter the content of the test message.
Parameter
Example
Method of Sending
Select Console.
Message Key
Enter flinktest.
Message Content
Copy and paste the following JSON content to the Message Content field.
{
  "id": 100001,
  "first_name": "Dennise",
  "last_name": "Schuppe",
  "address": {
    "country": "Isle of Man",
    "state": "Montana",
    "city": "East Coleburgh"
  },
  "house-points": {
    "house": "Pukwudgie",
    "points": 76
  }
}
Note: In this example, house-points is a new nested column.
Send to Specified Partition
Select Yes.
Partition ID
Enter 0.
Click OK.
In the Hologres console, view the changes in the schema and data of the sync_kafka_users table.
Log on to the Hologres console.
On the Instances page, click the name of the instance that you want to manage.
In the upper-right corner of the page, click Connect to Instance.
On the Metadata Management tab, double-click the name of the sync_kafka_users table.
In the upper-right corner of the page for the sync_kafka_users table, click Query table. In the SQL editor, enter the following statement and click Run.
SELECT * FROM public.sync_kafka_users ORDER BY partition, "offset";
View the data of the table.
The following figure shows the data of the sync_kafka_users table.
The figure shows that the data record whose id is 100001 is written to Hologres. In addition, the house-points.house and house-points.points columns are added to Hologres.
Note: Only the message that is sent to the users topic contains the nested column house-points. However, json.infer-schema.flatten-nested-columns.enable is declared in the WITH clause of the kafka_users table. In this case, Realtime Compute for Apache Flink automatically expands the new nested column and uses the access path of the column as its name.
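The column expansion described in the note can be illustrated in plain Python: nested objects are flattened, and the access path of each leaf value becomes the column name. This sketch only mimics the observable behavior of json.infer-schema.flatten-nested-columns.enable; it is not the connector's implementation.

```python
def flatten(record: dict, prefix: str = "") -> dict:
    """Expand nested objects into top-level columns whose names are the
    access paths, e.g. {"a": {"b": 1}} becomes {"a.b": 1}."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict):
            flat.update(flatten(value, prefix=f"{name}."))
        else:
            flat[name] = value
    return flat

message = {
    "id": 100001,
    "house-points": {"house": "Pukwudgie", "points": 76},
}
print(flatten(message))
```

Applied to the test message above, the nested house-points object yields the two new columns house-points.house and house-points.points, which matches what appears in Hologres.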
(Optional) Step 7: Adjust deployment resource configurations
To ensure optimal deployment performance, we recommend that you adjust the parallelism of deployments and resource configurations of different nodes based on the amount of data that needs to be processed. To adjust the parallelism of deployments and the number of CUs in a simple manner, use the basic resource configuration mode. To adjust the parallelism of deployments and resource configurations of nodes in a more fine-grained manner, use the expert resource configuration mode.
Log on to the development console of Realtime Compute for Apache Flink and go to the deployment details page.
Log on to the management console of Realtime Compute for Apache Flink.
Find the workspace that you want to manage and click Console in the Actions column.
In the left-side navigation pane of the development console of Realtime Compute for Apache Flink, choose O&M > Deployments. On the Deployments page, click the name of the deployment that you want to manage.
Modify resource configurations.
On the Configuration tab, click Edit in the upper-right corner of the Resources section and select Expert for the Mode parameter.
In the Resource Plan section, click Get Plan Now.
Move the pointer over More and click Expand All.
You can view the complete topology to learn the data synchronization plan of the deployment. The plan shows the tables that need to be synchronized.
Manually configure PARALLELISM for each operator.
The table in the users topic of ApsaraMQ for Kafka has four partitions. Therefore, you can set the PARALLELISM parameter for ApsaraMQ for Kafka to 4. Log data is written to only one Hologres table. To reduce the number of connections to Hologres, you can set the PARALLELISM parameter for Hologres to 2. For more information about how to configure resource parameters, see Configure a deployment. The following figure shows the resource configuration plan of the deployment after the parallelism is adjusted.
Click OK.
On the Deployments page, find the desired deployment and click Start in the Actions column. Configure the parameters in the Basic section. For more information about the parameters that you must configure when you start your deployment, see Start a deployment.
In the left-side navigation pane, choose O&M > Deployments. On the Deployments page, click the name of the deployment that you want to manage.
On the Status tab, view the effect after resource reconfiguration of the deployment.
References
For more information about the CREATE TABLE AS statement, see CREATE TABLE AS statement.
For more information about the synchronization of table schema changes when the CREATE TABLE AS statement is executed to synchronize data from an ApsaraMQ for Kafka source table, see Create an ApsaraMQ for Kafka source table.