全部產品
Search
文件中心

AnalyticDB:通過資料同步功能APS同步Kafka資料(推薦)

更新時間:Nov 26, 2025

AnalyticDB for MySQL提供資料同步APS(AnalyticDB Pipeline Service)功能,您可以建立Kafka同步鏈路,通過同步鏈路從指定時間位點,即時同步Kafka中的資料入湖,以滿足近即時產出、全量歷史歸檔、彈性分析等需求。本文主要介紹如何添加Kafka資料來源,建立Kafka同步鏈路並啟動任務,以及資料同步後如何進行資料分析和資料來源管理。

前提條件

注意事項

  • 僅支援同步JSON格式的Kafka資料。

  • Kafka中建立的Topic資料超過一定的時間會被自動清理,如果Topic資料到期,同時資料同步任務失敗,重新啟動同步任務時讀取不到被清理掉的資料,會有遺失資料的風險。因此請適當調大Topic資料的生命週期,並在資料同步任務失敗時及時聯絡支援人員。

  • 擷取Kafka範例資料在大於8 KB的情況下,Kafka API會將資料進行截斷,導致解析範例資料為JSON格式時失敗,從而無法自動產生欄位對應資訊。

  • kafka源端表結構發生變化時,不會觸發DDL自動變更,即變更不會同步至AnalyticDB for MySQL

  • 資料入湖後,需執行Commit操作使寫入的資料可見。為避免Commit操作間隔過小影響作業運行穩定性和讀寫效能,AnalyticDB for MySQL資料同步功能Commit操作間隔預設為5分鐘。因此,當您首次建立並啟動資料同步任務時,至少需等待5分鐘,才可以查看第一批寫入的資料。

計費說明

通過AnalyticDB for MySQL資料移轉功能遷移資料至OSS會產生以下費用。

使用流程

建立資料來源

說明

如果您已添加Kafka資料來源,可跳過該步驟,直接建立同步鏈路,詳情請參見建立同步鏈路

  1. 登入雲原生資料倉儲AnalyticDB MySQL控制台,在左上方選擇叢集所在地區。在左側導覽列,單擊集群清單,然後單擊目的地組群ID。

  2. 在左側導覽列,單擊數據接入>數據源管理

  3. 單擊左上方新建數據源

  4. 新建數據源頁面進行參數配置。參數說明如下表所示:

    參數名稱

    參數說明

    數據源類型

    選擇資料來源類型Kafka

    數據源名稱

    系統預設按資料來源類型與目前時間產生名稱,可按需修改。

    數據源描述

    資料來源備忘描述,例如湖倉應用情境、應用業務限制等。

    部署模式

    目前僅支援阿里雲執行個體。

    Kafka實例

    Kafka執行個體ID。

    登入雲訊息佇列 Kafka 版控制台,在实例列表頁面查看執行個體ID。

    Kafka Topic

    在Kafka中建立的Topic名稱。

    登入雲訊息佇列 Kafka 版控制台,在目標執行個體的Topic管理頁面查看Topic名稱。

    消息數據格式

    Kafka訊息資料格式,目前僅支援JSON。

  5. 參數配置完成後,單擊創建

