全部产品
Search
文档中心

实时计算Flink版:CREATE TABLE AS(CTAS)语句

更新时间:Oct 14, 2024

通过CTAS语句,在实时同步数据的同时,还能实时将上游表结构(Schema)的变更同步到下游表,提高您在目标存储中创建表和维护源表结构变更的效率。本文为您介绍CREATE TABLE AS(CTAS)的使用方法,并提供了多种使用场景下的示例。

前提条件

执行CTAS语法前,确保工作空间中已注册目标端的Catalog。详情请参见管理元数据

使用限制

  • 仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持CTAS语法。

    重要

    CTAS语法不支持进行调试。

  • 仅Flink计算引擎vvr-4.0.12-flink-1.13及以上版本支持同步自定义计算列。

  • VVR 4.0.16以下版本,不支持在一个作业中使用多个CTAS语句将同一张数据源表同步到不同的结果表。

  • 不支持在同一作业中混合使用CTAS和INSERT INTO语句。

  • CTAS支持的上下游存储列表如下,您可以从下表的源表和结果表中各选一个进行组合。

    连接器名称

    源表

    结果表

    备注

    MySQL

    ×

    • 分库分表合并同步时,默认会同步上游存储的数据库名称和表名称。

    • 单表同步时,不会同步数据库名称和表名称。如果您需要同步数据库名称和表名称,请使用SQL命令创建Catalog,并添加catalog.table.metadata-columns参数。详情请参见SQL命令

    • 不支持同步MySQL视图。

    消息队列Kafka

    ×

    无。

    MongoDB

    ×

    • 暂不支持分库分表合并同步。

    • 暂不支持同步MongoDB元信息。

    • 暂不支持CTAS新增表功能。

    • 支持通过CTAS语句将MongoDB中的数据及表结构变更同步至目标表,示例可参考示例九

    Upsert Kafka

    ×

    无。

    StarRocks

    ×

    仅支持EMR的StarRocks。

    实时数仓Hologres

    ×

    如果下游是Hologres,CTAS在默认情况下会为每个表创建相应数量(connectionSize参数值)个连接。此时您就可以使用connectionPoolName参数,让配置相同名称连接池的表可以共享连接池。

    说明

    在将数据同步到Hologres时,如果您的上游源表包含了Fixed Plan不支持类型的数据,建议通过INSERT INTO语句的方式,在Flink内部做类型转换后将数据同步到Hologres。不要用CTAS方式创建Sink结果表进行数据同步,因为这种方式会无法走Fixed Plan,写入性能较差。

    流式数据湖仓Paimon

    ×

    • 仅Flink计算引擎vvr-6.0.7-flink-1.15及以上版本支持Paimon结果表。

    • 暂不支持同步到Paimon DLF 2.0结果表。

功能特性

功能

详情

单表同步

支持实时同步源表的全量和增量数据到结果表中。

表结构变更同步

在实时同步数据的同时,还支持将源表的表结构变更(增加列信息等)实时同步到结果表中。

分库分表合并同步

支持使用正则表达式定义库名和表名,匹配数据源的多张分库分表,合并后同步到下游的一张表中。

说明

正则匹配时,不支持使用^进行表开头的匹配。

自定义计算列同步

支持在源表上新增计算列,以支持您对源表的某些列进行转换计算。计算列可以使用系统函数或自定义函数,允许指定新增列的位置,并将其作为结果表的物理列,实时地将计算列的结果同步到结果表中。

多CTAS语句

支持使用STATEMENT SET语法将多个CTAS语句作为一个作业一起提交,并支持对Source节点的合并复用,降低对数据源的压力。

多CTAS语句作业,支持新增CTAS语句加入新增表到同步作业中,详见示例六

启动流程

当执行CTAS语句时,将会按照以下流程执行:

  1. 检查目标存储中是否存在该结果表。

    • 如果不存在,则通过目标端Catalog去目标存储中创建相应的结果表,该结果表具有和数据源相同的Schema。

    • 如果存在,则跳过建表。

    • 如果已存在的结果表与源表Schema不一致,则会报错提示。

  2. 提交和启动相应的数据同步作业。

    将数据源的数据以及Schema的变更同步到结果表中。

例如,从MySQL到Hologres同步CTAS数据流程如下图所示。同步示意图

表结构变更同步策略

