全部产品
Search
文档中心

实时计算Flink版:变更Source

更新时间:Jul 31, 2024

本文为您介绍Source变更的可兼容性和不可兼容性详情。

背景信息

当前建表DDL的Schema部分的变更,需要根据使用该表的Query的变化来判断整体的兼容性。WITH中除了连接器类型,其他的属性暂不进行兼容性检查。

可兼容的变更

  • 修改字段,且不影响下游算子的兼容性,属于完全或部分兼容变更。

    -- 原始SQL。
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- 新增字段d,该修改属于完全兼容变更。
    -- Query中未使用到字段d。
    create table MyTable (
      a int,
      b bigint,
      c varchar,
      d int
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- 修改字段:b -> b as d + 1,该修改属于部分兼容变更。
    -- max(c) 的计算结果不受影响。b列发生变化,因此原始的sum(b)被认为删除。
    -- 其对应的状态数据会被丢弃,新的sum(b)被认为新增指标,其值在作业启动时开始计算。
    create table MyTable (
      a int,
      d bigint,
      c varchar,
      b as d + 1
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
  • 修改Watermark间隔,属于完全兼容变更。

  • 修改Primary Key,可能导致下游兼容性受到影响。例如,upsert key发生变化。

  • 修改Primary Key或WITH参数中的分片列,可能导致source的状态兼容性受到影响。例如mysql-cdc参数中的scan.incremental.snapshot.chunk.key-column发生变化,Source全量阶段的状态可能无法使用。

  • Changelog模式发生变化,使得发送给下游的消息类型可能发生变化(例如Delete、Update_before、Update_after等),可能导致下游有状态节点的兼容性受到影响。例如,Hologres连接器WTH参数cdcMode(是否采用CDC读取Binlog)修改、MongoDB或Postgres以Upsert还是Retract方式读取。

不兼容的变更

  • 修改连接器类型属于不兼容变更。

    -- 原始SQL。
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- Connnector类型变化:datagen -> kafka,该修改属于不兼容修改。
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='kafka',
      ...
    );
    
    select a, sum(b), max(c) from MyTable group by a;
  • 修改表名属于不兼容变更。

    -- 原始SQL。
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- 表名类型变化:MyTable -> MyTable2,该修改属于不兼容修改。
    create table MyTable2 (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable2 group by a;
  • 修改字段且影响下游算子的兼容性,该修改属于不兼容变更。

    -- 原始SQL。
    create table MyTable1 (
      a int,
      b bigint
    ) with (
      'connector'='datagen'
    );
    
    create table MyTable2 (
      c int,
      d bigint
    ) with (
      'connector'='datagen'
    );
    
    select * from MyTable1 join MyTable2 on c = d;
    
    
    -- MyTable2中新增e字段,该修改属于不兼容变更。
    -- join要求其输入的字段不能修改。
    create table MyTable1 (
      a int,
      b bigint
    ) with (
      'connector'='datagen'
    );
    
    create table MyTable2 (
      c int,
      d bigint,
      e varchar
    ) with (
      'connector'='datagen'
    );
    
    -- 原始SQL。
    select * from MyTable1 join MyTable2 on c = d;