You can use the Flink StarRocks connector to cache data and then import the data to StarRocks in batches in Stream Load mode. This topic describes how to use the Flink StarRocks connector and provides examples.
Background information
The Flink Java Database Connectivity (JDBC) connector provided by Apache Flink cannot meet the import performance requirements of StarRocks. Therefore, Alibaba Cloud provides the Flink StarRocks connector, which caches data and then imports the data to StarRocks in batches in Stream Load mode.
Use the Flink connector
You can download the source code of the Flink StarRocks connector for testing from the starrocks-connector-for-apache-flink page on GitHub.
Add the following code to the pom.xml file of your project:
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- Keep only the <version> element that matches your Flink version. -->
    <!-- for flink-1.11, flink-1.12 -->
    <version>x.x.x_flink-1.11</version>
    <!-- for flink-1.13 -->
    <version>x.x.x_flink-1.13</version>
</dependency>
You can view the latest version of the Flink StarRocks connector on the version information page and replace x.x.x in the preceding code with the latest version.
Sample code:
Method 1
// -------- sink with raw json string stream --------

fromElements(new String[]{
    "{\"score\": \"99\", \"name\": \"stephen\"}",
    "{\"score\": \"100\", \"name\": \"lebron\"}"
}).addSink(
    StarRocksSink.sink(
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .build()
    )
);

// -------- sink with stream transformation --------

class RowData {
    public int score;
    public String name;
    public RowData(int score, String name) {
        this.score = score;
        this.name = name;
    }
}

fromElements(
    new RowData[]{
        new RowData(99, "stephen"),
        new RowData(100, "lebron")
    }
).addSink(
    StarRocksSink.sink(
        // the table structure
        TableSchema.builder()
            .field("score", DataTypes.INT())
            .field("name", DataTypes.VARCHAR(20))
            .build(),
        // the sink options
        StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx")
            .withProperty("load-url", "fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port")
            .withProperty("username", "xxx")
            .withProperty("password", "xxx")
            .withProperty("table-name", "xxx")
            .withProperty("database-name", "xxx")
            .withProperty("sink.properties.column_separator", "\\x01")
            .withProperty("sink.properties.row_delimiter", "\\x02")
            .build(),
        // set the slots with streamRowData
        (slots, streamRowData) -> {
            slots[0] = streamRowData.score;
            slots[1] = streamRowData.name;
        }
    )
);
Method 2
// create a table with `structure` and `properties`
// Needed: Add `com.starrocks.connector.flink.table.StarRocksDynamicTableSinkFactory` to:
// `src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory`
tEnv.executeSql(
    "CREATE TABLE USER_RESULT(" +
        "name VARCHAR," +
        "score BIGINT" +
    ") WITH ( " +
        "'connector' = 'starrocks'," +
        "'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx'," +
        "'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port'," +
        "'database-name' = 'xxx'," +
        "'table-name' = 'xxx'," +
        "'username' = 'xxx'," +
        "'password' = 'xxx'," +
        "'sink.buffer-flush.max-rows' = '1000000'," +
        "'sink.buffer-flush.max-bytes' = '300000000'," +
        "'sink.buffer-flush.interval-ms' = '5000'," +
        "'sink.properties.column_separator' = '\\x01'," +
        "'sink.properties.row_delimiter' = '\\x02'," +
        "'sink.max-retries' = '3'," +
        "'sink.properties.*' = 'xxx'" + // stream load properties like `'sink.properties.columns' = 'k1, v1'`
    ")"
);
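After the table is created, you can write data to it with regular INSERT statements in the same table environment. The following is a minimal sketch; SOURCE_TABLE is a hypothetical upstream table and must already exist in your Flink catalog:

// Insert a single row into the StarRocks-backed table. The connector buffers
// the rows and flushes them to StarRocks in Stream Load mode based on the
// sink.buffer-flush.* settings.
tEnv.executeSql("INSERT INTO USER_RESULT VALUES ('lebron', 100)");

// Or continuously write the output of a query (SOURCE_TABLE is hypothetical):
tEnv.executeSql("INSERT INTO USER_RESULT SELECT name, score FROM SOURCE_TABLE");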
The following table describes the parameters of a Flink sink.
| Parameter | Required | Default value | Type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | No default value | String | The type of the connector. Set the value to starrocks. |
| jdbc-url | Yes | No default value | String | The JDBC URL that is used to connect to StarRocks and perform queries in StarRocks. |
| load-url | Yes | No default value | String | The IP addresses and HTTP ports of frontends, in the format of fe_ip:http_port;fe_ip:http_port. Separate multiple addresses with semicolons (;). |
| database-name | Yes | No default value | String | The name of the StarRocks database. |
| table-name | Yes | No default value | String | The name of the StarRocks table. |
| username | Yes | No default value | String | The username that is used to connect to the StarRocks database. |
| password | Yes | No default value | String | The password that is used to connect to the StarRocks database. |
| sink.semantic | No | at-least-once | String | The semantics of the sink. Valid values: at-least-once and exactly-once. |
| sink.buffer-flush.max-bytes | No | 94371840 (90 MB) | String | The maximum amount of data that is allowed in the buffer. Valid values: 64 MB to 10 GB. |
| sink.buffer-flush.max-rows | No | 500000 | String | The maximum number of data rows that are allowed in the buffer. Valid values: 64000 to 5000000. |
| sink.buffer-flush.interval-ms | No | 300000 | String | The interval at which the buffer is flushed. Valid values: 1000 to 3600000. Unit: milliseconds. |
| sink.max-retries | No | 1 | String | The maximum number of retries. Valid values: 0 to 10. |
| sink.connect.timeout-ms | No | 1000 | String | The timeout period for connecting to the HTTP ports of the frontends specified by the load-url parameter. Valid values: 100 to 60000. Unit: milliseconds. |
| sink.properties.* | No | No default value | String | The Stream Load properties of the sink, such as sink.properties.columns. |
To ensure exactly-once semantics for a Flink sink, the external system must provide a mechanism that supports a two-phase commit protocol. StarRocks does not have such a mechanism, so the connector must depend on the checkpointing feature of Flink. Each time Flink generates a checkpoint, a batch of data and its labels are cached as a state. After the checkpoint is generated, the system blocks until the cached data of the state is written to StarRocks. Only then does Flink start to generate the next checkpoint. This is how the connector ensures exactly-once semantics. Consequently, if StarRocks breaks down, the stream operators of your Flink sink may be blocked for an extended period of time due to connection failures. As a result, an alert is triggered and the Flink import job is forcibly terminated.
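The following Java snippet is a minimal sketch of the setup that exactly-once delivery requires, using the same placeholder connection values as the preceding examples: checkpointing must be enabled on the execution environment, and the sink.semantic parameter must be set to exactly-once.

// Exactly-once delivery depends on Flink checkpoints: trigger one every 10 seconds.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);

// Placeholder connection values, as in the preceding examples.
StarRocksSinkOptions options = StarRocksSinkOptions.builder()
    .withProperty("jdbc-url", "jdbc:mysql://fe1_ip:query_port")
    .withProperty("load-url", "fe1_ip:http_port")
    .withProperty("username", "xxx")
    .withProperty("password", "xxx")
    .withProperty("database-name", "xxx")
    .withProperty("table-name", "xxx")
    // Buffered batches are committed when a checkpoint completes.
    .withProperty("sink.semantic", "exactly-once")
    .build();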
By default, data is imported in the CSV format. You can set the sink.properties.row_delimiter parameter to \\x02 to customize the row delimiter and set the sink.properties.column_separator parameter to \\x01 to customize the column separator. The sink.properties.row_delimiter parameter is supported only in StarRocks 1.15.0 and later.
If your import job is unexpectedly stopped, you can increase the memory capacity of the job.
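For example, in a standalone Flink deployment you can raise the TaskManager memory in conf/flink-conf.yaml. The value below is only an illustrative assumption; tune it for your workload:

# conf/flink-conf.yaml -- illustrative value, not a recommendation
taskmanager.memory.process.size: 4096m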
If the code runs properly and data can be queried but writes fail, check whether your machine can access the HTTP ports of the backends. In other words, check whether the IP addresses of the backends can be pinged from your machine. For example, suppose your machine has a public IP address and an internal IP address, the frontends and backends of a cluster can be accessed by using public IP addresses and HTTP ports, and internal IP addresses are used to access backends within the cluster. If you specify the public IP addresses and HTTP ports of the frontends in the load-url parameter when you submit an import job, the frontends forward the write operation to the internal IP addresses and HTTP ports of the backends. If the internal IP addresses of the backends cannot be pinged from your machine, the write operation fails.
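A quick way to verify reachability is to attempt a TCP connection to the HTTP port of each backend from the machine that runs the Flink job. The following Java snippet is a minimal sketch; the IP address and port are hypothetical placeholders:

import java.net.InetSocketAddress;
import java.net.Socket;

public class BeReachabilityCheck {
    public static void main(String[] args) {
        // Hypothetical internal backend address and HTTP port; replace with your own.
        String beHost = "192.168.0.10";
        int beHttpPort = 8040;
        try (Socket socket = new Socket()) {
            // Fail fast if the backend HTTP port is unreachable from this machine.
            socket.connect(new InetSocketAddress(beHost, beHttpPort), 3000);
            System.out.println("Backend HTTP port is reachable.");
        } catch (Exception e) {
            System.out.println("Backend HTTP port is NOT reachable: " + e.getMessage());
        }
    }
}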
Example: Synchronize data from a MySQL database by using the Flink StarRocks connector
Basic principles
You can synchronize data from a MySQL database to a StarRocks cluster within seconds by using the Flink change data capture (CDC) connector and StarRocks migration tools.
The images and some of the information in this topic are from the Real-time synchronization from MySQL topic of open source StarRocks.
The StarRocks migration tool can automatically generate the CREATE TABLE statements for the source and sink tables based on the cluster information and table schemas of the MySQL database and the StarRocks cluster.
Procedure
Make preparations.
Download the installation package of Flink.
We recommend that you use Apache Flink 1.13. The earliest version that is supported is Apache Flink 1.11.
Download the package of the Flink CDC connector.
Download a package of the Flink CDC connector for MySQL based on the version of Apache Flink that you use.
Download the package of the Flink StarRocks connector.
Important: The Flink StarRocks connector varies for Apache Flink 1.11, 1.12, and 1.13.
Copy and paste the downloaded flink-sql-connector-mysql-cdc-xxx.jar and flink-connector-starrocks-xxx.jar packages to the flink-xxx/lib/ directory.
Download the smt.tar.gz package, decompress the package, and then modify the configuration file.
Modify the following parameters in the configuration file:
- db: the connection information of the MySQL database.
- be_num: the number of backend nodes in the StarRocks cluster.
- [table-rule.1]: the matching rules. You can use regular expressions to match the database names and table names for which the CREATE TABLE statements are generated. You can also configure multiple sets of rules.
- flink.starrocks.*: the configuration information of the StarRocks cluster.
Sample code:
[db]
host = 192.168.**.**
port = 3306
user = root
password =

[other]
# number of backends in StarRocks
be_num = 3
# `decimal_v3` is supported since StarRocks-1.18.1
use_decimal_v3 = false
# file to save the converted DDL SQL
output_dir = ./result

[table-rule.1]
# pattern to match databases for setting properties
database = ^console_19321.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url=192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
Run the starrocks-migrate-tool command to generate all the CREATE TABLE statements in the /result directory. You can run the ls result command to display the files in the /result directory:

flink-create.1.sql  flink-create.all.sql  smt.tar.gz  starrocks-create.1.sql  starrocks-create.all.sql
Run the following command to create a database and a table in the StarRocks cluster:
mysql -h <IP address of a frontend> -P 9030 -u root -p < starrocks-create.1.sql
Run the following command to generate tables in Flink and continuously synchronize data:
bin/sql-client.sh -f flink-create.1.sql
Important: If you use a version of Apache Flink earlier than 1.13, you may not be able to directly run the SQL script. In this case, you must execute the SQL statements in the script one by one and enable binary logging for the MySQL database.
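Flink CDC reads the MySQL binary log, so binary logging must be enabled on the MySQL server. The following my.cnf settings are one common way to enable it; treat the values as assumptions and adapt them to your environment (restart MySQL after the change):

[mysqld]
# enable binary logging for Flink CDC
log-bin = mysql-bin
binlog_format = ROW
server-id = 1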
Run the following command to query the status of Flink jobs:
bin/flink list -running
You can view the details and status of Flink jobs on the web UI of Flink or in the log files in the $FLINK_HOME/log directory.
When you run the sample code provided in this topic, take note of the following items:
If you need to configure multiple sets of rules, you must configure the rules used to match the database names and table names, and configure the Flink StarRocks connector for each set of rules.
Sample code:
[table-rule.1]
# pattern to match databases for setting properties
database = ^console_19321.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url=192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000

[table-rule.2]
# pattern to match databases for setting properties
database = ^database2.*$
# pattern to match tables for setting properties
table = ^.*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url=192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
# If you cannot select appropriate delimiters to import data in the CSV format,
# you can import data in the JSON format. However, the import performance may
# deteriorate. To import data in the JSON format, replace the
# flink.starrocks.sink.properties.column_separator and
# flink.starrocks.sink.properties.row_delimiter parameters with the following parameters:
flink.starrocks.sink.properties.strip_outer_array=true
flink.starrocks.sink.properties.format=json
Note: You can use the flink.starrocks.sink.* parameters to configure properties for each set of rules, such as the import frequency.
After sharding is performed, data in a large table may be split into multiple tables or even distributed to multiple databases. In this case, you can configure a set of rules to synchronize data from multiple tables to one table.
For example, both the edu_db_1 and edu_db_2 databases contain the two tables course_1 and course_2, and all of the tables have the same schema. In this case, you can use the following set of rules to synchronize data from these tables to one table in StarRocks for further analysis:
[table-rule.3]
# pattern to match databases for setting properties
database = ^edu_db_[0-9]*$
# pattern to match tables for setting properties
table = ^course_[0-9]*$

############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://192.168.**.**:9030
flink.starrocks.load-url=192.168.**.**:8030
flink.starrocks.username=root
flink.starrocks.password=
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=5000
After you run the code, a many-to-one synchronization relationship is automatically generated. By default, the name of the table in StarRocks is course__auto_shard. You can modify the table name in the generated SQL files.
If you need to create tables and synchronize data on the CLI of an SQL client, you must escape the backslash (\). Sample code:

'sink.properties.column_separator' = '\\x01'
'sink.properties.row_delimiter' = '\\x02'