This topic describes how to import data from open source Apache Flink to an AnalyticDB for MySQL Data Warehouse Edition (V3.0) cluster.
Prerequisites
An Apache Flink driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes. You can download a driver based on your Apache Flink version. The following list provides the download links for the driver packages that correspond to the Apache Flink versions:
Apache Flink 1.11: flink-connector-jdbc_2.11-1.11.0.jar
Apache Flink 1.12: flink-connector-jdbc_2.11-1.12.0.jar
Apache Flink 1.13: flink-connector-jdbc_2.11-1.13.0.jar
To download drivers for other Apache Flink versions, go to the JDBC SQL Connector page.
A MySQL driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes.
Note: The version of the MySQL driver must be 5.1.40 or later. To download MySQL drivers, go to the mysql/mysql-connector-java page.
The Apache Flink cluster is restarted after all JAR packages are deployed. For more information about how to start an Apache Flink cluster, see Step 2: Start a Cluster. A deployment sketch is provided after this list.
A database and a table are created in an AnalyticDB for MySQL cluster to store the data that you want to write. For information about how to create a database and a table, see CREATE DATABASE and CREATE TABLE.
Note: In this example, a database named tpch is created by executing the following statement:

```sql
CREATE DATABASE IF NOT EXISTS tpch;
```
In this example, a table named person is created by executing the following statement:

```sql
CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
```
If the AnalyticDB for MySQL cluster is in elastic mode, you must turn on ENI in the Network Information section of the Cluster Information page.
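For reference, the following commands sketch the JAR deployment and restart described above. This is a minimal sketch that assumes a standalone Apache Flink 1.13 deployment in /opt/flink, driver JAR files already downloaded to the current directory, and a hypothetical worker host name; adjust the paths, versions, and host names to match your environment.

```shell
# Copy the Flink JDBC connector and the MySQL driver to the lib directory.
cp flink-connector-jdbc_2.11-1.13.0.jar /opt/flink/lib/
cp mysql-connector-java-5.1.40.jar /opt/flink/lib/

# Repeat the copy on every other Apache Flink node (hypothetical host name).
scp /opt/flink/lib/flink-connector-jdbc_2.11-1.13.0.jar root@flink-worker-1:/opt/flink/lib/
scp /opt/flink/lib/mysql-connector-java-5.1.40.jar root@flink-worker-1:/opt/flink/lib/

# Restart the cluster so that the new JAR packages are loaded.
/opt/flink/bin/stop-cluster.sh
/opt/flink/bin/start-cluster.sh
```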
Usage notes
This topic describes only how to use Apache Flink SQL to create a table and write data to AnalyticDB for MySQL. For information about how to use the Apache Flink Java Database Connectivity (JDBC) API to write data, see JDBC Connector.
The method described in this topic is applicable only to Apache Flink 1.11 and later. To write data from other Apache Flink versions to an AnalyticDB for MySQL cluster, you can use one of the following methods:
For information about how to write data from Apache Flink 1.9 and 1.10, see Flink v1.10.
For information about how to write data from Apache Flink 1.8 and earlier, see Flink v1.8.
Procedure
In this example, a CSV file is used as the source data to be written.
| Step | Description |
| --- | --- |
| Step 1: Prepare data | Create a CSV file, write data to the file, and then deploy the file to the /root directory of all Apache Flink nodes. |
| Step 2: Write data | Execute SQL statements to create a source table and a result table in Apache Flink and use the tables to write the source data to the AnalyticDB for MySQL cluster. |
| Step 3: Verify data | Log on to the AnalyticDB for MySQL database to check whether the source data is written. |
Step 1: Prepare data
In the /root directory of an Apache Flink node, run the vim /root/data.csv command to create a CSV file named data.csv.

The CSV file contains the following data. You can copy additional rows of data to increase the amount of data that you want to write.

```
0,json00,20
1,json01,21
2,json02,22
3,json03,23
4,json04,24
5,json05,25
6,json06,26
7,json07,27
8,json08,28
9,json09,29
```
After the CSV file is created, deploy the file to the /root directory of the other Apache Flink nodes, as shown in the following sketch.
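A minimal sketch of the deployment, assuming a second node with the hypothetical host name flink-worker-1 that is reachable over SSH; repeat the command for each remaining node:

```shell
# Copy data.csv to the /root directory of another Apache Flink node.
scp /root/data.csv root@flink-worker-1:/root/data.csv
```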
Step 2: Write data
Start and run the Apache Flink SQL application. For more information, see Starting the SQL Client CLI.
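For example, on a standalone deployment, the SQL client can be started with the following command. This sketch assumes a Flink installation in /opt/flink; embedded mode is the standard mode for Apache Flink 1.11 to 1.13.

```shell
# Start the Flink SQL client in embedded mode.
/opt/flink/bin/sql-client.sh embedded
```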
Execute the following statement to create a source table named csv_person:

```sql
CREATE TABLE IF NOT EXISTS csv_person (
  `user_id` STRING,
  `user_name` STRING,
  `age` INT
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///root/data.csv',
  'format' = 'csv',
  'csv.ignore-parse-errors' = 'true',
  'csv.allow-comments' = 'true'
);
```
Note: The column names and data types of the source table must be the same as those of the destination table in the AnalyticDB for MySQL cluster.

The path parameter in the preceding statement specifies the on-premises directory of the data.csv file. Make sure that the directories of the file on all Apache Flink nodes are the same. If the data.csv file is not stored on your on-premises device, specify the actual directory of the file.

For information about other parameters in the preceding statement, see FileSystem SQL Connector.
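Optionally, before you create the result table, you can check that the source table reads the CSV file as expected. This query is a quick sanity check and is not a required part of the procedure:

```sql
-- Preview the rows that Flink reads from /root/data.csv.
SELECT * FROM csv_person;
```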
Execute the following statement to create a result table named mysql_person:

```sql
CREATE TABLE mysql_person (
  user_id STRING,
  user_name STRING,
  age INT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
  'table-name' = '<table_name>',
  'username' = '<username>',
  'password' = '<password>',
  'sink.buffer-flush.max-rows' = '10',
  'sink.buffer-flush.interval' = '1s'
);
```
Note: The column names and data types of the result table must be the same as those of the destination table in the AnalyticDB for MySQL cluster.
The following parameters are required to connect to an AnalyticDB for MySQL cluster. For information about the optional parameters, see the "Connector options" section of the JDBC SQL Connector topic.

- connector: The connector type used by Apache Flink. Set this parameter to jdbc.
- url: The JDBC URL of the AnalyticDB for MySQL cluster.
  Format: jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true
  - endpoint: the endpoint of the AnalyticDB for MySQL cluster.
    Note: If you want to use a public endpoint to connect to the AnalyticDB for MySQL cluster, you must apply for one first. For more information, see the "Apply for or release a public endpoint" topic.
  - db_name: the name of the destination database in the AnalyticDB for MySQL cluster.
  - useServerPrepStmts=false&rewriteBatchedStatements=true: the configuration required to batch write data to the AnalyticDB for MySQL cluster. This configuration improves write performance and reduces the load on the AnalyticDB for MySQL cluster.

  Example: jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true
- table-name: The name of the destination table in the AnalyticDB for MySQL cluster that stores the data that you want to write. In this example, the destination table is person.
- username: The name of the AnalyticDB for MySQL database account that has write permissions.
  Note: You can execute the SHOW GRANTS statement to query the permissions of the current account, and the GRANT statement to grant permissions to an account.
- password: The password of the AnalyticDB for MySQL database account that has write permissions.
- sink.buffer-flush.max-rows: The maximum number of rows that Apache Flink buffers before it writes them to the AnalyticDB for MySQL cluster in a single batch. Apache Flink receives data in real time. When the number of buffered rows reaches the value of this parameter, the rows are batch written. Valid values:
  - 0: Apache Flink batch writes data only when the time interval specified by the sink.buffer-flush.interval parameter elapses.
  - A value other than 0 that specifies a number of rows, such as 1000 or 2000.

  Note: We recommend that you do not set this parameter to 0. If you set this parameter to 0, write performance degrades and the load on the AnalyticDB for MySQL cluster increases during concurrent queries.
  If you set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values other than 0, the following batch write rules apply:
  - If the number of buffered rows reaches the value of sink.buffer-flush.max-rows before the interval specified by sink.buffer-flush.interval elapses, Apache Flink batch writes the data immediately without waiting for the interval to expire.
  - If the interval specified by sink.buffer-flush.interval elapses before the number of buffered rows reaches the value of sink.buffer-flush.max-rows, Apache Flink batch writes the data regardless of how many rows are buffered.
- sink.buffer-flush.interval: The maximum time interval between two consecutive batch writes from Apache Flink to the AnalyticDB for MySQL cluster. Valid values:
  - 0: Apache Flink batch writes data only when the number of rows specified by the sink.buffer-flush.max-rows parameter is reached.
  - A value other than 0 that specifies a time interval, such as 1d, 1h, 1min, 1s, or 1ms.

  Note: We recommend that you do not set this parameter to 0. A nonzero interval ensures that data is written in a timely manner when the amount of source data is small, such as during off-peak hours.
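For reference, the following statement shows the result table DDL with the placeholders filled in, using the example endpoint, the tpch database, and the person table from this topic. The account name is a hypothetical placeholder; replace it and the password with your own credentials.

```sql
CREATE TABLE mysql_person (
  user_id STRING,
  user_name STRING,
  age INT
) WITH (
  'connector' = 'jdbc',
  -- Example endpoint from this topic; replace it with your cluster endpoint.
  'url' = 'jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true',
  'table-name' = 'person',
  -- Hypothetical account; use a database account that has write permissions.
  'username' = 'adb_account',
  'password' = '<password>',
  'sink.buffer-flush.max-rows' = '10',
  'sink.buffer-flush.interval' = '1s'
);
```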
Execute the INSERT INTO statement to import data. If the primary key has duplicate values, data is not repeatedly inserted, and the INSERT INTO statement is equivalent to the INSERT IGNORE INTO statement. For more information, see INSERT INTO.

```sql
INSERT INTO mysql_person
SELECT user_id, user_name, age
FROM csv_person;
```
Step 3: Verify data
After the data is written, log on to the tpch database in the AnalyticDB for MySQL cluster and execute the following statement to check whether the source data is written to the person table:

```sql
SELECT * FROM person;
```
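If the import succeeded and only the ten sample rows from data.csv were written, the query should return output similar to the following. The exact formatting depends on your client:

```
+---------+-----------+------+
| user_id | user_name | age  |
+---------+-----------+------+
| 0       | json00    |   20 |
| 1       | json01    |   21 |
| ...     | ...       |  ... |
| 9       | json09    |   29 |
+---------+-----------+------+
10 rows in set
```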