全部產品
Search
文件中心

Realtime Compute for Apache Flink:MySQL整庫同步Kafka

更新時間:Jul 13, 2024

本文為您介紹如何將MySQL整庫同步Kafka,從而降低多個任務對MySQL資料庫造成的壓力。

背景資訊

MySQL CDC資料表主要用於擷取MySQL資料,並可以即時同步資料表中的修改,經常用在複雜的計算情境。例如,作為一張維表和其他資料表做Join操作。在使用中,同一張MySQL表可能被多個作業依賴,當多個任務使用同一張MySQL表做處理時,MySQL資料庫會啟動多個串連,對MySQL伺服器和網路造成很大的壓力。

為了緩解對上遊MySQL資料庫的壓力,阿里雲FlinkRealtime Compute已提供MySQL整庫同步到Kafka的能力,通過引入Kafka作為中介層,並使用CDAS整庫同步或CTAS整表同步到Kafka來解決。

具體操作是使用CDAS或CTAS文法,在一個作業裡將上遊的MySQL的資料即時同步到Kafka中。在MySQL整庫同步任務啟動後,由Kafka JSON Catalog建立Topic,每張MySQL表以Upsert Kafka 的方式寫入對應topic。然後直接使用Kafka JSON Catalog中的表代替MySQL表,從而降低多個任務對MySQL資料庫造成的壓力。

mysql2kafka

使用限制

  • 同步的MySQL表必須包含主鍵。

  • 支援使用自建Kafka叢集,EMR的Kafka叢集和ApsaraMQ for Kafka。使用ApsaraMQ for Kafka時,只能通過預設存取點使用。

  • upsert-kafka表暫未支援作為CTAS和CDAS文法的源表,upsert-kafka表只能作為CTAS和CDAS同步的結果表。

  • Kafka叢集的儲存空間必須大於源表資料的儲存空間,否則會因儲存空間不足導致資料丟失。因為整庫同步Kafka建立的topic都是compacted topic,即topic的每個訊息鍵(Key)僅保留最近的一條訊息,但是資料不會到期,compacted topic裡相當於儲存了與源庫的表相同大小的資料。