建立同步鏈路

  1. 在左側導覽列,單擊SLS/Kafka數據同步

  2. 在左上方,單擊新建同步鏈路

  3. 新建同步鏈路頁面,進行資料來源的數據源及目標端配置目標庫表配置同步配置

    • 數據源及目標端配置的參數說明如下:

      參數名稱

      參數說明

      數據鏈路名稱

      資料鏈路名稱。系統預設按資料來源類型與目前時間產生名稱,可按需修改。

      數據源

      選擇已有的Kafka資料來源,也可建立資料來源。

      目標端類型

      支援如下選項:

      • 數據湖-用戶OSS

      • 數據湖-ADB湖存儲(推薦)。

        重要

        選擇數據湖-ADB湖存儲時,需開啟湖儲存功能

      ADB湖儲存

      AnalyticDB for MySQL湖資料所在湖儲存名稱。

      在下拉式清單中選擇目標湖儲存,若無已建立的湖儲存,可單擊下拉式清單中的自動創建,自動建立湖儲存。

      重要

      目標端類型選擇數據湖-ADB湖存儲時,填寫該參數。

      OSS路徑

      AnalyticDB for MySQL湖資料在OSS中的儲存路徑。

      重要
      • 目標端類型選擇數據湖-用戶OSS 時,填寫該參數。

      • 展示的Bucket是與AnalyticDB for MySQL叢集同地區的所有Bucket,您可以任意選擇其中一個。請謹慎規劃儲存路徑,建立後不允許修改。

      • 建議選擇一個空目錄,且不能與其他任務的OSS路徑有相互首碼關係,防止資料覆蓋。例如,兩個資料同步任務的OSS路徑分別為oss://testBucketName/test/sls1/oss://testBucketName/test/,OSS路徑有相互首碼關係,資料同步過程中會有資料覆蓋。

      儲存格式

      資料存放區格式。支援如下選項:

      • PAIMON。

        重要

        目標端類型數據湖-用戶OSS 時,支援該格式。

      • ICEBERG。

    • 目標庫表配置參數說明如下:

      參數名稱

      參數說明

      庫名

      同步到AnalyticDB for MySQL的資料庫名稱。如果不存在同名資料庫,將建立庫;如果已存在同名資料庫,資料會同步到已存在的資料庫中。庫名命名規則,詳見使用限制

      重要

      數據源及目標端配置中,若儲存格式 PAIMON,已有資料庫需滿足以下條件,否則資料同步任務會失敗:

      • 必須是外部資料庫,即建庫語句必須為CREATE EXTERNAL DATABASE<資料庫名>

      • 建庫語句DBPROPERTIES參數中必須有catalog屬性,且catalog值必須為paimon

      • 建庫語句DBPROPERTIES參數必須有adb.paimon.warehouse屬性。例如:adb.paimon.warehouse=oss://testBucketName/aps/data

      • 建庫語句DBPROPERTIES參數必須有LOCATION屬性,且必須在資料庫名稱後面加.db,否則XIHE查詢會失敗。例如:LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/

        LOCATION配置的OSS路徑,Bucket目錄必須真實存在,否則建庫會失敗。

      錶名

      同步到AnalyticDB for MySQL的表名稱。如果庫中不存在同名表,將建立表;如果庫中已存在同名表,資料同步會失敗。表名命名規則,詳見使用限制

      樣例數據

      自動從Kafka Topic中擷取的最新資料作為範例資料。

      說明

      Kafka Topic中的資料需為JSON格式,若存在其他格式的資料,資料同步時會報錯。

      JSON解析層級

      設定JSON的嵌套解析層數,取值說明:

      • 0:不做解析。

      • 1(預設值):解析一層。

      • 2:解析兩層。

      • 3:解析三層。

      • 4:解析四層。

      JSON的嵌套解析策略,請參見JSON解析層級和Schema欄位推斷樣本

      Schema字段映射

      展示範例資料經過JSON解析後的Schema資訊。可在此調整目標欄位名,類型或按需增刪欄位等。

      分區鍵設置

      為目標表設定分區鍵。建議按日誌時間或者商務邏輯配置分區,以保證入湖與查詢效能。如不設定,則目標表預設沒有分區。

      目標端分區鍵的格式處理方法分為:時間格式化和指定分區欄位。

      • 按日期時間分區,分區欄位名請選擇一個日期時間欄位。格式處理方法選擇時間格式化,選擇源端欄位格式和目標資料分割格式。AnalyticDB for MySQL會按源端欄位格式識別分區欄位的值,並將其轉換為目標資料分割格式進行分區。例如,源欄位為gmt_created,值為1711358834,源端欄位格式為秒級精度時間戳記,目標資料分割格式為yyyyMMdd,則會按20240325進行分區。

      • 按欄位值分區,格式處理方法請選擇指定分區欄位。

    • 同步配置的參數說明如下:

      參數名稱

      參數說明

      增量同步起始消費位點

      同步任務啟動時會從選擇的時間點開始消費Kafka資料。取值說明:

      • 最早位點(begin_cursor):自動從Kafka資料中最開始的時間點消費資料。

      • 最近位點(end_cursor):自動從Kafka資料中最近的時間點消費資料。

      • 自訂點位:您可以選擇任意一個時間點,系統則會從Kafka中第一條大於等於該時間點的資料開始消費。

      Job型資源組

      指定任務啟動並執行Job型資源群組。

      增量同步所需ACU數

      指定任務啟動並執行Job型資源群組ACU數。最小ACU數為2,最大ACU數為Job型資源群組可用計算最大資源數。建議多指定一些ACU數,可以提升入湖效能及任務穩定性。

      說明

      建立資料同步任務時,使用Job型資源群組中的彈性資源。資料同步任務會長期佔用資源,因此系統會從資源群組中扣除該任務佔用的資源。例如,Job型資源群組的計算最大資源為48 ACU,已建立了一個8 ACU的同步任務,在該資源群組中建立另一個同步任務時,可選的最大ACU數為40。

      高級配置

      進階配置可以讓您對同步任務進行個人化的配置。如需進行個人化配置,請聯絡支援人員。

  4. 上述參數配置完成後,單擊提交

