全部產品
Search
文件中心

Realtime Compute for Apache Flink:管理MySQL Catalog

更新時間:Jul 27, 2024

配置MySQL Catalog後,您就可以在Flink全託管控制台直接存取MySQL執行個體中的表。本文為您介紹如何在Flink全託管模式下配置、查看及刪除MySQL Catalog。

背景資訊

MySQL Catalog具有以下功能特點:

  • 直接存取MySQL執行個體中的表,無需通過DDL語句手動註冊MySQL表,提升開發效率和正確性。

  • MySQL Catalog提供的表可以直接作為Flink SQL作業中的MySQL CDC源表、MySQL結果表和MySQL維表。

  • 支援RDS MySQL、PolarDB MySQL或自建MySQL。

  • 支援直接存取分庫分表邏輯表。

  • 支援配合CDAS和CTAS文法完成基於MySQL資料來源的整庫同步、分庫分表合并同步、表結構變更同步。

本文將從以下方面為您介紹如何管理MySQL Catalog:

使用限制

  • 僅Flink計算引擎vvr-4.0.11-flink-1.13及以上版本支援配置MySQL Catalog。

  • Realtime Compute引擎VVR 8.0.7及以上版本,建立MySQL Catalog後,不支援使用視圖作為Flink的表。

  • 不支援修改Catalog。

  • 僅支援查詢資料庫和表,不支援建立資料庫和表。

  • 作為源表僅支援流讀、不支援批讀,支援作為維表和結果表。

  • MySQL Catalog無法識別建表語句中使用PolarDB特有文法的表。例如PARTITION BY KEY(`idempotent_id`) PARTITIONS 16, UNIQUE KEY `uk_order_id` (`order_id`)。

  • MySQL僅支援5.7和8.0.x版本。

說明

如果MySQL Catalog提供的表被作為MySQL CDC源表時,則需要在RDS MySQL、PolarDB MySQL或者自建MySQL上開啟Binlog等配置,詳情請參見配置MySQL

配置MySQL Catalog

支援UI與SQL命令兩種方式配置MySQL Catalog,推薦使用UI方式配置MySQL Catalog。

UI方式

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台

    2. 單擊中繼資料管理

  2. 單擊建立Catalog,選擇MySQL,單擊下一步

  3. 填寫參數配置資訊。

    重要

    Catalog建立完成後,以下配置資訊都不支援修改。如果需要修改,則您需要刪除掉已建立的Catalog,重新進行建立。

    配置資訊

    參數

    說明

    是否必填

    catalogname

    MySQL Catalog名稱。

    hostname

    MySQL資料庫的IP地址或者Hostname。

    port

    MySQL資料庫服務的連接埠號碼,預設值為3306。

    default-database

    預設的MySQL資料庫名稱。

    username

    MySQL資料庫服務的使用者名稱。

    password

    MySQL資料庫服務的密碼。

  4. 單擊確定

  5. 在左側中繼資料地區,查看建立的Catalog。

SQL命令

  1. 查詢指令碼文本編輯地區,輸入以下命令。

    CREATE CATALOG <yourcatalogname> WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = '<dbname>',
      'catalog.table.metadata-columns' = '<metadata>'
    );

    參數

    說明

    是否必填

    yourcatalogname

    自訂MySQL Catalog名稱。

    重要

    參數替換為您的Catalog名稱後,需要去掉角括弧(<>),否則語法檢查會報錯。

    type

    類型,固定值為mysql。

    property-version

    Catalog的參數版本,可填的值為0和1,預設值為0。

    說明
    • 僅VVR 8.0.6及以上版本支援配置該參數。

    • 在不同參數版本裡,可用的參數集合和參數的預設值可能不同。如果存在區別,區別詳情會在參數的說明部分描述。

    • 推薦使用參數版本1。

    hostname

    MySQL資料庫的IP地址或者Hostname。

    port

    MySQL資料庫服務的連接埠號碼,預設值為3306。

    default-database

    預設的MySQL資料庫名稱。

    username

    MySQL資料庫服務的使用者名稱。

    password

    MySQL資料庫服務的密碼。

    catalog.table.metadata-columns

    指定擷取資料表時,表的Schema需要添加MySQL CDC源表的中繼資料列。多個中繼資料列使用英文分號(;)分隔,例如:op_ts;table_name;database_name。預設不添加中繼資料列。

    說明
    • 僅Realtime Compute引擎VVR 6.0.5及以上版本支援該參數。

    • 當配置該參數時,返回的表Schema會額外添加指定的中繼資料列,這些列只適用於MySQL CDC源表,所以該Catalog返回的表只能用作資料來源表,不可以用作結果表或維表。

    catalog.table.treat-tinyint1-as-boolean

    擷取資料表Schema時,對於MySQL的TinyInt(1)和Boolean類型是否對應到Flink的Boolean類型。參數取值如下:

    • true:MySQL的TinyInt(1)和Boolean對應到Flink的Boolean類型。

    • false:MySQL的TinyInt(1)和Boolean對應到Flink的TINYINT類型。

    說明
    • 不建議MySQL使用TinyInt(1)儲存0和1以外的數值,請選擇合適的資料類型做映射,參見類型映射

    • 僅Realtime Compute引擎VVR 8.0.4及以上版本支援配置該參數。

    • property-version=0時,預設值為true;property-version=1時,預設值為false。

  2. 選中建立Catalog的代碼後,單擊左側程式碼數上的運行

    image..png

