全部產品
Search
文件中心

Realtime Compute for Apache Flink:資料庫即時入倉快速入門

更新時間:Jul 13, 2024

Realtime ComputeFlink版提供了豐富強大的資料即時入倉能力。通過Flink的全增量自動切換、元資訊自動探索、表結構變更自動同步和整庫同步等功能,簡化了資料即時入倉的鏈路,使得即時資料同步更加高效便捷。本文介紹如何快速構建一個從MySQL到Hologres的資料同步作業。

背景資訊

假設MySQL執行個體中有一個tpc_ds庫,裡面有24張表結構不相同的業務表。另外還有user_db1~user_db3三個庫,由於進行了分庫分表的設計,每個庫中分別有3張表結構相同的表,共包含名稱為user01~user09的9張表。在阿里雲DMS控制台觀察到MySQL中的庫和表情況如下圖所示。資料庫和表情況

此時,如果您希望開發一個資料同步的作業,將這些表和資料都同步到Hologres中,其中user分庫分表能合并到Hologres的一張表中,則可以按照以下步驟進行:

本文使用Flink提供的CREATE TABLE AS(CTAS)語句CREATE DATABASE AS(CDAS)語句來完成整庫同步、分庫分表合并同步,一鍵完成資料的全量和增量同步處理,以及即時的表結構變更同步。

前提條件

準備測試資料

  1. 單擊tpc_ds.sqluser_db1.sqluser_db2.sqluser_db3.sql下載測試資料到本地。

  2. 在Data Management控制台上,準備RDS MySQL的測試資料。

    1. 通過DMS登入RDS MySQL。

      詳情請參見通過DMS登入RDS MySQL

    2. 在已登入的SQLConsole視窗,輸入如下命令後單擊執行

      建立tpc_ds、user_db1、user_db2和user_db3四個資料庫。

      CREATE DATABASE tpc_ds;
      CREATE DATABASE user_db1;
      CREATE DATABASE user_db2;
      CREATE DATABASE user_db3;
    3. 在左側功能表列,單擊左側常用功能頁簽下的資料匯入

    4. 批量資料匯入頁簽下選擇需要匯入的資料庫,上傳對應的SQL檔案,單擊提交申請後,單擊執行變更。在彈出的對話方塊中單擊確定執行

      同樣的操作依次為tpc_ds、user_db1、user_db2和user_db3資料庫匯入對應的資料檔案。匯入資料

  3. 在Hologres控制台建立my_user資料庫,用於存放合并後的user表資料。

    操作步驟詳情請參見建立資料庫

配置IP白名單

為了讓Flink能訪問MySQL和Hologres執行個體,您需要將Flink工作空間的網段添加到MySQL和Hologres的白名單中。

  1. 擷取Flink工作空間的VPC網段。

    1. 登入Realtime Compute控制台

    2. 在目標工作空間右側操作列,選擇更多 > 工作空間詳情

    3. 工作空間詳情對話方塊,查看Flink虛擬交換器的網段資訊。

      網段資訊

  2. 在RDS MySQL的IP白名單中,添加Flink網段資訊。

    操作步驟詳情請參見設定IP白名單RDS白名單

  3. 在Hologres的IP白名單中,添加Flink網段資訊。

    在HoloWeb配置資料連線時,需要將串連的登入方式設定為目前使用者免密登入,才可以為當前串連配置IP白名單,操作步驟詳情請參見IP白名單Holo白名單

步驟一:建立Catalog

整庫同步、分庫分表合并同步、單表同步都需要依賴目標Catalog來建立目標表,也依賴源Catalog來擷取源表列表和資訊。因此,您需要通過控制台建立源Catalog和目標Catalog。本文將以源Catalog為MySQL Catalog和目標Catalog為Hologres Catalog為例,為您進行介紹。

  1. 建立名稱為mysql的MySQL Catalog。

    操作步驟詳情請參見配置MySQL Catalogmysql catalog

  2. 建立名稱為holo的Hologres Catalog。

    操作步驟詳情請參見建立Hologres CatalogHolo Catalog

  3. 中繼資料管理頁面Catalog列表中,確認已建立名為mysql和holo的Catalog。

步驟二:開發資料同步作業

  1. 登入Flink開發控制台,新增作業。

    1. SQL開發頁面,單擊建立

    2. 單擊空白的流作業草稿

      Flink為您提供了豐富的代碼模板和資料同步,每種代碼模板都為您提供了具體的使用情境、程式碼範例和使用指導。您可以直接單擊對應的模板快速地瞭解Flink產品功能和相關文法,實現您的商務邏輯,詳情請參見代碼模板資料同步模板

    3. 單擊下一步

    4. 新增作業草稿對話方塊,填寫作業配置資訊。

      作業參數

      說明

      樣本

      檔案名稱

      作業的名稱。

      說明

      作業名稱在當前專案中必須保持唯一。

      flink-test

      儲存位置

      指定該作業的代碼檔案所屬的檔案夾。

      您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

      作業草稿

      引擎版本

      當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

      vvr-6.0.4-flink-1.15

    5. 單擊建立

  2. 將以下作業代碼拷貝到作業文本編輯區。

    將tpc_ds庫中所有表同步至Hologres,並將user的分庫分表合并同步到Hologres的單表中。程式碼範例如下所示。

    USE CATALOG holo;
    
    BEGIN STATEMENT SET;
    
    -- 同步TPCDS整庫到Hologres的tpc_ds庫中。
    CREATE DATABASE IF NOT EXISTS tpc_ds
    AS DATABASE mysql.tpc_ds INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='8001-8004') */ ;
    
    -- 同步user分庫分表到Hologres的my_user.users表中。
    CREATE TABLE IF NOT EXISTS my_user.users
    AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

    將tpc_ds庫中所有表同步至Hologres使用CDAS (CREATE DATABASE AS) 文法來實現,將user的分庫分表合并同步到Hologres的單表使用CTAS (CREATE TABLE AS) 文法來實現,最後再使用STATEMENT SET文法將這兩條SQL語句合并在一個作業中提交。Flink會自動為Source進行最佳化,複用一個Source節點讀取多張MySQL表的資料,這能顯著降低MySQL的串連數和讀取壓力,提升穩定性。

    說明

    如果只想同步庫中的某些表,您也可以在CDAS文法中使用 INCLUDING TABLE或EXCLUDING TABLE 文法來指定具體需要同步的表。例如INCLUDING TABLE 'web.*'表示只同步中所有web開頭的表。