操作步驟

  1. 註冊MySQL Catalog和Kafka JSON Catalog。

  2. 建立並啟動一個CDAS或CTAS同步任務,將資料庫中的表同步到Kafka中。

    • CDAS同步語句

      CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
      AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;
      說明

      由於Kafka本身沒有資料庫的概念,所以不存在建立資料庫的操作,使用時需要結合IF NOT EXISTS來跳過建庫。

    • CTAS同步語句

      CREATE TABLE `kafka-catalog`.`kafka`.`topic`
      AS TABLE `mysql-catalog`.`db`.`table`;
  3. 使用同步到Kafka的表。

    整庫同步任務建立的Kafka topic名稱和MySQL表名相同,分區數和副本數會使用叢集的預設配置,並且cleanup.policy會設定為compact。

    使用Kafka JSON Catalog訪問MySQL資料庫表的對應表,下遊作業可以消費Topic中的資料來擷取資料庫表的最新資料。對於同步到Kafka的表,使用方式有以下兩種:

    • 通過Catalog直接使用

      詳情請參見使用Kafka JSON Catalog

      說明

      在直接使用時,由於可能發生了Schema變更,Kafka JSON Catalog解析出的Schema可能與MySQL對應表存在差異,例如出現已經刪除的欄位,部分欄位可能出現為null的情況。

      Catalog讀取出的Schema由消費到的資料的欄位組成。如果存在刪除的欄位且訊息未到期,則會出現一些已經不存在的欄位,這樣的欄位值會為null,該情況無需特殊處理。

    • 通過建立暫存資料表的方式使用

      這種方式支援使用者自訂指定Schema。您可以在Schemas tab頁查看並複製表配置的WITH部分。基本格式如下所示。

      CREATE TEMPORARY TABLE tempOrder (
        `key_order_id` BIGINT NOT NULL,
        `value_product` STRING,
        PRIMARY KEY (key_order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = 'xxxx',
        'key.format' = 'json',
        'key.fields-prefix' = 'key_',
        'value.format' = 'json',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );

      參數

      說明

      備忘

      connector

      Connector類型。

      固定值為upsert-kafka。

      topic

      對應的Topic名稱。

      和Kafka JSON Catalog的描述保持一致。

      properties.bootstrap.servers

      Kafka Broker地址。

      格式為host:port,host:port,host:port,以英文逗號(,)分割。

      key.format

      Flink Kafka Connector在序列化或還原序列化Kafka的訊息鍵(Key)時使用的格式。

      固定值為json。

      key.fields-prefix

      為所有Kafka訊息鍵(Key)指定自訂首碼,以避免與訊息體(Value)或Metadata欄位重名。

      需要和Kafka JSON Catalog的key.fields-prefix參數值保持一致。

      value.format

      Flink Kafka Connector在序列化或還原序列化Kafka的訊息體(Value)時使用的格式。

      固定值為json。

      value.fields-prefix

      為所有Kafka訊息體(Value)指定自訂首碼,以避免與訊息鍵(Key)或Metadata欄位重名。

      需要和Kafka JSON Catalog的value.fields-prefix參數值保持一致。

      value.fields-include

      定義訊息體在處理訊息鍵欄位時的策略。

      固定值為EXCEPT_KEY。表示訊息體中不包含訊息鍵的欄位。

      value.json.infer-schema.flatten-nested-columns.enable

      Kafka訊息體(Value)是否遞迴式地展開JSON中的嵌套列。

      對應Catalog的infer-schema.flatten-nested-columns.enable參數配置值。

      value.json.infer-schema.primitive-as-string

      Kafka訊息體(Value)是否推導所有基本類型為String類型。

      對應Catalog的infer-schema.primitive-as-string參數配置值。

應用樣本

例如,在訂單評論即時分析情境下,假設有使用者表(user),訂單表(order)和使用者評論表(feedback)三張表。各個表包含資料如下圖所示。mysql database

在展示使用者訂單資訊和使用者評論時,需要通過關聯使用者表(user)來擷取使用者名稱(name欄位)資訊。程式碼範例如下。

-- 將訂單資訊和使用者表做join,展示每個訂單的使用者名稱和商品名。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;

-- 將評論和使用者表做join,展示每個評論的內容和對應使用者名稱。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

對於以上兩個SQL任務,user表在兩個作業中都被使用了一次。運行時,兩個作業都會讀取MySQL的全量資料和增量資料。全量讀取需要建立MySQL串連,增量讀取需要建立Binlog Client。隨著作業的不斷增多,MySQL串連和Binlog Client資源也會對應增長,會給上遊資料庫產生極大的壓力。

為了緩解對上遊MySQL資料庫的壓力,可以通過CDAS或CTAS文法在一個作業裡將上遊的MySQL資料即時同步到Kafka中,然後提供給多個下遊作業消費。程式碼範例如下。

CREATE DATABASE IF NOT EXISTS `kafka-catalog`.`kafka`
AS DATABASE `mysql-catalog`.`database` INCLUDING ALL TABLES;

同步任務成功啟動後,上遊MySQL資料庫中的資料會以JSON格式寫入Kafka中,一個Kafka Topic可以提供給多個下遊作業消費,從而避免多個MySQL CDC Source直連資料庫產生壓力。程式碼範例如下。

-- 將訂單資訊和Kafka JSON Catalog中的使用者表做join,展示每個訂單的使用者名稱和商品名。
SELECT order.id as order_id, product, user.value_name as user_name
FROM order LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON order.user_id = user.id;

-- 將評論和Kafka JSON Catalog中的使用者表做join,展示每個評論的內容和對應使用者名稱。
SELECT feedback.id as feedback_id, comment, user.value_name as user_name
FROM feedback LEFT JOIN `kafka-catalog`.`kafka`.`user` as user
ON feedback.user_id = user.id;

相關文檔