通過CTAS語句,在即時同步資料的同時,還能即時將上遊表結構(Schema)的變更同步到下遊表,提高您在目標儲存中建立表和維護源表結構變更的效率。本文為您介紹CREATE TABLE AS(CTAS)的使用方法,並提供了多種使用情境下的樣本。
資料攝入YAML作業是Realtime ComputeFlink產品中整合的最新Flink CDC功能,支援通過簡單的YAML語言編寫強巨量資料整合作業。
YAML作業覆蓋了CTAS和CDAS語句的關鍵能力,如整庫同步、schema evolution等,並能支援更多情境,如表結構變更立即同步,原始binlog同步、自動同步新增表等。建議使用YAML完成您的資料攝入作業邏輯開發,您可以參考資料攝入YAML最佳實務瞭解更多案例。
前提條件
執行CTAS文法前,確保工作空間中登入目標端的Catalog。詳情請參見資料管理。
使用限制
僅Flink計算引擎vvr-4.0.11-flink-1.13及以上版本支援CTAS文法。
重要CTAS文法不支援進行調試。
僅Flink計算引擎vvr-4.0.12-flink-1.13及以上版本支援同步自訂計算列。
VVR 4.0.16以下版本,不支援在一個作業中使用多個CTAS語句將同一張資料來源表同步到不同的結果表。
不支援在同一作業中混合使用CTAS和INSERT INTO語句。
CTAS支援的上下遊儲存列表如下,您可以從下表的源表和結果表中各選一個進行組合。
連接器名稱
源表
結果表
備忘
√
×
分庫分表合并同步時,預設會同步上遊儲存的資料庫名稱和表名稱。
單表同步時,不會同步資料庫名稱和表名稱。如果您需要同步資料庫名稱和表名稱,請使用SQL命令建立Catalog,並添加catalog.table.metadata-columns參數。詳情請參見SQL命令。
不支援同步MySQL視圖。
√
×
無。
√
×
暫不支援分庫分表合并同步。
暫不支援同步MongoDB元資訊。
暫不支援CTAS新增表功能。
支援通過CTAS語句將MongoDB中的資料及表結構變更同步至目標表,樣本可參考樣本九。
×
√
無。
×
√
僅支援EMR的StarRocks。
×
√
如果下遊是Hologres,CTAS在預設情況下會為每個表建立相應數量(connectionSize參數值)個串連。此時您就可以使用connectionPoolName參數,讓配置相同名稱串連池的表可以共用串連池。
說明在將資料同步到Hologres時,如果您的上遊源表包含了Fixed Plan不支援類型的資料,建議通過INSERT INTO語句的方式,在Flink內部做類型轉換後將資料同步到Hologres。不要用CTAS方式建立Sink結果表進行資料同步,因為這種方式會無法走Fixed Plan,寫入效能較差。
×
√
僅Flink計算引擎vvr-6.0.7-flink-1.15及以上版本支援Paimon結果表。
僅Realtime Compute引擎VVR 8.0.10及以上版本支援同步到Paimon DLF 2.0結果表。
功能特性
功能 | 詳情 |
單表同步 | 支援即時同步源表的全量和增量資料到結果表中。 |
表結構變更同步 | 在即時同步資料的同時,還支援將源表的表結構變更(增加列資訊等)即時同步到結果表中。 |
分庫分表合并同步 | 支援使用Regex定義庫名和表名,匹配資料來源的多張分庫分表,合并後同步到下遊的一張表中。 說明 正則匹配時,不支援使用^進行表開頭的匹配。 |
自訂計算列同步 | 支援在源表上新增計算資料行,以支援您對源表的某些列進行轉換計算。計算資料行可以使用系統函數或自訂函數,允許指定新增列的位置,並將其作為結果表的物理列,即時地將計算資料行的結果同步到結果表中。 |
多CTAS語句 | 支援使用STATEMENT SET文法將多個CTAS語句作為一個作業一起提交,並支援對Source節點的合并複用,降低對資料來源的壓力。 多CTAS語句作業,支援新增CTAS語句加入新增表到同步作業中,詳見樣本六。 |
啟動流程
當執行CTAS語句時,將會按照以下流程執行:
檢查目標儲存中是否存在該結果表。
如果不存在,則通過目標端Catalog去目標儲存中建立相應的結果表,該結果表具有和資料來源相同的Schema。
如果存在,則跳過建表。
如果已存在的結果表與源表Schema不一致,則會報錯提示。
提交和啟動相應的資料同步作業。
將資料來源的資料以及Schema的變更同步到結果表中。
例如,從MySQL到Hologres同步CTAS資料流程如下圖所示。
表結構變更同步策略
通過CTAS語句,在即時同步資料的同時,還能將源表Schema的變更同步到結果表中。Schema變更包括初始的表建立以及未來的表變更。
當前支援的Schema變更策略詳情如下:
添加可空列:會自動在結果表Schema末尾添加對應的列,並自動同步新增列的資料。
刪除可空列:不會直接在結果表中刪除該列,而是將該列的資料自動填滿為NULL值。
添加非空列:會自動在結果表Schema末尾添加對應的列,並自動同步新增列的資料,新增的列會預設設定為可空列,對於添加列發生之前的資料自動化佈建為NULL值。
重新命名列:被看作為添加列和刪除列。直接在結果表中末尾添加重新命名後的列,並將重新命名前的列資料自動填滿為NULL值。例如,如果col_a重新命名為col_b,則會在結果表末尾添加col_b,並自動將col_a的資料填充為NULL值。
列類型變更:
對於支援列類型變更的下遊系統,在下遊Sink支援處理列類型變更後,CTAS 支援普通列的類型變更,例如,從INT類型變更到BIGINT類型。此類變更依賴於下遊Sink支援的列類型變更規則,不同的結果表支援的列類型變更規則也不相同,請參考結果表文檔擷取其支援的列類型變更規則,目前只有Paimon支援處理列類型變更。
對於不支援列類型變更的下遊系統,比如Hologres,CTAS無法支援列類型變更。此類情境可以使用寬容模式同步,即在CTAS作業啟動時在下遊系統建立類型更加寬泛的表,在列類型變更發生時判斷該類變更下遊Sink是否可以接受來實現寬容的列類型變更支援,詳情請參見樣本八:CTAS語句使用欄位類型寬容模式同步資料到Hologres表。目前只有Hologres支援寬容模式處理列類型變更。寬容模式應該在初次開機CTAS作業時開啟,如果在初次開機時未開啟寬容模式,需要刪除下遊表並且將作業無狀態重啟才會生效。
暫不支援同步以下Schema的變更:
主鍵或索引等約束的變更。
非空列的刪除。
從NOT NULL轉為NULLABLE變更。
如果遇到以上不支援的Schema變更,需要您手動刪除下遊結果表,重新啟動CTAS作業,即重新建立結果表並重新同步歷史資料。
CTAS不會去識別具體的DDL類型,而是對比前後兩條資料的Schema差異。因此,如果您先刪除了某列後,又加回了該列,且這兩個DDL之間無資料變化,那麼CTAS會認為沒有發生結構變更。同理,如果您添加了一列,直到該表有資料變化,CTAS才會感知到結構變更,才會同步結構變更到結果表。
基本文法
CREATE TABLE IF NOT EXISTS <sink_table>
[COMMENT table_comment]
WITH (key1=val1, key2=val2, ...)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];
<sink_table>:
[catalog_name.][db_name.]table_name
<source_table>:
[catalog_name.][db_name.]table_name
<column_component>:
computed_column_definition [FIRST | AFTER column_name]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
CTAS文法複用了CREATE TABLE文法的基本結構,其中的參數解釋如下表所示。
參數 | 說明 |
sink_table | 資料同步的結果表名,可以指定具體的Catalog名稱和資料庫名稱。 |
COMMENT | 結果表的描述,預設使用source_table的描述。 |
WITH | 結果表參數,可填入結果表支援的WITH參數。支援的WITH參數詳情請參見Upsert Kafka WITH參數、Hologres WITH參數、StarRocks WITH參數或Paimon WITH參數。 說明 key和value都需要為字串類型,例如 |
source_table | 資料同步的源表表名,可指定具體的Catalog名稱和Database名稱。 |
OPTIONS | 源表的參數,可填入源表支援的WITH參數。支援的WITH參數詳情請參見MySQL WITH參數和Kafka WITH參數。 說明 key和value都需要為字串類型,例如'server-id' = '65500'。 |
ADD COLUMN | 同步到結果表時,相對於源表新增的列,僅支援計算資料行。 |
column_component | 新增列的描述。 |
computed_column_definition | 計算資料行運算式的描述。 |
FIRST | 新增列作為源表的第一個欄位。如果不添加該參數,則新增列會預設作為源表的最後一個欄位。 |
AFTER | 新增列放在源表指定欄位後面。 |
PARTITION BY | 系統支援根據某列進行分區,建立分區表。 |
因為IF NOT EXISTS關鍵字為必填,所以如果結果表在目標儲存中並不存在,則會先建立該結果表,否則跳過建立步驟。建立的結果表Schema會使用源表的Schema,包括主鍵以及物理欄位的欄位名和欄位類型,不包括計算資料行、meta欄位、Watermark。其中源表到結果表的欄位類型會經過類型映射,詳情請參見對應連接器文檔中的類型映射。
程式碼範例
樣本一:單表同步
通常,CTAS都會配合資料來源的Catalog和目標的Catalog一起使用,例如MySQL Catalog和Hologres Catalog結合CTAS文法,來完成MySQL到Hologres的全量和增量資料同步。使用MySQL Catalog可以自動解析源表的Schema及相應的參數,而不用手動編寫DDL 。
假設已在工作空間中註冊了名為holo的Hologres Catalog和名為mysql的MySQL Catalog。將MySQL中的web_sales表同步到Hologres中,程式碼範例如下。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS web_sales
WITH ('jdbcWriteBatchSize' = '1024') -- 可選,指定結果表的參數。
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */; -- 指定mysql-cdc源表的額外參數。
樣本二:分庫分表合并同步
對於分庫分表合并同步的情境,您可以結合MySQL Catalog,利用Regex的表名和庫名來匹配所要同步的多張表。使用CTAS可以將這多張分庫分表合并到一張Hologres表中,庫名和表名會作為額外的兩個欄位寫入到該表中,為保證主鍵唯一性,庫名、表名和原主鍵一起作為該Hologres表的新聯合主鍵。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
其合并的效果如下圖所示。如果在user02表中新增一列age,並插入一條資料。此時雖然多張分表的Schema並不一致,但是user02表後續的資料和Schema變更都能即時地自動同步到下遊表中。
ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);
樣本三:自訂計算列同步
本樣本以user分庫分表合并同步作為基礎,介紹在分庫分表合并的過程中,如何進行一些轉換計算。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
`c_id` AS `id` + 10 AFTER `id`,
`calss` AS 3 AFTER `id`
);
新增計算資料行同步的效果如下圖所示。
樣本四:多個CTAS語句作為一個作業提交
Realtime ComputeFlink版支援使用STATEMENT SET文法將多個CTAS語句作為一個作業一起提交,並且可以對Source進行最佳化,複用一個Source節點讀取多業務表的資料。這對於MySQL CDC資料來源情境尤為適用,因為這可以減少server-id的使用,減少對資料庫的串連數和讀取壓力。
對於Source複用最佳化,需要這些Source表的options保持完全一致,才能合并成功進行複用。
例如樣本一同步了web_sales表,樣本二同步了user分庫分表,您可以使用STATEMENT SET文法將它們作為一個作業提交。
USE CATALOG holo;
BEGIN STATEMENT SET;
-- 同步web_sales表。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;
-- 同步user分庫分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
樣本五:多個CTAS語句將同一張資料來源表同步到不同的結果表
4.0.16以上版本中,在不添加計算資料行時,可以將同一張資料來源表同步到不同的結果表。
USE CATALOG `holo`;
BEGIN STATEMENT SET;
-- 通過CTAS語句同步MySQL的user表到Holo數倉database1的user表中
CREATE TABLE IF NOT EXISTS `database1`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;
-- 通過CTAS語句同步MySQL的user表到Holo數倉database2的user表中
CREATE TABLE IF NOT EXISTS `database2`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
如果結果表需要添加計算資料行,則應按照如下方式進行同步:
-- 基於源表user建立暫存資料表user_with_changed_id,支援定義計算資料行,例如這裡的computed_id是基於源表的id計算獲得。
CREATE TEMPORARY TABLE `user_with_changed_id` (
`computed_id` AS `id` + 1000
) LIKE `mysql`.`tpcds`.`user`;
-- 基於源表user建立暫存資料表user_with_changed_age,支援定義計算資料行,例如這裡的computed_age是基於源表的age計算獲得。
CREATE TEMPORARY TABLE `user_with_changed_age` (
`computed_age` AS `age` + 1
) LIKE `mysql`.`tpcds`.`user`;
BEGIN STATEMENT SET;
-- 通過CTAS語句同步MySQL的user表到Holo數倉的user_with_changed_id表中,表中會包含通過計算獲得的id,即computed_id列。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
AS TABLE `user_with_changed_id`
/*+ OPTIONS('server-id'='8001-8004') */;
-- 通過CTAS語句同步MySQL的user表到Holo數倉的user_with_changed_age表中,表中會包含通過計算獲得的age,即computed_age列。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
AS TABLE `user_with_changed_age`
/*+ OPTIONS('server-id'='8001-8004') */;
END;
樣本六:多個CTAS語句時,新增CTAS語句加入資料同步作業
使用VVR 8.0.1及以上版本時,多個CTAS語句的作業啟動後,如果新增CTAS語句,支援從作業快照重啟,從而捕獲到新的表,對新增表進行資料同步。
SQL作業開發時需要增加以下語句,開啟新增表讀取功能。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
當需要新增CTAS語句時,在作業營運頁面停止作業並勾選停止前建立一次快照。
在SQL開發中,增加CTAS語句,並重新部署這個SQL作業。
在作業營運頁面單擊目標作業名稱,狀態集管理頁簽,單擊歷史。
在作業快照列表中,找到停止作業時建立的快照。
單擊目標快照操作列,選擇
。在作業啟動配置對話方塊,配置作業啟動資訊,詳情請參見作業啟動。
新增CTAS語句使用時,存在以下限制:
使用CDC源表同步時,僅支援源表啟動模式為initial的作業使用新增表功能。
新增的CTAS語句的對應Source必須能夠複用最佳化,也就是新增的源表配置需要和原有的源表配置保持完全一致。
新增CTAS語句前後,作業不能有其他參數的變更,比如更改啟動模式。
樣本七:通過CTAS語句將MySQL資料來源表同步到Hologres分區表
Hologres分區表建表時,如果Hologres表存在主鍵,則要求分區欄位必須是主鍵中的欄位。假設有一張MySQL表需要同步到Hologres,其建表語句如下。
CREATE TABLE orders (
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
city VARCHAR(100) NOT NULL
order_date DATE,
purchaser INTEGER,
PRIMARY KEY(order_id, product_id)
);
當使用CTAS同步資料來源表到Hologres的分區表中時:
如果上遊表的主鍵包含分區欄位,例如Hologres表的分區欄位是product_id,可以通過如下SQL實現。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` PARTITIONED BY (product_id) AS TABLE `mysql`.`tpcds`.`orders`;
如果上遊表的主鍵不包含分區欄位,例如Hologres表的分區欄位是city,建立Hologres表時會使用MySQL表中的主鍵,由於上遊表的主鍵不包含分區欄位,作業會出錯。此時,您可以在CTAS中通過聲明主鍵的方式,重新指定目標Hologres分區表的主鍵,使得任務正常運行,樣本如下。
-- 可以通過如下SQL指定Hologres分區表的主鍵為order_id,product_id和city。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`( CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED ) PARTITIONED BY (city) AS TABLE `mysql`.`tpcds`.`orders`;
樣本八:CTAS語句使用欄位類型寬容模式同步資料到Hologres表
在CTAS情境中,可能需要調整已有欄位資料類型的精度(例如,從VARCHAR(10)到VARCHAR(20))。
Flink計算引擎VVR 6.0.5-Flink 1.15以下版本,上遊修改資料類型可能導致CTAS任務失敗,只能重建結果表。
Flink計算引擎VVR 6.0.5-Flink 1.15及以上版本,在同步資料到Hologres表時,支援使用類型寬容模式。寬容模式應該在初次開機CTAS作業時開啟,如果在初次開機時未開啟寬容模式,需要刪除下遊表並且將作業無狀態重啟才會生效。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` WITH ( 'connector' = 'hologres', 'enableTypeNormalization' = 'true' -- 使用欄位類型寬容模式。 ) AS TABLE `mysql`.`tpcds`.`orders`;
在上遊發生資料類型修改事件時,只要所修改類型與原類型的歸一化類型相同,都視作修改成功。目前類型歸一化規則如下:
TINYINT、SMALLINT、INT和BIGINT歸一化為BIGINT。
CHAR、VARCHAR和STRING歸一化為STRING。
FLOAT和DOUBLE歸一化為DOUBLE。
其他資料類型按照原本的類型映射規則建立,詳情參見類型映射。
例如:
SMALLINT修改為INT,兩者的歸一化類型都是BIGINT,視為修改成功,CTAS作業正常運行。
從FLOAT改為BIGINT,兩者的歸一化類型分別為DOUBLE和BIGINT,屬於不相容的情況,會拋出異常。
樣本九:通過CTAS語句將MongoDB資料來源表同步到Hologres表
Realtime ComputeFlink VVR 8.0.6及以上版本,CTAS語句支援同步MongoDB資料來源表,能夠在即時同步MongoDB資料的同時將上遊表結構變更同步到下遊表。可以配合MongoDB Catalog使用,無需手動定義Schema,MongoDB Catalog詳情可參考管理MongoDB Catalog。
這裡以使用CTAS語句同步MongoDB資料來源表資料到Hologres表為例:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
END;
使用CTAS或CDAS語句將MongoDB中的資料同步至目標表時,必須滿足以下要求:
Realtime ComputeFlink VVR版本必須為8.0.6及以上,MongoDB資料庫版本必須為6.0及以上。
在SQL Hints中已將scan.incremental.snapshot.enabled和scan.full-changelog參數都設定為true。
MongoDB資料庫已開啟前像後像(Pre- and Post-images)記錄功能,開啟方法參見Document Preimages。
當使用同一個作業同步多個MongoDB集合時,需要滿足以下條件:
每張表關於MongoDB的配置必須完全相同,包括hosts、scheme、username、password、connectionOptions。
每張表的scan.startup.mode配置必須完全相同。
樣本十:MySQL整庫同步Kafka
在實際使用中,同一張MySQL表可能被多個作業依賴,當多個任務使用同一張MySQL表做處理時,MySQL資料庫會啟動多個串連,對MySQL伺服器和網路造成很大的壓力。為了緩解對上遊MySQL資料庫的壓力,Realtime ComputeFlink版提供MySQL整庫同步到Kafka的能力,通過引入Kafka作為中介層,並使用CDAS整庫同步或CTAS整表同步到Kafka來解決。具體操作請參見MySQL整庫同步Kafka。
相關文檔
為什麼主鍵為bigint unsigned的MySQL表註冊Flink Cataolg,主鍵會變為decimal?但是使用CTAS同步到Hologres後,主鍵又變為text?
如果您需要進行整庫同步、分庫合并或源庫新增表同步,詳情請參見CREATE DATABASE AS(CDAS)語句。
使用CTAS和CDAS實現資料同步的教程詳情,請參見資料庫即時入倉快速入門、基於Flink+Hologres搭建即時數倉或基於Flink搭建流式湖倉OpenLake方案。