全部產品
Search
文件中心

PolarDB:從PolarDB MySQL版同步到Kafka

更新時間:Jul 06, 2024

Kafka是應用較為廣泛的分布式、高輸送量、高可擴充性訊息佇列服務,普遍用於日誌收集、監控資料彙總、流式資料處理、線上和離線分析等巨量資料領域,是巨量資料生態中不可或缺的產品之一。通過Data Transmission Service,您可以將PolarDB MySQL版同步至自建Kafka叢集,擴充訊息處理能力。

前提條件

  • Kafka叢集的版本為0.10.1.0~2.7.0版本。
  • PolarDB MySQL版已開啟Binlog,詳情請參見如何開啟Binlog

注意事項

如果來源資料庫沒有主鍵或唯一約束,且所有欄位沒有唯一性,可能會導致目標資料庫中出現重複資料。

費用說明

同步類型鏈路配置費用
庫表結構同步和全量資料同步不收費。
增量資料同步收費,詳情請參見計費概述

功能限制

  • 僅支援表粒度的資料同步。
  • 不支援自動調整同步對象。
    說明 如果在同步的過程中,對源庫中待同步的表執行了重新命名操作,且重新命名後的名稱不在同步對象中,那麼該表將不再被同步到目標Kafka叢集中。如果該表還需要同步,那麼您需要新增同步對象

操作步驟

  1. 購買資料同步作業,詳情請參見購買流程
    說明 購買時,選擇源執行個體為PolarDB、目標執行個體為Kafka,並選擇同步拓撲為單向同步
  2. 登入資料轉送控制台

    說明

    若資料轉送控制台自動跳轉至Data Management控制台,您可以在右下角的jiqiren中單擊返回舊版,返回至舊版資料轉送控制台。

  3. 在左側導覽列,單擊資料同步

  4. 同步作業列表頁面頂部,選擇同步的目標執行個體所屬地區。

  5. 定位至已購買的資料同步執行個體,單擊配置同步鏈路

  6. 配置源執行個體及目標執行個體資訊。
    同步通道的源和目標執行個體配置
    類別配置說明
    同步作業名稱DTS會自動產生一個同步作業名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。
    源執行個體資訊執行個體類型固定為PolarDB執行個體,不可變更。
    執行個體地區購買資料同步執行個體時選擇的源執行個體地區,不可變更。
    PolarDB執行個體ID選擇PolarDB MySQL版叢集ID。
    資料庫帳號填入PolarDB MySQL版叢集的資料庫帳號,需要具備待同步資料庫的讀許可權。
    資料庫密碼填入該資料庫帳號的密碼。
    目標執行個體資訊執行個體類型根據Kafka叢集的部署位置選擇,本文以ECS上的自建資料庫為例介紹配置流程。
    說明 當選擇為其他執行個體類型時,您還需要執行相應的準備工作,詳情請參見準備工作概覽
    執行個體地區購買資料同步執行個體時選擇的目標執行個體地區,不可變更。
    ECS執行個體ID選擇部署了Kafka叢集的ECS執行個體ID。
    資料庫類型選擇為Kafka
    連接埠Kafka叢集對外提供服務的連接埠,預設為9092。
    資料庫帳號填入Kafka叢集的使用者名稱,如Kafka叢集未開啟驗證可不填寫。
    資料庫密碼填入Kafka叢集使用者名稱對應的密碼,如Kafka叢集未開啟驗證可不填寫。
    Topic單擊右側的擷取Topic列表,然後在下拉框中選擇具體的Topic。
    Kafka版本根據目標Kafka叢集版本,選擇對應的版本資訊。
    串連方式根據業務及安全需求,選擇非加密串連SCRAM-SHA-256
  7. 單擊頁面右下角的授權白名單並進入下一步

    如果源或目標資料庫是阿里雲資料庫執行個體(例如RDS MySQLApsaraDB for MongoDB等),DTS會自動將對應地區DTS服務的IP地址添加到阿里雲資料庫執行個體的白名單中;如果源或目標資料庫是ECS上的自建資料庫,DTS會自動將對應地區DTS服務的IP地址添到ECS的安全規則中,您還需確保自建資料庫沒有限制ECS的訪問(若資料庫是叢集部署在多個ECS執行個體,您需要手動將DTS服務對應地區的IP地址添到其餘每個ECS的安全規則中);如果源或目標資料庫是IDC自建資料庫或其他雲資料庫,則需要您手動添加對應地區DTS服務的IP地址,以允許來自DTS伺服器的訪問。DTS服務的IP地址,請參見DTS伺服器的IP位址區段

    警告

    DTS自動添加或您手動添加DTS服務的公網IP位址區段可能會存在安全風險,一旦使用本產品代表您已理解和確認其中可能存在的安全風險,並且需要您做好基本的安全防護,包括但不限於加強帳號密碼強度防範、限制各網段開放的連接埠號碼、內部各API使用鑒權方式通訊、定期檢查並限制不需要的網段,或者使用通過內網(專線/VPN網關/智能網關)的方式接入。

  8. 配置同步對象資訊。
    配置同步對象
    配置說明
    投遞到kafka的資料格式同步到Kafka叢集中的資料以avro格式或者Canal Json格式儲存,定義詳情請參見Kafka叢集的資料存放區格式
    同步到Kafka Partition策略根據業務需求選擇同步的策略,詳細介紹請參見Kafka Partition同步策略說明
    同步對象源庫對象地區框中,選擇需要同步的對象(選擇的粒度為表),然後單擊向右箭頭表徵圖將其移動到已選對象地區框中。
    說明 DTS會自動將表名映射為步驟6選擇的Topic名稱。如果需要更換同步的目標Topic,您需要使用庫表列名映射功能,詳情請參見設定同步對象在目標執行個體中的名稱
    映射名稱更改

    如需更改同步對象在目標執行個體中的名稱,請使用對象名映射功能,詳情請參見庫表列映射

    源、目標庫無法串連重試時間

    當源、目標庫無法串連時,DTS預設重試720分鐘(即12小時),您也可以自訂重試時間。如果DTS在設定的時間內重新串連上源、目標庫,同步任務將自動回復。否則,同步任務將失敗。

    說明

    由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。

  9. 上述配置完成後單擊頁面右下角的下一步
  10. 配置同步初始化的進階配置資訊。
    Kafka同步初始化進階配置
    配置說明
    同步初始化預設選擇結構初始化全量資料初始化,DTS會在增量資料同步之前,將源庫中待同步對象的結構和存量資料,同步到目標庫。
    過濾選項預設選擇忽略增量同步處理階段的 DDL,即增量同步處理階段源庫執行的DDL操作不會被DTS同步至目標庫。
  11. 上述配置完成後,單擊頁面右下角的預檢查並啟動

    說明
    • 在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。

    • 如果預檢查失敗,單擊具體檢查項後的提示,查看失敗詳情。

      • 您可以根據提示修複後重新進行預檢查。

      • 如無需修複警示檢測項,您也可以選擇確認屏蔽忽略警示項並重新進行預檢查,跳過警示檢測項重新進行預檢查。

  12. 預檢查對話方塊中顯示預檢查通過後,關閉預檢查對話方塊,資料同步作業正式開始。
    您可以在資料同步頁面,查看資料同步狀態。查看資料同步狀態