通过CTAS语句,在实时同步数据的同时,还能将源表Schema的变更同步到结果表中。Schema变更包括初始的表创建以及未来的表变更。

  • 当前支持的Schema变更策略详情如下:

    • 添加可空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据。

    • 删除可空列:不会直接在结果表中删除该列,而是将该列的数据自动填充为NULL值。

    • 添加非空列:会自动在结果表Schema末尾添加对应的列,并自动同步新增列的数据,新增的列会默认设置为可空列,对于添加列发生之前的数据自动设置为NULL值。

    • 重命名列:被看作为添加列和删除列。直接在结果表中末尾添加重命名后的列,并将重命名前的列数据自动填充为NULL值。例如,如果col_a重命名为col_b,则会在结果表末尾添加col_b,并自动将col_a的数据填充为NULL值。

    • 列类型变更:

      • 对于支持列类型变更的下游系统,在下游Sink支持处理列类型变更后,CTAS 支持普通列的类型变更,例如,从INT类型变更到BIGINT类型。此类变更依赖于下游Sink支持的列类型变更规则,不同的结果表支持的列类型变更规则也不相同,请参考结果表文档获取其支持的列类型变更规则,目前只有Paimon支持处理列类型变更。

      • 对于不支持列类型变更的下游系统,比如Hologres,CTAS无法支持列类型变更。此类场景可以使用宽容模式同步,即在CTAS作业启动时在下游系统建立类型更加宽泛的表,在列类型变更发生时判断该类变更下游Sink是否可以接受来实现宽容的列类型变更支持,详情请参见示例八:CTAS语句使用字段类型宽容模式同步数据到Hologres表。目前只有Hologres支持宽容模式处理列类型变更。宽容模式应该在首次启动CTAS作业时开启,如果在首次启动时未开启宽容模式,需要删除下游表并且将作业无状态重启才能生效。

  • 暂不支持同步以下Schema的变更:

    • 主键或索引等约束的变更。

    • 非空列的删除。

    • 从NOT NULL转为NULLABLE变更。

重要
  • 如果遇到以上不支持的Schema变更,需要您手动删除下游结果表,重新启动CTAS作业,即重新创建结果表并重新同步历史数据。

  • CTAS不会去识别具体的DDL类型,而是对比前后两条数据的Schema差异。因此,如果您先删除了某列后,又加回了该列,且这两个DDL之间无数据变化,那么CTAS会认为没有发生结构变更。同理,如果您添加了一列,直到该表有数据变化,CTAS才会感知到结构变更,才会同步结构变更到结果表。

基本语法

CREATE TABLE IF NOT EXISTS <sink_table>
[COMMENT table_comment]
WITH (key1=val1, key2=val2, ...)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  computed_column_definition [FIRST | AFTER column_name]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

CTAS语法复用了CREATE TABLE语法的基本结构,其中的参数解释如下表所示。

参数

说明

sink_table

数据同步的结果表名,可以指定具体的Catalog名称和数据库名称。

COMMENT

结果表的描述,默认使用source_table的描述。

WITH

结果表参数,可填入结果表支持的WITH参数。支持的WITH参数详情请参见Upsert Kafka WITH参数Hologres WITH参数StarRocks WITH参数Paimon WITH参数

说明

key和value都需要为字符串类型,例如'jdbcWriteBatchSize' = '1024'

source_table

数据同步的源表表名,可指定具体的Catalog名称和Database名称。

OPTIONS

源表的参数,可填入源表支持的WITH参数。支持的WITH参数详情请参见MySQL WITH参数Kafka WITH参数

说明

key和value都需要为字符串类型,例如'server-id' = '65500'

ADD COLUMN

同步到结果表时,相对于源表新增的列,仅支持计算列。

column_component

新增列的描述。

computed_column_definition

计算列表达式的描述。

FIRST

新增列作为源表的第一个字段。如果不添加该参数,则新增列会默认作为源表的最后一个字段。

AFTER

新增列放在源表指定字段后面。

PARTITION BY

系统支持根据某列进行分区,创建分区表。

说明

因为IF NOT EXISTS关键字为必填,所以如果结果表在目标存储中并不存在,则会先创建该结果表,否则跳过创建步骤。创建的结果表Schema会使用源表的Schema,包括主键以及物理字段的字段名和字段类型,不包括计算列、meta字段、Watermark。其中源表到结果表的字段类型会经过类型映射,详情请参见对应连接器文档中的类型映射。

代码示例

示例一:单表同步

通常,CTAS都会配合数据源的Catalog和目标的Catalog一起使用,例如MySQL Catalog和Hologres Catalog结合CTAS语法,来完成MySQL到Hologres的全量和增量数据同步。使用MySQL Catalog可以自动解析源表的Schema及相应的参数,而不用手动编写DDL 。

