Realtime Compute for Apache Flink通過訂閱AnalyticDB for MySQL,可以即時捕獲和處理資料庫變更資料,實現高效的資料同步和流式計算。本文為您介紹如何使用Flink訂閱AnalyticDB for MySQL Binlog。
前提條件
AnalyticDB for MySQL產品系列為湖倉版或數倉版彈性模式。
AnalyticDB for MySQL叢集的核心版本需為3.2.1.0及以上版本。
說明查看湖倉版叢集的核心版本,請執行
SELECT adb_version();
。如需升級核心版本,請聯絡支援人員。查看和升級數倉版叢集的核心版本,請參見查看和升級版本。
FlinkRealtime Compute引擎需為VVR 8.0.4及以上版本。
AnalyticDB for MySQL和Flink全託管工作空間需要位於同一VPC下,詳情請參見建立叢集和開通Realtime ComputeFlink版。
已將Flink工作空間所屬的網段加入AnalyticDB for MySQL的白名單,詳情請參見Flink所屬網段查看方法和設定白名單。
使用限制
AnalyticDB for MySQL僅支援按表開啟Binlog功能。
Flink僅支援處理AnalyticDB for MySQL Binlog中的所有基礎資料類型和複雜資料類型JSON,詳情請參見資料類型。
Flink不會處理AnalyticDB for MySQL Binlog中的DDL操作記錄和分區表自動分區刪除的操作記錄。
步驟一:開啟Binlog功能
開啟Binlog功能,本文以表名為source_table為例。
建表時,開啟Binlog
CREATE TABLE source_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )DISTRIBUTED BY HASH (id) BINLOG=true;
建表後,開啟Binlog
ALTER TABLE source_table BINLOG=true;
(可選)查看Binlog資訊。
說明使用以下語句查看Binlog檔案的資訊時,若僅開啟Binlog功能,日誌資訊顯示為0。只有Flink成功訂閱Binlog後,才會顯示日誌資訊。
查看當前寫入的Binlog的位點,SQL語句如下:
SHOW MASTER STATUS FOR source_table;
查看叢集內對應表所有Binlog檔案的資訊,SQL語句如下:
SHOW BINARY LOGS FOR source_table;
(可選)修改Binlog保留時間長度。
您可以通過修改
binlog_ttl
參數來調整Binlog的保留時間長度。以下樣本表示將表source_table的Binlog保留時間長度設定為1天。ALTER TABLE source_table binlog_ttl='1d';
binlog_ttl
參數取值支援以下格式:純數字,表示毫秒。例如,60代表60毫秒。
數字+s,表示秒。例如,30s代表30秒。
數字+h,表示小時。例如,2h代表2小時。
數字+d,表示天。例如,1d代表1天。
步驟二:配置Flink連接器
在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊資料連線。
在資料連線頁面,單擊建立自訂連接器
上傳自訂連接器JAR包。下載連結:AnalyticDB for MySQL Connector。
上傳完成後,單擊下一步
單擊完成。建立完成的自訂連接器會出現在連接器列表中。
步驟三:訂閱Binlog
登入Realtime Compute控制台,建立SQL作業。詳情請參見建立作業。
建立源表,串連到AnalyticDB for MySQL並讀取指定表(source_table)的Binlog資料。
說明Flink DDL中定義的主鍵必須和AnalyticDB for MySQL叢集物理表中的主鍵保持一致,主鍵一致包括主鍵和主鍵名稱一致。如果不一致,會影響資料正確性。
Flink的資料類型需要和AnalyticDB for MySQL相容。映射關係,請參見類型映射。
CREATE TEMPORARY TABLE adb_source ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb-mysql-cdc', 'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com', 'username' = 'testUser', 'password' = 'Test12****', 'database-name' = 'binlog', 'table-name' = 'source_table' );
WITH參數說明:
參數
是否必填
預設值
資料類型
說明
connector
是
無
STRING
使用的連接器。
這裡填寫自訂連接器,固定填寫
adb-mysql-cdc
。hostname
是
無
STRING
AnalyticDB for MySQL的VPC地址。
username
是
無
STRING
AnalyticDB for MySQL資料庫帳號。
password
是
無
STRING
AnalyticDB for MySQL資料庫密碼。
database-name
是
無
STRING
AnalyticDB for MySQL資料庫名稱。
由於AnalyticDB for MySQL實現的是表級Binlog,此處僅支援設定一個資料庫。
table-name
是
無
STRING
AnalyticDB for MySQL資料庫的表名。
由於AnalyticDB for MySQL實現的是表級Binlog,此處僅支援設定一個表。
port
否
3306
INTEGER
連接埠號碼。
scan.incremental.snapshot.enabled
否
true
BOOLEAN
增量快照。
預設開啟。增量快照是一種讀取錶快照的新機制,與舊的快照機制相比,增量快照有許多優點,包括:
在讀取快照期間,Source支援並發讀取。
在讀取快照期間,Source支援進行Chunk粒度的Checkpoint。
在讀取快照之前,Source不需要擷取資料庫鎖許可權。
scan.incremental.snapshot.chunk.size
否
8096
INTEGER
錶快照的Chunk大小(包含的行數)。
當開啟增量快照讀取時,表會被切分成多個Chunk讀取。
scan.snapshot.fetch.size
否
1024
INTEGER
讀取錶快照時,每次讀取資料的最大行數。
scan.startup.mode
否
initial
STRING
消費資料的啟動模式。
取值如下:
initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。
earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。
specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置
scan.startup.specific-offset.file
和scan.startup.specific-offset.pos
參數來指定從特定Binlog檔案名稱和位移量啟動。
scan.startup.specific-offset.file
否
無
STRING
在specific-offset啟動模式下,啟動位點的Binlog檔案名稱。
最新Binlog檔案名稱可使用
SHOW MASTER STATUES for table_name
擷取。scan.startup.specific-offset.pos
否
無
LONG
在specific-offset啟動模式下,啟動位點的Binlog檔案位置。
最新Binlog位置可使用
SHOW MASTER STATUES for table_name
擷取。scan.startup.specific-offset.skip-events
否
無
LONG
在指定的啟動位點後需要跳過的事件數目量。
scan.startup.specific-offset.skip-rows
否
無
LONG
在指定的啟動位點後需要跳過的資料行數。
server-time-zone
否
無
STRING
資料庫伺服器中的會話時區。
例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP類型如何轉成STRING類型。如果沒有設定,則使用
ZONELD.SYSTEMDEFAULT()
來確定伺服器時區。debezium.min.row.count.to.stream.result
否
1000
INTEGER
當表的行數大於該值時,連接器會對結果進行串流。
若將此參數設定為
0
,會跳過所有表大小檢查,始終在快照期間對所有結果進行串流。connect.timeout
否
30s
DURATION
串連資料庫伺服器逾時,重試串連之前等待逾時的最長時間。
預設單位為秒(s)。
connect.max-retries
否
3
INTEGER
串連資料庫服務時,串連失敗後重試的最大次數。
在目標端建立表,用於儲存處理後的資料。本文以AnalyticDB for MySQL作為目標端。Flink支援的連接器請參見支援的連接器。
CREATE TABLE target_table ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) )
建立結果表,串連步驟3建立的表,用於將處理後的資料寫入到AnalyticDB for MySQL指定的表。
CREATE TEMPORARY TABLE adb_sink ( `id` INT, `num` BIGINT, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'adb3.0', 'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest', 'userName' = 'testUser', 'password' = 'Test12****', 'tableName' = 'target_table' );
結果表對應的WITH參數和映射類型,詳情請見:雲原生資料倉儲AnalyticDB MySQL版(ADB)3.0。
將捕獲到的來源資料變化同步到結果表,並由結果表將資料同步到目標端。
INSERT INTO adb_sink SELECT * FROM adb_source;
單擊儲存。
單擊深度檢查。
深度檢查能夠檢查作業的SQL語義、網路連通性以及作業使用的表的中繼資料資訊。同時,您可以單擊結果地區的SQL最佳化,展開查看SQL風險問題提示以及對應的SQL最佳化建議。
(可選)單擊調試。
您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險。詳情請參見作業調試。
單擊部署,詳情請參見部署SQL作業。
完成作業開發和深度檢查後,即可部署作業,將資料發布至生產環境。部署後,您可以在作業營運頁面啟動作業至運行階段,詳情請參見作業啟動。
類型映射
AnalyticDB for MySQL與Flink的資料類型映射關係如下:
AnalyticDB for MySQL欄位類型 | Flink欄位類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(p,s)或NUMERIC(p,s) | DECIMAL(p,s) |
VARCHAR | STRING |
BINARY | BYTES |
DATE | DATE |
TIME | TIME |
DATETIME | TIMESTAMP |
TIMESTAMP | TIMESTAMP |
POINT | STRING |
JSON | STRING |