步驟三:啟動作業

  1. SQL開發頁面,單擊部署後,在彈出的對話方塊中,單擊確認部署

    說明

    Session叢集適用於非生產環境的開發測試環境,您可以使用Session叢集模式部署或調試作業,提高作業JM(Job Manager)資源使用率和提高作業啟動速度。但不推薦您將作業提交至Session叢集中,因為會存在業務穩定性問題,詳情請參見配置開發測試環境(內容已合并至作業調試中-待新文檔翻譯回稿後下線國際站中英文並挪位置)

  2. 作業營運頁面,單擊目標作業操作中的啟動。填寫配置資訊,詳情請參見作業啟動

  3. 單擊啟動

    作業啟動後,您可以在作業營運頁面觀察作業的運行資訊和狀態。作業狀態

步驟四:觀察全量同步結果

  1. 登入Hologres管理主控台

  2. 中繼資料管理頁簽,查看Hologres執行個體下的tpc_ds資料庫中24張表和表資料。

    holo表資料

  3. 中繼資料管理頁簽,查看my_user庫下users表結構。

    同步後的表結構和資料如下圖所示。

    • 表結構表結構

      users表的表結構比MySQL源表中多了_db_name和_table_name兩列,代表資料來源的庫名和表名,且作為聯合主鍵的一部分來保證分庫分表合并後的資料唯一性。

    • 表資料

      在users表資訊頁面右上方,單擊查詢表後,輸入如下命令,單擊運行

      select * from users order by _db_name,_table_name,id;

      表資料結果如下圖所示。表資料

步驟五:觀察增量同步處理結果

同步作業會在全量資料同步完以後自動切換到增量資料同步階段,無需幹預。您可以通過資料曲線頁簽的currentEmitEventTimeLag值來確定資料同步的階段。

  1. 登入Realtime Compute控制台

  2. 單擊對應工作空間操作列下的控制台

  3. 作業營運頁面,單擊目標作業名稱。

  4. 單擊資料曲線頁簽。

  5. 觀察currentEmitEventTimeLag曲線圖,確定資料同步階段。

    資料曲線

    • 值為0時,代表還在全量同步階段。

    • 值大於0時,代表已經進入增量同步處理階段。

  6. 驗證即時同步資料變更和結構變更的能力。

    MySQL CDC資料來源支援在增量同步處理階段,即時同步表的資料變更以及表的結構變更。您可以在作業進入到增量同步處理階段後,通過修改MySQL的user分表的表結構和資料,來驗證即時同步資料變更和結構變更的能力。

    1. 通過DMS登入RDS MySQL。

      詳情請參見通過DMS登入RDS MySQL

    2. 在user_db2資料庫下,執行如下命令修改user02表的表結構,並插入和更新資料。

      USE DATABASE `user_db2`;
      ALTER TABLE `user02` ADD COLUMN `age` INT;   -- 添加age列。
      INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- 插入帶有age的資料。
      UPDATE `user05` SET name='JARK' WHERE id=15;  -- 更新另一張表,名字改成大寫。
    3. 在Hologres控制台,查看users表結構和資料的變化。

      在users表資訊頁面右上方,單擊查詢表後,輸入如下命令,單擊運行

      select * from users order by _db_name,_table_name,id;

      表資料結果如下圖所示。表結構和資料變化雖然多張分表的Schema並不一致,但是在user02上的表結構變更,以及資料變更都能即時地同步到下遊表中。在Hologres的users表中,看到了新增的age欄位,插入的Tony資料以及更新成大寫的JARK資料。

(可選)步驟六:作業資源配置

根據資料量的不同,我們往往需要調節不同節點的並發和資源,以達到更優的作業效能。您可以使用資源配置的基礎模式簡單配置作業並發度和CU數,也可以使用資源配置的專家模式細粒度地調整節點的並發和資源。

  1. 作業營運頁面,單擊目標作業名稱。

  2. 部署詳情頁簽下,單擊資源配置地區右上方的編輯

  3. 資源模式配置項中選擇專家模式後,單擊立刻擷取

  4. 將滑鼠移至上方至更多操作後,單擊展開全部

    通過完整的拓撲圖能瞭解到整個資料的同步計劃,即具體同步哪些表。

  5. 手動設定每個節點的並發。

    設定作業為4並發;由於tpc_ds中的store_sales表資料量最大,可以單獨設定holo.tpc_ds.store_sales Sink節點並發為8,提升Hologres的寫入效能。資源配置步驟詳情請參見配置作業部署資訊

  6. 資源配置右側,單擊儲存

  7. 作業營運頁面,單擊目標作業操作列下的啟動

    重要

    如果修改的目標作業是運行中狀態,您需要單擊目標作業名稱操作中的停止,直到作業為已停止狀態後,再單擊啟動,待啟動完成後,修改的資源配置才會生效。

  8. 單擊目標作業名稱,在作業總覽頁簽下查看調整效果。

常見問題

相關文檔