假设已在工作空间中注册了名为holo的Hologres Catalog和名为mysql的MySQL Catalog。将MySQL中的web_sales表同步到Hologres中,代码示例如下。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales  
WITH ('jdbcWriteBatchSize' = '1024')   -- 可选,指定结果表的参数。
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */;  -- 指定mysql-cdc源表的额外参数。

示例二:分库分表合并同步

对于分库分表合并同步的场景,您可以结合MySQL Catalog,利用正则表达式的表名和库名来匹配所要同步的多张表。使用CTAS可以将这多张分库分表合并到一张Hologres表中,库名和表名会作为额外的两个字段写入到该表中,为保证主键唯一性,库名、表名和原主键一起作为该Hologres表的新联合主键。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

其合并的效果如下图所示。效果如果在user02表中新增一列age,并插入一条数据。此时虽然多张分表的Schema并不一致,但是user02表后续的数据和Schema变更都能实时地自动同步到下游表中。

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

image

示例三:自定义计算列同步

本示例以user分库分表合并同步作为基础,介绍在分库分表合并的过程中,如何进行一些转换计算。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `calss` AS 3  AFTER `id`
);

新增计算列同步的效果如下图所示。

image

示例四:多个CTAS语句作为一个作业提交

实时计算Flink版支持使用STATEMENT SET语法将多个CTAS语句作为一个作业一起提交,并且可以对Source进行优化,复用一个Source节点读取多业务表的数据。这对于MySQL CDC数据源场景尤为适用,因为这可以减少server-id的使用,减少对数据库的连接数和读取压力。

重要

对于Source复用优化,需要这些Source表的options保持完全一致,才能合并成功进行复用。

例如示例一同步了web_sales表,示例二同步了user分库分表,您可以使用STATEMENT SET语法将它们作为一个作业提交。

USE CATALOG holo;

BEGIN STATEMENT SET;

-- 同步web_sales表。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- 同步user分库分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

示例五:多个CTAS语句将同一张数据源表同步到不同的结果表

4.0.16以上版本中,在不添加计算列时,可以将同一张数据源表同步到不同的结果表。

USE CATALOG `holo`;

BEGIN STATEMENT SET;

-- 通过CTAS语句同步MySQL的user表到Holo数仓database1的user表中
CREATE TABLE IF NOT EXISTS `database1`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

-- 通过CTAS语句同步MySQL的user表到Holo数仓database2的user表中
CREATE TABLE IF NOT EXISTS `database2`.`user`
AS TABLE `mysql`.`tpcds`.`user`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

如果结果表需要添加计算列,则应按照如下方式进行同步:

-- 基于源表user创建临时表user_with_changed_id,支持定义计算列,例如这里的computed_id是基于源表的id计算获得。
CREATE TEMPORARY TABLE `user_with_changed_id` (
  `computed_id` AS `id` + 1000
) LIKE `mysql`.`tpcds`.`user`;

-- 基于源表user创建临时表user_with_changed_age,支持定义计算列,例如这里的computed_age是基于源表的age计算获得。
CREATE TEMPORARY TABLE `user_with_changed_age` (
  `computed_age` AS `age` + 1
) LIKE `mysql`.`tpcds`.`user`;

BEGIN STATEMENT SET;

-- 通过CTAS语句同步MySQL的user表到Holo数仓的user_with_changed_id表中,表中会包含通过计算获得的id,即computed_id列。 
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
AS TABLE `user_with_changed_id`
/*+ OPTIONS('server-id'='8001-8004') */;

-- 通过CTAS语句同步MySQL的user表到Holo数仓的user_with_changed_age表中,表中会包含通过计算获得的age,即computed_age列。 
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
AS TABLE `user_with_changed_age`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

示例六:多个CTAS语句时,新增CTAS语句加入数据同步作业

使用VVR 8.0.1及以上版本时,多个CTAS语句的作业启动后,如果新增CTAS语句,支持从作业快照重启,从而捕获到新的表,对新增表进行数据同步。

  1. SQL作业开发时需要增加以下语句,开启新增表读取功能。

    SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
  2. 当需要新增CTAS语句时,在作业运维页面停止作业并勾选停止前创建一次快照

  3. SQL开发中,增加CTAS语句,并重新部署这个SQL作业。

  4. 作业运维页面单击目标作业名称,状态集管理页签,单击历史

  5. 作业快照列表中,找到停止作业时创建的快照。

  6. 单击目标快照操作列,选择更多 > 从该快照恢复作业

  7. 作业启动配置对话框,配置作业启动信息,详情请参见作业启动

重要

新增CTAS语句使用时,存在以下限制:

  • 使用CDC源表同步时,仅支持源表启动模式为initial的作业使用新增表功能。

  • 新增的CTAS语句的对应Source必须能够复用优化,也就是新增的源表配置需要和原有的源表配置保持完全一致。

  • 新增CTAS语句前后,作业不能有其他参数的变更,比如更改启动模式。

