本文為您介紹如何將MySQL整庫同步Kafka,從而降低多個任務對MySQL資料庫造成的壓力。
背景資訊
MySQL CDC資料表主要用於擷取MySQL資料,並可以即時同步資料表中的修改,經常用在複雜的計算情境。例如,作為一張維表和其他資料表做Join操作。在使用中,同一張MySQL表可能被多個作業依賴,當多個任務使用同一張MySQL表做處理時,MySQL資料庫會啟動多個串連,對MySQL伺服器和網路造成很大的壓力。
為了緩解對上遊MySQL資料庫的壓力,阿里雲FlinkRealtime Compute已提供MySQL整庫同步到Kafka的能力,通過引入Kafka作為中介層,並使用CDAS整庫同步或CTAS整表同步到Kafka來解決。
具體操作是使用CDAS或CTAS文法,在一個作業裡將上遊的MySQL的資料即時同步到Kafka中。在MySQL整庫同步任務啟動後,由Kafka JSON Catalog建立Topic,每張MySQL表以Upsert Kafka 的方式寫入對應topic。然後直接使用Kafka JSON Catalog中的表代替MySQL表,從而降低多個任務對MySQL資料庫造成的壓力。
使用限制
同步的MySQL表必須包含主鍵。
支援使用自建Kafka叢集,EMR的Kafka叢集和ApsaraMQ for Kafka。使用ApsaraMQ for Kafka時,只能通過預設存取點使用。
upsert-kafka表暫未支援作為CTAS和CDAS文法的源表,upsert-kafka表只能作為CTAS和CDAS同步的結果表。
Kafka叢集的儲存空間必須大於源表資料的儲存空間,否則會因儲存空間不足導致資料丟失。因為整庫同步Kafka建立的topic都是compacted topic,即topic的每個訊息鍵(Key)僅保留最近的一條訊息,但是資料不會到期,compacted topic裡相當於儲存了與源庫的表相同大小的資料。
操作步驟
註冊MySQL Catalog和Kafka JSON Catalog。
建立並啟動一個CDAS或CTAS同步任務,將資料庫中的表同步到Kafka中。
CDAS同步語句
CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka` AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
說明由於Kafka本身沒有資料庫的概念,所以不存在建立資料庫的操作,使用時需要結合IF NOT EXISTS來跳過建庫。
CTAS同步語句
CREATE TABLE `kafka-catalog`.`kafka`.`topic` AS TABLE `mysql-catalog`.`db`.`table`;
使用同步到Kafka的表。
整庫同步任務建立的Kafka topic名稱和MySQL表名相同,分區數和副本數會使用叢集的預設配置,並且cleanup.policy會設定為compact。
使用Kafka JSON Catalog訪問MySQL資料庫表的對應表,下遊作業可以消費Topic中的資料來擷取資料庫表的最新資料。對於同步到Kafka的表,使用方式有以下兩種:
通過Catalog直接使用
詳情請參見使用Kafka JSON Catalog。
說明在直接使用時,由於可能發生了Schema變更,Kafka JSON Catalog解析出的Schema可能與MySQL對應表存在差異,例如出現已經刪除的欄位,部分欄位可能出現為null的情況。
Catalog讀取出的Schema由消費到的資料的欄位組成。如果存在刪除的欄位且訊息未到期,則會出現一些已經不存在的欄位,這樣的欄位值會為null,該情況無需特殊處理。
通過建立暫存資料表的方式使用
這種方式支援使用者自訂指定Schema。您可以在Schemas tab頁查看並複製表配置的WITH部分。基本格式如下所示。
CREATE TEMPORARY TABLE tempOrder ( `key_order_id` BIGINT NOT NULL, `value_product` STRING, PRIMARY KEY (key_order_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', 'topic' = 'order', 'properties.bootstrap.servers' = 'xxxx', 'key.format' = 'json', 'key.fields-prefix' = 'key_', 'value.format' = 'json', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' );
參數
說明
備忘
connector
Connector類型。
固定值為upsert-kafka。
topic
對應的Topic名稱。
和Kafka JSON Catalog的描述保持一致。
properties.bootstrap.servers
Kafka Broker地址。
格式為
host:port,host:port,host:port
,以英文逗號(,)分割。key.format
Flink Kafka Connector在序列化或還原序列化Kafka的訊息鍵(Key)時使用的格式。
固定值為json。
key.fields-prefix
為所有Kafka訊息鍵(Key)指定自訂首碼,以避免與訊息體(Value)或Metadata欄位重名。
需要和Kafka JSON Catalog的key.fields-prefix參數值保持一致。
value.format
Flink Kafka Connector在序列化或還原序列化Kafka的訊息體(Value)時使用的格式。
固定值為json。
value.fields-prefix
為所有Kafka訊息體(Value)指定自訂首碼,以避免與訊息鍵(Key)或Metadata欄位重名。
需要和Kafka JSON Catalog的value.fields-prefix參數值保持一致。
value.fields-include
定義訊息體在處理訊息鍵欄位時的策略。
固定值為EXCEPT_KEY。表示訊息體中不包含訊息鍵的欄位。
value.json.infer-schema.flatten-nested-columns.enable
Kafka訊息體(Value)是否遞迴式地展開JSON中的嵌套列。
對應Catalog的infer-schema.flatten-nested-columns.enable參數配置值。
value.json.infer-schema.primitive-as-string
Kafka訊息體(Value)是否推導所有基本類型為String類型。
對應Catalog的infer-schema.primitive-as-string參數配置值。
應用樣本
例如,在訂單評論即時分析情境下,假設有使用者表(user),訂單表(order)和使用者評論表(feedback)三張表。各個表包含資料如下圖所示。
在展示使用者訂單資訊和使用者評論時,需要通過關聯使用者表(user)來擷取使用者名稱(name欄位)資訊。程式碼範例如下。
-- 將訂單資訊和使用者表做join,展示每個訂單的使用者名稱和商品名。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;
-- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;
對於以上兩個SQL任務,user表在兩個作業中都被使用了一次。運行時,兩個作業都會讀取MySQL的全量資料和增量資料。全量讀取需要建立MySQL串連,增量讀取需要建立Binlog Client。隨著作業的不斷增多,MySQL串連和Binlog Client資源也會對應增長,會給上遊資料庫產生極大的壓力。
為了緩解對上遊MySQL資料庫的壓力,可以通過CDAS或CTAS文法在一個作業裡將上遊的MySQL資料即時同步到Kafka中,然後提供給多個下遊作業消費。程式碼範例如下。
CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
同步任務成功啟動後,上遊MySQL資料庫中的資料會以JSON格式寫入Kafka中,一個Kafka Topic可以提供給多個下遊作業消費,從而避免多個MySQL CDC Source直連資料庫產生壓力。程式碼範例如下。
-- 將訂單資訊和Kafka JSON Catalog中的使用者表做join,展示每個訂單的使用者名稱和商品名。
SELECT order.id as order_id, product, user.value_name as user_name
FROM order LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON order.user_id = user.id;
-- 將評論和Kafka JSON Catalog中的使用者表做join,展示每個評論的內容和對應使用者名稱。
SELECT feedback.id as feedback_id, comment, user.value_name as user_name
FROM feedback LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON feedback.user_id = user.id;