全部產品
Search
文件中心

AnalyticDB:Flink訂閱Binlog

更新時間:Dec 03, 2024

Realtime Compute for Apache Flink通過訂閱AnalyticDB for MySQL,可以即時捕獲和處理資料庫變更資料,實現高效的資料同步和流式計算。本文為您介紹如何使用Flink訂閱AnalyticDB for MySQL Binlog。

前提條件

  • AnalyticDB for MySQL產品系列為湖倉版數倉版彈性模式

  • AnalyticDB for MySQL叢集的核心版本需為3.2.1.0及以上版本。

    說明
    • 查看湖倉版叢集的核心版本,請執行SELECT adb_version();。如需升級核心版本,請聯絡支援人員。

    • 查看和升級數倉版叢集的核心版本,請參見查看和升級版本

  • FlinkRealtime Compute引擎需為VVR 8.0.4及以上版本。

  • AnalyticDB for MySQL和Flink全託管工作空間需要位於同一VPC下,詳情請參見建立叢集開通Realtime ComputeFlink版

  • 已將Flink工作空間所屬的網段加入AnalyticDB for MySQL的白名單,詳情請參見Flink所屬網段查看方法設定白名單

使用限制

  • AnalyticDB for MySQL僅支援按表開啟Binlog功能。

  • Flink僅支援處理AnalyticDB for MySQL Binlog中的所有基礎資料類型和複雜資料類型JSON,詳情請參見資料類型

  • Flink不會處理AnalyticDB for MySQL Binlog中的DDL操作記錄和分區表自動分區刪除的操作記錄。

步驟一:開啟Binlog功能

  1. 開啟Binlog功能,本文以表名為source_table為例。

    建表時,開啟Binlog

    CREATE TABLE source_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )DISTRIBUTED BY HASH (id) BINLOG=true;

    建表後,開啟Binlog

    ALTER TABLE source_table BINLOG=true;
  2. (可選)查看Binlog資訊。

    說明

    使用以下語句查看Binlog檔案的資訊時,若僅開啟Binlog功能,日誌資訊顯示為0。只有Flink成功訂閱Binlog後,才會顯示日誌資訊。

    • 查看當前寫入的Binlog的位點,SQL語句如下:

      SHOW MASTER STATUS FOR source_table;
    • 查看叢集內對應表所有Binlog檔案的資訊,SQL語句如下:

      SHOW BINARY LOGS FOR source_table;
  3. (可選)修改Binlog保留時間長度。

    您可以通過修改binlog_ttl參數來調整Binlog的保留時間長度。以下樣本表示將表source_table的Binlog保留時間長度設定為1天。

    ALTER TABLE source_table binlog_ttl='1d';

    binlog_ttl參數取值支援以下格式:

    • 純數字,表示毫秒。例如,60代表60毫秒。

    • 數字+s,表示秒。例如,30s代表30秒。

    • 數字+h,表示小時。例如,2h代表2小時。

    • 數字+d,表示天。例如,1d代表1天。

步驟二:配置Flink連接器

  1. 登入Realtime Compute控制台

  2. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊資料連線

  4. 資料連線頁面,單擊建立自訂連接器

  5. 上傳自訂連接器JAR包。下載連結:AnalyticDB for MySQL Connector

  6. 上傳完成後,單擊下一步

  7. 單擊完成。建立完成的自訂連接器會出現在連接器列表中。