啟動資料同步任務

  1. SLS/Kafka數據同步頁面,選擇建立成功的資料同步任務,在操作列單擊啟動

  2. 單擊左上方査詢,狀態變為正在運行即資料同步任務啟動成功。

資料分析

同步任務成功後,您可以通過Spark Jar開發對同步到AnalyticDB for MySQL的資料進行分析。Spark開發的相關操作,請參見Spark開發編輯器Spark離線應用開發

  1. 在左側導覽列,單擊作業開發 > Spark Jar 開發

  2. 在預設範本中輸入樣本語句,並單擊運行

    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- Here are your sql statements
    show tables from lakehouse20220413156_adbTest;
  3. 可選:應用列表頁簽中,單擊操作列的日誌,查看Spark SQL啟動並執行日誌。

管理資料來源

在左側導覽列,單擊數據接入>數據源管理,您可以在操作列執行以下操作。

操作按鈕

說明

新建鏈路

快捷跳轉到建立此資料來源下的資料同步或資料移轉任務。

查看

查看資料來源的詳細配置。

編輯

編輯資料來源屬性,如更新資料來源名稱、描述等。

刪除

刪除當前資料來源。

說明

當資料來源下存在資料同步或資料移轉任務時,此資料來源無法直接刪除,需先在SLS/Kafka數據同步頁面,單擊目標同步任務操作列的删除,刪除資料同步或資料移轉任務。

JSON解析層級和Schema欄位推斷樣本

解析層級指按相應層數解析出JSON中的欄位。如果使用者向Kafka發送的JSON資料如下。

{
  "name" : "zhangle",
  "age" : 18,
  "device" : {
    "os" : {
        "test":lag,
        "member":{
             "fa":zhangsan,
             "mo":limei
       }
     },
    "brand" : "none",
    "version" : "11.4.2"
  }
}

JSON資料解析後,對應0~4層的效果如下。

0層解析

不做任何解析,直接輸出原始JSON資料。

JSON欄位

目標欄位名

__value__

{ "name" : "zhangle","age" : 18, "device" : { "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }}

__value__

1層解析

解析JSON的第一層欄位。

JSON欄位

目標欄位名

name

zhangle

name

age

18

age

device

{ "os" : { "test":lag,"member":{ "fa":zhangsan,"mo":limei }},"brand": "none","version" : "11.4.2" }

device

2層解析

解析JSON的第二層欄位。如果欄位沒有嵌套則直接輸出,例如name和age欄位直接輸出。如果欄位中有嵌套,則輸出其子層級欄位,例如device欄位有嵌套,因此輸出其子層級device.osdevice.branddevice.version

重要

由於目標欄位名不支援“.”,因此會自動替換為“_”。

JSON欄位

目標欄位名

name

zhangle

name

age

18

age

device.os

{ "test":lag,"member":{ "fa":zhangsan,"mo":limei }}

device_os

device.brand

none

device_brand

device.version

11.4.2

device_version

3層解析

JSON欄位

目標欄位名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member

{ "fa":zhangsan,"mo":limei }

device_os_member

device.brand

none

device_brand

device.version

11.4.2

device_version

4層解析

JSON欄位

目標欄位名

name

zhangle

name

age

18

age

device.os.test

lag

device_os_test

device.os.member.fa

zhangsan

device_os_member_fa

device.os.member.mo

lime

device_os_member_mo

device.brand

none

device_brand

device.version

11.4.2

device_version