查看MySQL Catalog

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台

    2. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

    3. 單擊中繼資料管理

  2. Catalog列表頁面,查看Catalog名稱類型

    說明

    如果您需要查看Catalog下的資料庫和表,請單擊查看

使用MySQL Catalog

  • 從MySQL源表中讀取資料。

    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM `${mysql_catalog}`.`${db_name}`.`${table_name}` /*+ OPTIONS('server-id'='6000-6018') */;
    說明
    • 在使用MySQL Catalog中的表時,可以通過Table Hints文法給表指定MySQL資料庫伺服器時區參數。例如mycatalog.mytable /*+ OPTIONS('server-time-zone'='Asia/Shanghai') */

    • 如果將MySQL Catalog作為MySQL CDC源表,建議使用Table Hints來為作業指定不同的 server-id。如果源表需要多並發讀取,server-id還需要配置成範圍格式,範圍中的server-id個數需要大於等於並發度。例如mycatalog.mytable /*+ OPTIONS('server-time-zone'='Asia/Shanghai', 'server-id' = '6000-6008') */

  • 讀取MySQL分庫分表邏輯表。

    MySQL Catalog支援使用Regex,將庫名和表名作為邏輯表名,來讀取分庫分表的資料。例如,有一個分庫分表的MySQL資料庫,包括user01、user02和user99等多個表,分散在db01~db10等資料庫中,且所有表的Schema都相互相容,則可以通過如下Regex的庫名表名來訪問到所有user的分庫分表。

    SELECT ... FROM `db.*`.`user.*` /*+ OPTIONS('server-id'='6000-6018') */;

    分庫分表的邏輯表會返回額外的_db_name (STRING) 和_table_name (STRING)兩個系統欄位,且這兩個欄位與原分表的主鍵會作為邏輯表的新聯合主鍵以保證主鍵的唯一性。如果user01~user99的主鍵均為id,則user邏輯表的聯合主鍵為(_db_name, _table_name, id)。MySQL Catalog支援結合Regex讀取分庫分表資料,具體樣本和使用限制請參見CREATE TABLE AS(CTAS)語句

  • 使用CTAS和CDAS即時同步MySQL資料變更和結構變更。

    USE CATALOG `${target_catalog}`;
    
    -- 單表同步,即時同步表層級的表結構變更和資料變更。
    CREATE TABLE IF NOT EXISTS `${target_table_name}`
    WITH (...)
    AS TABLE `${mysql_catalog}`.`${db_name}`.`${table_name}`
    /*+ OPTIONS('server-id'='6000-6018') */;
    
    -- 整庫同步,即時同步整庫層級的表結構變更和資料變更。
    CREATE DATABASE `${target_db_name}` WITH (...)
    AS DATABASE `${mysql_catalog}`.`${db_name}` INCLUDING ALL TABLES
    /*+ OPTIONS('server-id'='6000-6018') */;
                        

    更多樣本和使用限制請參見CREATE TABLE AS(CTAS)語句CREATE DATABASE AS(CDAS)語句

  • 從MySQL維表中讀取資料。

    INSERT INTO ${other_sink_table}
    SELECT ...
    FROM ${other_source_table} AS e
    JOIN `${mysql_catalog}`.`${db_name}`.`${table_name}` FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • 寫入結果資料至MySQL表中。

    INSERT INTO `${mysql_catalog}`.`${db_name}`.`${table_name}`
    SELECT ...
    FROM ${other_source_table}

刪除MySQL Catalog

支援UI與SQL命令兩種方式刪除MySQL Catalog,推薦使用UI方式刪除MySQL Catalog。

UI方式

  1. 進入中繼資料管理頁面。

    1. 登入Realtime Compute控制台

    2. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

    3. 單擊中繼資料管理

  2. Catalog列表頁面,單擊目標Catalog名稱對應操作列的刪除

  3. 在彈出的提示頁面中,單擊刪除

  4. 左側中繼資料地區下,查看目標Catalog是否已刪除。

SQL命令方式

  1. 查詢指令碼文本編輯地區,輸入以下命令。

    DROP CATALOG ${catalog_name}

    其中,catalog_name為您要刪除的在Flink全託管開發控制台上顯示的MySQL Catalog名稱。

    說明

    刪除MySQL Catalog不會影響已啟動並執行作業,但會導致使用該Catalog下表的作業,在上線或重啟時報無法找到該表的錯誤,請您謹慎操作。

  2. 選中刪除Catalog的命令,滑鼠右鍵選擇運行

  3. 在左側中繼資料地區,查看目標Catalog是否已刪除。