本文介紹如何將開源Flink中的資料匯入AnalyticDB MySQL版數倉版叢集。
前提條件
下載Flink驅動,並將其部署到Flink所有節點的${flink部署目錄}/lib目錄下。您可以根據Flink版本下載對應的驅動:
Flink 1.11版本:flink-connector-jdbc_2.11-1.11.0.jar
Flink 1.12版本:flink-connector-jdbc_2.11-1.12.0.jar
Flink 1.13版本:flink-connector-jdbc_2.11-1.13.0.jar
如需其他版本的驅動,請前往JDBC SQL Connector 頁面下載。
下載MySQL驅動,並將其部署到Flink所有節點的${flink部署目錄}/lib目錄下。
說明MySQL驅動版本需為5.1.40或以上,請前往MySQL驅動下載頁面下載。
部署所有的JAR包後請重啟Flink叢集。啟動方式,請參見Start a Cluster。
已在目標AnalyticDB MySQL版叢集中建立資料庫和資料表,用於儲存需要寫入的資料。資料庫和資料表的建立方法,請參見CREATE DATABASE和CREATE TABLE。
說明本文樣本中建立的資料庫名稱為
tpch
,建庫語句如下:CREATE DATABASE IF NOT EXISTS tpch;
本文樣本中建立的資料表名為
person
,建表語句如下:CREATE TABLE IF NOT EXISTS person(user_id string, user_name string, age int);
如果您的AnalyticDB MySQL叢集是彈性模式,您需要在集群資訊頁面的網路資訊地區,開啟啟用ENI網絡的開關。
注意事項
本文僅介紹通過Flink SQL建立表並寫入資料至AnalyticDB MySQL版的方法。通過Flink JDBC API寫入資料的方法,請參見JDBC Connector。
本文介紹的方法僅適用於Flink1.11及以上版本。若您需要將其他版本的Flink資料寫入AnalyticDB MySQL版叢集,那麼:
針對Flink1.10和Flink1.9版本,資料寫入方法,請參見Flink 1.10 Documentation: Connect to External Systems。
針對Flink1.8及以下版本,資料寫入方法,請參見Flink 1.8 Documentation: Connect to External Systems。
流程介紹
本文樣本以CSV格式的檔案作為輸入源介紹資料寫入流程。
步驟 | 說明 |
建立一個新的CSV檔案並在檔案中寫入來源資料,然後將新檔案部署至Flink所有節點的/root下。 | |
通過SQL語句在Flink中建立源表和結果表,並通過源表和結果表將資料寫入AnalyticDB MySQL中。 | |
登入AnalyticDB MySQL目標資料庫,來查看並驗證來源資料是否成功匯入。 |
步驟一:資料準備
在其中一個Flink節點的root目錄下,執行
vim /root/data.csv
命令來建立一個名為data.csv的CSV檔案。檔案中包含的資料如下(您可以多複製幾行相同的資料來增加寫入的資料量):
0,json00,20 1,json01,21 2,json02,22 3,json03,23 4,json04,24 5,json05,25 6,json06,26 7,json07,27 8,json08,28 9,json09,29
檔案建立完成後,將其部署至Flink其他節點的/root目錄下。
步驟二:資料寫入
啟動並運行Flink SQL程式。詳細操作步驟,請參見Starting the SQL Client CLI。
建立一張名為
csv_person
的源表,語句如下:CREATE TABLE if not exists csv_person ( `user_id` STRING, `user_name` STRING, `age` INT ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///root/data.csv', 'format' = 'csv', 'csv.ignore-parse-errors' = 'true', 'csv.allow-comments' = 'true' );
說明源表中的列名和資料類型需與AnalyticDB MySQL版中目標表的列名和資料類型保持一致。
建表語句中填寫的
path
是data.csv的本地路徑(Flink各個節點的路徑均需一致)。如果您的data.csv檔案不在本地,請根據實際情況填寫正確的路徑。關於建表語句中的其他參數說明,請參見FileSystem SQL Connector。
建立一張名為
mysql_person
的結果表,語句如下:CREATE TABLE mysql_person ( user_id String, user_name String, age INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true', 'table-name' = '<table_name>', 'username' = '<username>', 'password' = '<password>', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '1s' );
說明結果表中的列名和資料類型需與AnalyticDB MySQL版中目標表的列名和資料類型保持一致。
下表僅列舉了串連AnalyticDB MySQL版叢集時的必填配置項,關於選填配置項的資訊,請參見Connector Options。
必填配置項
說明
connector
指定Flink使用的連接器類型,選擇
jdbc
。url
AnalyticDB MySQL版叢集的JDBC URL。
格式:
jdbc:mysql://<endpoint:port>/<db_name>?useServerPrepStmts=false&rewriteBatchedStatements=true'
,其中:endpoint
:目標AnalyticDB MySQL版叢集的串連地址。說明如果需要使用公網地址串連叢集,您需要先申請公網地址,申請方法,請參見申請/釋放公網地址。
db_name
:AnalyticDB MySQL版中的目標資料庫名。useServerPrepStmts=false&rewriteBatchedStatements=true
:批量寫入資料至AnalyticDB MySQL版的必填配置,用於提高寫入效能,以及降低對AnalyticDB MySQL版叢集的壓力。
樣本:
jdbc:mysql://am-**********.ads.aliyuncs.com:3306/tpch?useServerPrepStmts=false&rewriteBatchedStatements=true
。table-name
AnalyticDB MySQL版中的目標表名,用於儲存寫入的資料。本文樣本中目標表名為
person
。username
AnalyticDB MySQL版中具有寫入許可權的資料庫帳號名。
說明您可以通過SHOW GRANTS查看當前帳號所擁有的許可權。
您可以通過GRANT語句為目標帳號授予許可權。
password
AnalyticDB MySQL版中具有寫入許可權的資料庫帳號密碼。
sink.buffer-flush.max-rows
從Flink寫入資料至AnalyticDB MySQL版時,一次批量寫入的最大行數。Flink會接收即時資料,當接收到的資料行數達到最大寫入行數後,再將資料批量寫入AnalyticDB MySQL版叢集。可選取值如下:
0:最大行數為0時,批量寫入資料功能僅考慮
sink.buffer-flush.interval
配置,即只要滿足最大間隔時間就會開始批量寫入。具體的行數,例如1000、2000等。
說明不建議將該參數設定為0。取值為0不僅會導致寫入效能變差,也會導致AnalyticDB MySQL版叢集執行並發查詢時的壓力變大。
當
sink.buffer-flush.max-rows
和sink.buffer-flush.interval
配置均不為0時,批量寫入功能生效規則如下:若Flink接收到的資料量已達到
sink.buffer-flush.max-rows
所設的值,但最大時間間隔還未到達sink.buffer-flush.interval
所設的值,那麼Flink無需等待間隔期滿,即可直接觸發批量寫入資料至AnalyticDB MySQL版。若Flink接收到的資料量未達到
sink.buffer-flush.max-rows
所設的值,但間隔時間已達到sink.buffer-flush.interval
所設的值,那麼無論Flink接收了多少資料量,都直接觸發批量寫入資料至AnalyticDB MySQL版。
sink.buffer-flush.interval
Flink批量寫入資料至AnalyticDB MySQL版的最大間隔時間,即執行下一次批量寫入資料前的最大等待時間,可選取值如下:
0:時間間隔為0時,批量寫入資料功能僅考慮
sink.buffer-flush.max-rows
配置,即只要Flink接收到的資料行數達到最大寫入行數後就會開始批量寫入。具體的時間間隔,例如1d、1h、1min、1s、1ms等。
說明不建議將該參數設定為0,避免在業務低穀期產生來源資料較少的情境下,影響資料匯入的及時性。
使用
INSERT INTO
語句匯入資料,當主鍵重複時會自動忽略當前寫入資料,資料不做更新,作用等同於INSERT IGNORE INTO
,更多資訊,請參見INSERT INTO。語句如下:INSERT INTO mysql_person SELECT user_id, user_name, age FROM csv_person;
步驟三:資料驗證
匯入完成後,您可以登入AnalyticDB MySQL叢集的目標庫tpch
,執行如下語句查看並驗證來源資料是否成功匯入至目標表person
中:
SELECT * FROM person;