全部產品
Search
文件中心

Realtime Compute for Apache Flink:其他限制

更新時間:Jul 13, 2024

本文為您介紹變更SQL除Query、Source和Sink之外的其他限制。

  • 當前不支援檢查Flink版本變化後狀態的相容性,所以您需要保證更新後的作業Flink版本和產生Checkpoint或Savepoint對應作業的Flink版本一致。

  • 當前作業的依賴必須和產生Checkpoint或Savepoint對應作業的依賴保持相容。當前Flink系統暫不無法識別自訂連接器、自訂函數依賴的狀態的相容性變化,因此請您自己保證它們的相容性。

  • 狀態相容性檢測不支援檢查同時進行的多個修改。單個修改包括只修改會影響狀態計算的Where條件、只修改統計指標(Aggregate Function)、只修改Sink。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    ) WITH (
    'connector' = 'datagen'
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a;
    
    
    -- 修改Sink表:MySink -> MySink2,和修改統計函數:max(c) -> min(c),該修改屬於未知相容修改。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a;
    
    -- 添加Where條件:a > 10,設定table.optimizer.state-compatibility.ignore-filter: true;並且同時修改統計函數:max(c) -> min(c)。該修改屬於未知相容修改。
    INSERT INTO MySink
    SELECT a, sum(b), min(c) FROM (
      SELECT * FROM MyTable where a > 10
    ) GROUP BY a;
  • 新增全新狀態的Query ,該修改屬於不相容修改。

    --原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    ) WITH (
    'connector' = 'datagen'
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
    
    --新增group aggregate query,該修改屬於不相容修改。
    INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
  • 如果刪除了一路Sink的同時,將Sink或Source的TEMPORARY TABLE DDL也進行了修改或刪除,該修改屬於未知相容。如果只是刪除了一路Sink,但是TEMPORARY TABLE DDL未更新,該修改屬於完全相容。

    --原始SQL
    --源表1
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --源表2
    CREATE TEMPORARY TABLE MyTable2 (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --結果表1
    CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print');
    --結果表2
    CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print');
    
    --Query
    BEGIN STATEMENT SET;  
    INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10;
    END;
    
    --刪除了一路Sink的同時,將Sink或Source的TEMPORARY TABLE DDL也進行了修改或刪除,該修改屬於未知相容。
    --源表1
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint,
      d bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --結果表1
    CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print');
    
    --Query
    INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
    
    
    --如果只是刪除了一路Sink,但是TEMPORARY TABLE DDL未更新,當前修改屬於完全相容。
    --源表1
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    -- 源表2
    CREATE TEMPORARY TABLE MyTable2 (
      a int,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    --結果表1
    CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print');
    --結果表2
    CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print');
    
    --Query
    INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;