全部产品
Search
文档中心

实时计算Flink版:其他限制

更新时间:Jul 10, 2024

本文为您介绍变更SQL除Query、Source和Sink之外的其他限制。

  • 当前不支持检查Flink版本变化后状态的兼容性,所以您需要保证更新后的作业Flink版本和产生Checkpoint或Savepoint对应作业的Flink版本一致。

  • 当前作业的依赖必须和产生Checkpoint或Savepoint对应作业的依赖保持兼容。当前Flink系统暂不无法识别自定义连接器、自定义函数依赖的状态的兼容性变化,因此请您自己保证它们的兼容性。

  • 状态兼容性检测不支持检查同时进行的多个修改。单个修改包括只修改会影响状态计算的Where条件、只修改统计指标(Aggregate Function)、只修改Sink。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    ) WITH (
    'connector' = 'datagen'
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a;
    
    
    -- 修改Sink表:MySink -> MySink2,和修改统计函数:max(c) -> min(c),该修改属于未知兼容修改。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a;
    
    -- 添加Where条件:a > 10,设置table.optimizer.state-compatibility.ignore-filter: true;并且同时修改统计函数:max(c) -> min(c)。该修改属于未知兼容修改。
    INSERT INTO MySink
    SELECT a, sum(b), min(c) FROM (
      SELECT * FROM MyTable where a > 10
    ) GROUP BY a;
  • 新增全新状态的Query ,该修改属于不兼容修改。

    --原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    ) WITH (
    'connector' = 'datagen'
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
    
    --新增group aggregate query,该修改属于不兼容修改。
    INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
  • 如果删除了一路Sink的同时,将Sink或Source的TEMPORARY TABLE DDL也进行了修改或删除,该修改属于未知兼容。如果只是删除了一路Sink,但是TEMPORARY TABLE DDL未更新,该修改属于完全兼容。

    --原始SQL
    --源表1
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --源表2
    CREATE TEMPORARY TABLE MyTable2 (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --结果表1
    CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print');
    --结果表2
    CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print');
    
    --Query
    BEGIN STATEMENT SET;  
    INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10;
    END;
    
    --删除了一路Sink的同时,将Sink或Source的TEMPORARY TABLE DDL也进行了修改或删除,该修改属于未知兼容。
    --源表1
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint,
      d bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --结果表1
    CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print');
    
    --Query
    INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
    
    
    --如果只是删除了一路Sink,但是TEMPORARY TABLE DDL未更新,当前修改属于完全兼容。
    --源表1
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    -- 源表2
    CREATE TEMPORARY TABLE MyTable2 (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --结果表1
    CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print');
    --结果表2
    CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print');
    
    --Query
    INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;