示例七:通过CTAS语句将MySQL数据源表同步到Hologres分区表

Hologres分区表建表时,如果Hologres表存在主键,则要求分区字段必须是主键中的字段。假设有一张MySQL表需要同步到Hologres,其建表语句如下。

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

当使用CTAS同步数据源表到Hologres的分区表中时:

  • 如果上游表的主键包含分区字段,例如Hologres表的分区字段是product_id,可以通过如下SQL实现。

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
    PARTITIONED BY (product_id)
    AS TABLE `mysql`.`tpcds`.`orders`;
  • 如果上游表的主键不包含分区字段,例如Hologres表的分区字段是city,创建Hologres表时会使用MySQL表中的主键,由于上游表的主键不包含分区字段,作业会出错。此时,您可以在CTAS中通过声明主键的方式,重新指定目标Hologres分区表的主键,使得任务正常运行,示例如下。

    -- 可以通过如下SQL指定Hologres分区表的主键为order_id,product_id和city。
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
        CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
    )
    PARTITIONED BY (city)
    AS TABLE `mysql`.`tpcds`.`orders`;

示例八:CTAS语句使用字段类型宽容模式同步数据到Hologres表

在CTAS场景中,可能需要调整已有字段数据类型的精度(例如,从VARCHAR(10)到VARCHAR(20))。

  • Flink计算引擎VVR 6.0.5-Flink 1.15以下版本,上游修改数据类型可能导致CTAS任务失败,只能重建结果表。

  • Flink计算引擎VVR 6.0.5-Flink 1.15及以上版本,在同步数据到Hologres表时,支持使用类型宽容模式。宽容模式应该在首次启动CTAS作业时开启,如果在首次启动时未开启宽容模式,需要删除下游表并且将作业无状态重启才能生效。

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` 
    WITH (
    'connector' = 'hologres', 
    'enableTypeNormalization' = 'true' -- 使用字段类型宽容模式。
    ) AS TABLE `mysql`.`tpcds`.`orders`;

    在上游发生数据类型修改事件时,只要所修改类型与原类型的归一化类型相同,都视作修改成功。目前类型归一化规则如下:

    • TINYINT、SMALLINT、INT和BIGINT归一化为BIGINT。

    • CHAR、VARCHAR和STRING归一化为STRING。

    • FLOAT和DOUBLE归一化为DOUBLE。

    • 其他数据类型按照原本的类型映射规则创建,详情参见类型映射

    例如:

    • SMALLINT修改为INT,两者的归一化类型都是BIGINT,视为修改成功,CTAS作业正常运行。

    • 从FLOAT改为BIGINT,两者的归一化类型分别为DOUBLE和BIGINT,属于不兼容的情况,会抛出异常。

示例九:通过CTAS语句将MongoDB数据源表同步到Hologres表

实时计算Flink VVR 8.0.6及以上版本,CTAS语句支持同步MongoDB数据源表,能够在实时同步MongoDB数据的同时将上游表结构变更同步到下游表。可以配合MongoDB Catalog使用,无需手动定义Schema,MongoDB Catalog详情可参考管理MongoDB Catalog

这里以使用CTAS语句同步MongoDB数据源表数据到Hologres表为例:

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;
重要

使用CTAS或CDAS语句将MongoDB中的数据同步至目标表时,必须满足以下要求:

  • 实时计算Flink VVR版本必须为8.0.6及以上,MongoDB数据库版本必须为6.0及以上。

  • 在SQL Hints中已将scan.incremental.snapshot.enabled和scan.full-changelog参数都设置为true。

  • MongoDB数据库已开启前像后像(Pre- and Post-images)记录功能,开启方法参见Document Preimages

当使用同一个作业同步多个MongoDB集合时,需要满足以下条件:

  • 每张表关于MongoDB的配置必须完全相同,包括hosts、scheme、username、password、connectionOptions。

  • 每张表的scan.startup.mode配置必须完全相同。

示例十:MySQL整库同步Kafka

在实际使用中,同一张MySQL表可能被多个作业依赖,当多个任务使用同一张MySQL表做处理时,MySQL数据库会启动多个连接,对MySQL服务器和网络造成很大的压力。为了缓解对上游MySQL数据库的压力,实时计算Flink版提供MySQL整库同步到Kafka的能力,通过引入Kafka作为中间层,并使用CDAS整库同步或CTAS整表同步到Kafka来解决。具体操作请参见MySQL整库同步Kafka

相关文档