本文為您介紹變更SQL除Query、Source和Sink之外的其他限制。
當前不支援檢查Flink版本變化後狀態的相容性,所以您需要保證更新後的作業Flink版本和產生Checkpoint或Savepoint對應作業的Flink版本一致。
當前作業的依賴必須和產生Checkpoint或Savepoint對應作業的依賴保持相容。當前Flink系統暫不無法識別自訂連接器、自訂函數依賴的狀態的相容性變化,因此請您自己保證它們的相容性。
狀態相容性檢測不支援檢查同時進行的多個修改。單個修改包括只修改會影響狀態計算的Where條件、只修改統計指標(Aggregate Function)、只修改Sink。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a; -- 修改Sink表:MySink -> MySink2,和修改統計函數:max(c) -> min(c),該修改屬於未知相容修改。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a; -- 添加Where條件:a > 10,設定table.optimizer.state-compatibility.ignore-filter: true;並且同時修改統計函數:max(c) -> min(c)。該修改屬於未知相容修改。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM ( SELECT * FROM MyTable where a > 10 ) GROUP BY a;
新增全新狀態的Query ,該修改屬於不相容修改。
--原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, b, c FROM MyTable; --新增group aggregate query,該修改屬於不相容修改。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
如果刪除了一路Sink的同時,將Sink或Source的TEMPORARY TABLE DDL也進行了修改或刪除,該修改屬於未知相容。如果只是刪除了一路Sink,但是TEMPORARY TABLE DDL未更新,該修改屬於完全相容。
--原始SQL --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --源表2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --結果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --結果表2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query BEGIN STATEMENT SET; INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10; END; --刪除了一路Sink的同時,將Sink或Source的TEMPORARY TABLE DDL也進行了修改或刪除,該修改屬於未知相容。 --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --結果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; --如果只是刪除了一路Sink,但是TEMPORARY TABLE DDL未更新,當前修改屬於完全相容。 --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- 源表2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --結果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --結果表2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;