AnalyticDB:Import data from Apache Flink

Last Updated: May 24, 2024

This topic describes how to import data from open source Apache Flink to an AnalyticDB for MySQL Data Warehouse Edition (V3.0) cluster.

Prerequisites

  • An Apache Flink driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes. You can download the driver package that corresponds to your Apache Flink version from the JDBC SQL Connector page.

  • A MySQL driver is downloaded and deployed to the ${Flink deployment directory}/lib directory of all Apache Flink nodes.

    Note

    The version of the MySQL driver must be 5.1.40 or later. To download MySQL drivers, go to the mysql/mysql-connector-java page.

  • The Apache Flink cluster is restarted after all JAR packages are deployed. For more information about how to start an Apache Flink cluster, see Step 2: Start a Cluster.

  • A database and a table are created in an AnalyticDB for MySQL cluster to store the data that you want to write. For information about how to create a database and a table, see CREATE DATABASE and CREATE TABLE.

    Note
    • In this example, a database named tpch is created by executing the following statement:

      CREATE DATABASE IF NOT EXISTS tpch;
    • In this example, a table named person is created by executing the following statement (a quick way to verify that both objects exist is shown after this list):

      CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
  • If the AnalyticDB for MySQL cluster is in elastic mode, you must turn on ENI in the Network Information section of the Cluster Information page.
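
To confirm that the prerequisite database and table exist, you can run a quick check in the AnalyticDB for MySQL cluster. The following statements are a minimal sketch; because AnalyticDB for MySQL is MySQL-compatible, the standard SHOW TABLES and DESCRIBE statements are assumed to work:

    -- Switch to the example database, list its tables, and inspect the person table.
    USE tpch;
    SHOW TABLES;
    DESCRIBE person;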

Usage notes

  • This topic describes only how to use Apache Flink SQL to create a table and write data to AnalyticDB for MySQL. For information about how to use the Apache Flink Java Database Connectivity (JDBC) API to write data, see JDBC Connector.

  • The method described in this topic is applicable only to Apache Flink 1.11 and later. To write data from other Apache Flink versions to an AnalyticDB for MySQL cluster, you can use one of the following methods:

    • For information about how to write data from Apache Flink 1.9 and 1.10, see Flink v1.10.

    • For information about how to write data from Apache Flink 1.8 and earlier, see Flink v1.8.

Procedure

Note

In this example, a CSV file is used as the source data to be written.

The procedure consists of the following steps:

  • Step 1: Prepare data. Create a CSV file, write data to the file, and then deploy the file to the /root directory of all Apache Flink nodes.

  • Step 2: Write data. Execute SQL statements to create a source table and a result table in Apache Flink and use the tables to write the source data to the AnalyticDB for MySQL cluster.

  • Step 3: Verify data. Log on to the AnalyticDB for MySQL database to check whether the source data is written.

Step 1: Prepare data

  1. In the root directory of an Apache Flink node, run the vim /root/data.csv command to create a CSV file named data.csv.

    The CSV file contains the following data. You can copy additional rows of data to increase the amount of data that you want to write.

    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29
  2. After the CSV file is created, deploy the file to the /root directory of other Apache Flink nodes.
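
If you only need sample data to test the write path, you can skip the CSV file and use the built-in datagen connector of Apache Flink instead, which generates random rows. The following statement is a minimal sketch: the table name datagen_person and the field bounds are illustrative, and the datagen connector is assumed to be available in your Apache Flink version (it ships with Apache Flink 1.11 and later).

    -- Generates 10 random rows per second with the same schema as the person table.
    CREATE TABLE datagen_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.age.min' = '18',
      'fields.age.max' = '99'
    );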

Step 2: Write data

  1. Start and run the Apache Flink SQL application. For more information, see Starting the SQL Client CLI.

  2. Execute the following statement to create a source table named csv_person:

    CREATE TABLE IF NOT EXISTS csv_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///root/data.csv',
      'format' = 'csv',
      'csv.ignore-parse-errors' = 'true',
      'csv.allow-comments' = 'true'
    );
    Note
    • The column names and data types of the source table must be the same as those of the destination table in the AnalyticDB for MySQL cluster.

    • The path parameter in the preceding statement specifies the local path of the data.csv file. Make sure that the file is stored in the same directory on all Apache Flink nodes. If the file resides in a different location, specify its actual path.

      For information about other parameters in the preceding statement, see FileSystem SQL Connector.
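
    To verify that Apache Flink can read the CSV file, you can query the source table from the same SQL client session. This optional sanity check should display the rows of the data.csv file:

      -- Reads the bounded CSV source and prints its rows in the SQL client.
      SELECT * FROM csv_person;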

  3. Execute the following statement to create a result table named mysql_person:

    CREATE TABLE mysql_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
      'table-name' = '<table_name>',
      'username' = '<username>',
      'password' = '<password>',
      'sink.buffer-flush.max-rows' = '10',
      'sink.buffer-flush.interval' = '1s'
    );
    Note
    • The column names and data types of the result table must be the same as those of the destination table in the AnalyticDB for MySQL cluster.

    • The following parameters are required to connect to an AnalyticDB for MySQL cluster. For information about the optional parameters, see the "Connector options" section of the JDBC SQL Connector topic.

      • connector: the connector type used by Apache Flink. Set this parameter to jdbc.

      • url: the JDBC URL of the AnalyticDB for MySQL cluster, in the following format: jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true.

        • endpoint: the endpoint of the AnalyticDB for MySQL cluster.

          Note

          If you want to use a public endpoint to connect to the AnalyticDB for MySQL cluster, you must first apply for one. For more information, see Apply for or release a public endpoint.

        • db_name: the name of the destination database in the AnalyticDB for MySQL cluster.

        • useServerPrepStmts=false&rewriteBatchedStatements=true: the settings that are required to batch write data to the AnalyticDB for MySQL cluster. These settings improve write performance and reduce the load on the cluster.

        Example: jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true.

      • table-name: the name of the destination table in the AnalyticDB for MySQL cluster that stores the written data. In this example, the destination table is person.

      • username: the name of an AnalyticDB for MySQL database account that has write permissions.

        Note
        • You can execute the SHOW GRANTS statement to query the permissions of the current account.

        • You can execute the GRANT statement to grant permissions to an account. A sketch of the required statements is provided after this list.

      • password: the password of the AnalyticDB for MySQL database account that has write permissions.

      • sink.buffer-flush.max-rows: the maximum number of rows that Apache Flink buffers before it batch writes them to the AnalyticDB for MySQL cluster. Apache Flink receives data in real time and flushes a batch when the number of buffered rows reaches this value. Valid values:

        • 0: Apache Flink batch writes data only when the time interval specified by the sink.buffer-flush.interval parameter elapses.

        • A positive integer, such as 1000 or 2000, that specifies the number of buffered rows that triggers a batch write.

        Note

        We recommend that you do not set this parameter to 0. Otherwise, write performance degrades and the load on the AnalyticDB for MySQL cluster increases during concurrent queries.

        If you set both the sink.buffer-flush.max-rows and sink.buffer-flush.interval parameters to values other than 0, the following rules apply:

        • If the number of buffered rows reaches the value of sink.buffer-flush.max-rows before the interval specified by sink.buffer-flush.interval elapses, Apache Flink immediately batch writes the data without waiting for the interval to expire.

        • If the interval specified by sink.buffer-flush.interval elapses first, Apache Flink batch writes the buffered data regardless of how many rows it has received.

      • sink.buffer-flush.interval: the maximum amount of time that Apache Flink waits before it batch writes buffered data to the AnalyticDB for MySQL cluster. Valid values:

        • 0: Apache Flink batch writes data only when the number of buffered rows reaches the value of the sink.buffer-flush.max-rows parameter.

        • A non-zero duration, such as 1d, 1h, 1min, 1s, or 1ms.

        Note

        We recommend that you do not set this parameter to 0. A non-zero interval ensures that data is written in a timely manner even when the amount of source data is small during off-peak hours.
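
    The following statements are a minimal sketch of how you might create a dedicated account and grant it write permissions on the tpch database. The account name flink_writer is hypothetical, and the statements assume that AnalyticDB for MySQL accepts MySQL-compatible CREATE USER and GRANT syntax:

      -- Hypothetical account name; replace the placeholder password with your own.
      CREATE USER 'flink_writer' IDENTIFIED BY '<password>';
      -- INSERT covers writes; SELECT is added so that the account can verify results.
      GRANT SELECT, INSERT ON tpch.* TO 'flink_writer';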

  4. Execute the INSERT INTO statement to import data. If the primary key contains duplicate values, the duplicate data is not inserted again, and the INSERT INTO statement behaves the same as the INSERT IGNORE INTO statement. For more information, see INSERT INTO. A sketch that illustrates this deduplication behavior is provided after the statement.

    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;
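
    The deduplication applies only when the destination table defines a primary key, which the example person table does not. The following statement is a hypothetical variant of the table that includes a primary key; with such a table, rerunning the INSERT INTO statement would not produce duplicate rows:

      -- Hypothetical variant of the destination table with a primary key.
      CREATE TABLE IF NOT EXISTS person(
        user_id string,
        user_name string,
        age int,
        PRIMARY KEY (user_id)
      );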

Step 3: Verify data

After the data is written, log on to the tpch database in the AnalyticDB for MySQL cluster and execute the following statement to check whether the source data is written to the person table:

SELECT * FROM person;
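
You can also compare row counts between the source file and the destination table. The following query is a quick check; with the 10-row data.csv file from Step 1, it returns 10 if all rows were written:

SELECT COUNT(*) FROM person;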