全部產品
Search
文件中心

:Flink資料匯入

更新時間:Jul 06, 2024

本文介紹如何將開源Flink中的資料匯入AnalyticDB MySQL版數倉版叢集。

前提條件

  • 下載Flink驅動,並將其部署到Flink所有節點的${flink部署目錄}/lib目錄下。您可以根據Flink版本下載對應的驅動:

    如需其他版本的驅動,請前往JDBC SQL Connector 頁面下載。

  • 下載MySQL驅動,並將其部署到Flink所有節點的${flink部署目錄}/lib目錄下。

    說明

    MySQL驅動版本需為5.1.40或以上,請前往MySQL驅動下載頁面下載。

  • 部署所有的JAR包後請重啟Flink叢集。啟動方式,請參見Start a Cluster

  • 已在目標AnalyticDB MySQL版叢集中建立資料庫和資料表,用於儲存需要寫入的資料。資料庫和資料表的建立方法,請參見CREATE DATABASECREATE TABLE

    說明
    • 本文樣本中建立的資料庫名稱為tpch,建庫語句如下:

      CREATE DATABASE IF NOT EXISTS tpch;
    • 本文樣本中建立的資料表名為person,建表語句如下:

      CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
  • 如果您的AnalyticDB MySQL叢集是彈性模式,您需要在集群資訊頁面的網路資訊地區,開啟啟用ENI網絡的開關。啟用ENI網路

注意事項

流程介紹

說明

本文樣本以CSV格式的檔案作為輸入源介紹資料寫入流程。

步驟

說明

步驟一:資料準備

建立一個新的CSV檔案並在檔案中寫入來源資料,然後將新檔案部署至Flink所有節點的/root下。

步驟二:資料寫入

通過SQL語句在Flink中建立源表和結果表,並通過源表和結果表將資料寫入AnalyticDB MySQL中。

步驟三:資料驗證

登入AnalyticDB MySQL目標資料庫,來查看並驗證來源資料是否成功匯入。

步驟一:資料準備

  1. 在其中一個Flink節點的root目錄下,執行vim /root/data.csv命令來建立一個名為data.csv的CSV檔案。

    檔案中包含的資料如下(您可以多複製幾行相同的資料來增加寫入的資料量):

    0,json00,20
    1,json01,21
    2,json02,22
    3,json03,23
    4,json04,24
    5,json05,25
    6,json06,26
    7,json07,27
    8,json08,28
    9,json09,29
  2. 檔案建立完成後,將其部署至Flink其他節點的/root目錄下。

步驟二:資料寫入

  1. 啟動並運行Flink SQL程式。詳細操作步驟,請參見Starting the SQL Client CLI

  2. 建立一張名為csv_person的源表,語句如下:

    CREATE TABLE if not exists csv_person (
      `user_id` STRING,
      `user_name` STRING,
      `age` INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 'file:///root/data.csv',
      'format' = 'csv',
      'csv.ignore-parse-errors' = 'true',
      'csv.allow-comments' = 'true'
    );
    說明
    • 源表中的列名和資料類型需與AnalyticDB MySQL版中目標表的列名和資料類型保持一致。

    • 建表語句中填寫的pathdata.csv的本地路徑(Flink各個節點的路徑均需一致)。如果您的data.csv檔案不在本地,請根據實際情況填寫正確的路徑。

      關於建表語句中的其他參數說明,請參見FileSystem SQL Connector

  3. 建立一張名為mysql_person的結果表,語句如下:

    CREATE TABLE mysql_person (
      user_id String,
      user_name String,
      age INT
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',
      'table-name' = '<table_name>',
      'username' = '<username>',
      'password' = '<password>',
      'sink.buffer-flush.max-rows' = '10',
      'sink.buffer-flush.interval' = '1s'
      );
    說明
    • 結果表中的列名和資料類型需與AnalyticDB MySQL版中目標表的列名和資料類型保持一致。

    • 下表僅列舉了串連AnalyticDB MySQL版叢集時的必填配置項,關於選填配置項的資訊,請參見Connector Options

    必填配置項

    說明

    connector

    指定Flink使用的連接器類型,選擇jdbc

    url

    AnalyticDB MySQL版叢集的JDBC URL。

    格式:jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true',其中:

    • endpoint:目標AnalyticDB MySQL版叢集的串連地址。

      說明

      如果需要使用公網地址串連叢集,您需要先申請公網地址,申請方法,請參見申請/釋放公網地址

    • db_nameAnalyticDB MySQL版中的目標資料庫名。

    • useServerPrepStmts=false&rewriteBatchedStatements=true:批量寫入資料至AnalyticDB MySQL版的必填配置,用於提高寫入效能,以及降低對AnalyticDB MySQL版叢集的壓力。

    樣本:jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true

    table-name

    AnalyticDB MySQL版中的目標表名,用於儲存寫入的資料。本文樣本中目標表名為person

    username

    AnalyticDB MySQL版中具有寫入許可權的資料庫帳號名。

    說明
    • 您可以通過SHOW GRANTS查看當前帳號所擁有的許可權。

    • 您可以通過GRANT語句為目標帳號授予許可權。

    password

    AnalyticDB MySQL版中具有寫入許可權的資料庫帳號密碼。

    sink.buffer-flush.max-rows

    從Flink寫入資料至AnalyticDB MySQL版時,一次批量寫入的最大行數。Flink會接收即時資料,當接收到的資料行數達到最大寫入行數後,再將資料批量寫入AnalyticDB MySQL版叢集。可選取值如下:

    • 0:最大行數為0時,批量寫入資料功能僅考慮sink.buffer-flush.interval配置,即只要滿足最大間隔時間就會開始批量寫入。

    • 具體的行數,例如10002000等。

    說明

    不建議將該參數設定為0。取值為0不僅會導致寫入效能變差,也會導致AnalyticDB MySQL版叢集執行並發查詢時的壓力變大。

    sink.buffer-flush.max-rowssink.buffer-flush.interval配置均不為0時,批量寫入功能生效規則如下:

    • 若Flink接收到的資料量已達到sink.buffer-flush.max-rows所設的值,但最大時間間隔還未到達sink.buffer-flush.interval所設的值,那麼Flink無需等待間隔期滿,即可直接觸發批量寫入資料至AnalyticDB MySQL版

    • 若Flink接收到的資料量未達到sink.buffer-flush.max-rows所設的值,但間隔時間已達到sink.buffer-flush.interval所設的值,那麼無論Flink接收了多少資料量,都直接觸發批量寫入資料至AnalyticDB MySQL版

    sink.buffer-flush.interval

    Flink批量寫入資料至AnalyticDB MySQL版的最大間隔時間,即執行下一次批量寫入資料前的最大等待時間,可選取值如下:

    • 0:時間間隔為0時,批量寫入資料功能僅考慮sink.buffer-flush.max-rows配置,即只要Flink接收到的資料行數達到最大寫入行數後就會開始批量寫入。

    • 具體的時間間隔,例如1d1h1min1s1ms等。

    說明

    不建議將該參數設定為0,避免在業務低穀期產生來源資料較少的情境下,影響資料匯入的及時性。

  4. 使用INSERT INTO語句匯入資料,當主鍵重複時會自動忽略當前寫入資料,資料不做更新,作用等同於INSERT IGNORE INTO,更多資訊,請參見INSERT INTO。語句如下:

    INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;

步驟三:資料驗證

匯入完成後,您可以登入AnalyticDB MySQL叢集的目標庫tpch,執行如下語句查看並驗證來源資料是否成功匯入至目標表person中:

SELECT * FROM person;