步驟三:訂閱Binlog

  1. 登入Realtime Compute控制台,建立SQL作業。詳情請參見建立作業

  2. 建立源表,串連到AnalyticDB for MySQL並讀取指定表(source_table)的Binlog資料。

    說明
    • Flink DDL中定義的主鍵必須和AnalyticDB for MySQL叢集物理表中的主鍵保持一致,主鍵一致包括主鍵和主鍵名稱一致。如果不一致,會影響資料正確性。

    • Flink的資料類型需要和AnalyticDB for MySQL相容。映射關係,請參見類型映射

    CREATE TEMPORARY TABLE adb_source (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb-mysql-cdc',
      'hostname' = 'amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com',
      'username' = 'testUser',
      'password' = 'Test12****',
      'database-name' = 'binlog',
      'table-name' = 'source_table'
    );

    WITH參數說明:

    參數

    是否必填

    預設值

    資料類型

    說明

    connector

    STRING

    使用的連接器。

    這裡填寫自訂連接器,固定填寫adb-mysql-cdc

    hostname

    STRING

    AnalyticDB for MySQL的VPC地址。

    username

    STRING

    AnalyticDB for MySQL資料庫帳號。

    password

    STRING

    AnalyticDB for MySQL資料庫密碼。

    database-name

    STRING

    AnalyticDB for MySQL資料庫名稱。

    由於AnalyticDB for MySQL實現的是表級Binlog,此處僅支援設定一個資料庫。

    table-name

    STRING

    AnalyticDB for MySQL資料庫的表名。

    由於AnalyticDB for MySQL實現的是表級Binlog,此處僅支援設定一個表。

    port

    3306

    INTEGER

    連接埠號碼。

    scan.incremental.snapshot.enabled

    true

    BOOLEAN

    增量快照。

    預設開啟。增量快照是一種讀取錶快照的新機制,與舊的快照機制相比,增量快照有許多優點,包括:

    • 在讀取快照期間,Source支援並發讀取。

    • 在讀取快照期間,Source支援進行Chunk粒度的Checkpoint。

    • 在讀取快照之前,Source不需要擷取資料庫鎖許可權。

    scan.incremental.snapshot.chunk.size

    8096

    INTEGER

    錶快照的Chunk大小(包含的行數)。

    當開啟增量快照讀取時,表會被切分成多個Chunk讀取。

    scan.snapshot.fetch.size

    1024

    INTEGER

    讀取錶快照時,每次讀取資料的最大行數。

    scan.startup.mode

    initial

    STRING

    消費資料的啟動模式。

    取值如下:

    • initial(預設):在第一次啟動時,會先掃描歷史全量資料,然後讀取最新的Binlog資料。

    • earliest-offset:不掃描歷史全量資料,直接從可讀取的最早Binlog開始讀取。

    • specific-offset:不掃描歷史全量資料,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.filescan.startup.specific-offset.pos參數來指定從特定Binlog檔案名稱和位移量啟動。

    scan.startup.specific-offset.file

    STRING

    在specific-offset啟動模式下,啟動位點的Binlog檔案名稱。

    最新Binlog檔案名稱可使用SHOW MASTER STATUES for table_name擷取。

    scan.startup.specific-offset.pos

    LONG

    在specific-offset啟動模式下,啟動位點的Binlog檔案位置。

    最新Binlog位置可使用SHOW MASTER STATUES for table_name擷取。

    scan.startup.specific-offset.skip-events

    LONG

    在指定的啟動位點後需要跳過的事件數目量。

    scan.startup.specific-offset.skip-rows

    LONG

    在指定的啟動位點後需要跳過的資料行數。

    server-time-zone

    STRING

    資料庫伺服器中的會話時區。

    例如:"Asia/Shanghai"。它控制AnalyticDB for MySQL中的TIMESTAMP類型如何轉成STRING類型。如果沒有設定,則使用ZONELD.SYSTEMDEFAULT()來確定伺服器時區。

    debezium.min.row.count.to.stream.result

    1000

    INTEGER

    當表的行數大於該值時,連接器會對結果進行串流。

    若將此參數設定為0,會跳過所有表大小檢查,始終在快照期間對所有結果進行串流。

    connect.timeout

    30s

    DURATION

    串連資料庫伺服器逾時,重試串連之前等待逾時的最長時間。

    預設單位為秒(s)。

    connect.max-retries

    3

    INTEGER

    串連資料庫服務時,串連失敗後重試的最大次數。

  3. 在目標端建立表,用於儲存處理後的資料。本文以AnalyticDB for MySQL作為目標端。Flink支援的連接器請參見支援的連接器

    CREATE TABLE target_table (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`)
    )
  4. 建立結果表,串連步驟3建立的表,用於將處理後的資料寫入到AnalyticDB for MySQL指定的表。

    CREATE TEMPORARY TABLE adb_sink (
      `id` INT,
      `num` BIGINT,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'adb3.0',
      'url' = 'jdbc:mysql://amv-2zepb9n1l58ct01z50000****.ads.aliyuncs.com:3306/flinktest',
      'userName' = 'testUser',
      'password' = 'Test12****',
      'tableName' = 'target_table'
    );

    結果表對應的WITH參數和映射類型,詳情請見:雲原生資料倉儲AnalyticDB MySQL版(ADB)3.0

  5. 將捕獲到的來源資料變化同步到結果表,並由結果表將資料同步到目標端。

    INSERT INTO adb_sink
    SELECT * FROM adb_source;
  6. 單擊儲存

  7. 單擊深度檢查

    深度檢查能夠檢查作業的SQL語義、網路連通性以及作業使用的表的中繼資料資訊。同時,您可以單擊結果地區的SQL最佳化,展開查看SQL風險問題提示以及對應的SQL最佳化建議。

  8. (可選)單擊調試

    您可以使用作業調試功能類比作業運行、檢查輸出結果,驗證SELECT或INSERT商務邏輯的正確性,提升開發效率,降低資料品質風險。詳情請參見作業調試

  9. 單擊部署詳情請參見部署SQL作業

    完成作業開發和深度檢查後,即可部署作業,將資料發布至生產環境。部署後,您可以在作業營運頁面啟動作業至運行階段,詳情請參見作業啟動

類型映射

AnalyticDB for MySQLFlink的資料類型映射關係如下:

AnalyticDB for MySQL欄位類型

Flink欄位類型

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL(p,s)或NUMERIC(p,s)

DECIMAL(p,s)

VARCHAR

STRING

BINARY

BYTES

DATE

DATE

TIME

TIME

DATETIME

TIMESTAMP

TIMESTAMP

TIMESTAMP

POINT

STRING

JSON

STRING