This topic describes how to use the flink-dts-connector file to consume tracked data after you configure a change tracking task.
Usage notes
Data Transmission Service (DTS) supports the following types of Flink programs: DataStream API and Table API & SQL.
If you use a Table API & SQL program, you can consume the data of only one table each time you configure a change tracking task. If you want to consume the data of multiple tables, you must configure a task for each table.
Procedure
In this example, IntelliJ IDEA Community Edition 2020.1 for Windows is used.
- Create a change tracking task. For more information, see the relevant topics in Overview of change tracking scenarios.
- Create one or more consumer groups. For more information, see Create consumer groups.
Download the flink-dts-connector file and decompress it.
Open IntelliJ IDEA. In the window that appears, click Open or Import.
In the dialog box that appears, go to the directory where the flink-dts-connector file is decompressed, and expand the folders to find the pom.xml file.
- In the dialog box that appears, select Open as Project.
Add the following dependency to the pom.xml file:
<dependency>
    <groupId>com.alibaba.flink</groupId>
    <artifactId>flink-dts-connector</artifactId>
    <version>1.1.1-SNAPSHOT</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
In IntelliJ IDEA, expand the folders to find the Java files. Then, double-click a Java file based on the type of Flink connector that you use.
If you use a DataStream API connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\datastream\DtsExample.java file and perform the following operations:
In the top menu bar of IntelliJ IDEA, click the Run icon.
In the dialog box that appears, choose
In the Program arguments field, enter the parameters and their values, and then click Run to run flink-dts-connector. The following line shows an example of the arguments.
Note: For more information about the parameters and how to obtain their values, see the Parameters section of this topic.
--broker-url dts-cn-******.******.***:****** --topic cn_hangzhou_rm_**********_dtstest_version2 --sid dts****** --user dtstest --password Test123456 --checkpoint 1624440043
The following figure shows that the Flink program can track data changes from the source database.
Note: To query specific records of data changes, you can go to the Task Manager page of the Flink program.
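The program arguments shown for DtsExample are plain `--name value` pairs. As a rough illustration of how such a list maps to named settings, the following stdlib-only Java sketch parses the pairs and checks that the six arguments from the example are present. The class `DtsArgsSketch` and its methods are hypothetical and are not part of flink-dts-connector; the connector's own example may parse its arguments differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of flink-dts-connector.
final class DtsArgsSketch {

    /** Parses "--key value" pairs into an ordered map; throws if a pair is malformed. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            String key = args[i];
            if (!key.startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected '--key value' at position " + i);
            }
            params.put(key.substring(2), args[i + 1]);
        }
        return params;
    }

    /** Fails fast if any of the six arguments from the example is absent. */
    static void requireAll(Map<String, String> params) {
        String[] required = {"broker-url", "topic", "sid", "user", "password", "checkpoint"};
        for (String name : required) {
            if (!params.containsKey(name)) {
                throw new IllegalArgumentException("Missing --" + name);
            }
        }
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only; replace them with the values of your instance.
        String[] sample = args.length > 0 ? args : new String[] {
            "--broker-url", "<endpoint>:<port>",
            "--topic", "<topic>",
            "--sid", "<consumer-group-id>",
            "--user", "<username>",
            "--password", "<password>",
            "--checkpoint", "1624440043"
        };
        Map<String, String> params = parse(sample);
        requireAll(params);
        for (Map.Entry<String, String> e : params.entrySet()) {
            // Avoid echoing the password to the console.
            String shown = e.getKey().equals("password") ? "******" : e.getValue();
            System.out.println(e.getKey() + " = " + shown);
        }
    }
}
```

Checking for missing arguments before the job starts tends to produce a clearer error than a connection failure later on.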
If you use a Table API & SQL connector, you must double-click the flink-dts-connector-master\src\test\java\com\alibaba\flink\connectors\dts\sql\DtsTableISelectTCaseTest.java file and perform the following operations:
Note: A single DtsTableISelectTCaseTest.java file can be used to configure only one change tracking task and consume the data of only one table. If you want to consume the data of multiple tables, you must configure a task for each table.
Enter two forward slashes (//) and add comments, as shown in the following figure.
Specify the information of the table from which you want to track data changes. SQL statements are supported.
Set the parameters required for the change tracking instance. For more information, see the Parameters section of this topic.
In the top menu bar of IntelliJ IDEA, click Run 'DtsTableISelectTCaseTest' to run flink-dts-connector.
The following figure shows that the Flink program can track data changes from the source database.
Note: To query specific records of data changes, you can go to the Task Manager page of the Flink program.
Parameters
Parameter in the DtsExample file | Parameter in the DtsTableISelectTCaseTest file | Description | Method to obtain the parameter value |
--broker-url | | The endpoint and port number of the change tracking instance. Note: If you track data changes over internal networks, the network latency is minimal. This applies if the Elastic Compute Service (ECS) instance on which you deploy the Flink program resides on the classic network or in the same virtual private cloud (VPC) as the change tracking instance. | In the DTS console, find the change tracking instance that you want to manage and click the instance ID. On the Basic Information page, you can view Topic and Network of the instance. |
--topic | | The name of the topic of the change tracking instance. | |
--sid | | The ID of the consumer group. | In the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the left-side navigation pane, click Consume Data. You can view Consumer Group ID/Name of the instance and Account of the consumer group. Note: The password of the consumer group account is specified when you create the consumer group. |
--user | | The username of the consumer group. Warning: If you are not using the flink-dts-connector file that is described in this topic, you must specify this parameter in the following format: | |
--password | | The password of the consumer group. | |
--checkpoint | | The consumer offset. It is the timestamp generated when flink-dts-connector consumes the first data record. The value is a UNIX timestamp. Example: 1624440043. Note: The consumer offset can be used in the following scenarios: | The consumer offset of consumed data must be within the data range of the change tracking instance. The consumer offset must be converted to a UNIX timestamp. Note |
N/A | | The objects for change tracking. You can specify only a single table in the format of | In the DTS console, find the change tracking instance that you want to manage and click the instance ID. In the upper part of the Basic Information or Task Management page, click View Objects to view the database and table to which the objects for change tracking belong. |
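The consumer offset must be a UNIX timestamp that falls within the data range of the change tracking instance. The following Java sketch shows one way to convert a UTC date and time to UNIX seconds and to check the result against a range. The class `ConsumerOffsetSketch`, its methods, and the range bounds used in `main` are hypothetical values for illustration; the actual data range is the one reported for your instance.

```java
import java.time.Instant;

// Hypothetical helper for illustration; not part of flink-dts-connector.
final class ConsumerOffsetSketch {

    /** Converts an ISO-8601 UTC instant such as 2021-06-23T09:20:43Z to UNIX seconds. */
    static long toUnixSeconds(String isoUtc) {
        return Instant.parse(isoUtc).getEpochSecond();
    }

    /** Returns true if the offset lies within [rangeStart, rangeEnd], all in UNIX seconds. */
    static boolean inDataRange(long offset, long rangeStart, long rangeEnd) {
        return offset >= rangeStart && offset <= rangeEnd;
    }

    public static void main(String[] args) {
        long offset = toUnixSeconds("2021-06-23T09:20:43Z");
        System.out.println(offset); // prints 1624440043

        // Assumed range bounds for illustration only.
        long rangeStart = 1624400000L;
        long rangeEnd = 1624500000L;
        System.out.println(inDataRange(offset, rangeStart, rangeEnd)); // prints true
    }
}
```

The value 1624440043 matches the example offset used in this topic and corresponds to 2021-06-23 09:20:43 UTC.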
FAQ
Error message | Possible cause | Solution |
| The DStore module that DTS uses to read incremental data is switched. As a result, the consumer offset of the Flink program is lost. | You do not need to restart the Flink program. You only need to query the consumer offset of the Flink program and set the |