本文为您介绍变更SQL除Query、Source和Sink之外的其他限制。
当前不支持检查Flink版本变化后状态的兼容性,所以您需要保证更新后的作业Flink版本和产生Checkpoint或Savepoint对应作业的Flink版本一致。
当前作业的依赖必须和产生Checkpoint或Savepoint对应作业的依赖保持兼容。当前Flink系统暂不无法识别自定义连接器、自定义函数依赖的状态的兼容性变化,因此请您自己保证它们的兼容性。
状态兼容性检测不支持检查同时进行的多个修改。单个修改包括只修改会影响状态计算的Where条件、只修改统计指标(Aggregate Function)、只修改Sink。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a; -- 修改Sink表:MySink -> MySink2,和修改统计函数:max(c) -> min(c),该修改属于未知兼容修改。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a; -- 添加Where条件:a > 10,设置table.optimizer.state-compatibility.ignore-filter: true;并且同时修改统计函数:max(c) -> min(c)。该修改属于未知兼容修改。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM ( SELECT * FROM MyTable where a > 10 ) GROUP BY a;
新增全新状态的Query ,该修改属于不兼容修改。
--原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, b, c FROM MyTable; --新增group aggregate query,该修改属于不兼容修改。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
如果删除了一路Sink的同时,将Sink或Source的TEMPORARY TABLE DDL也进行了修改或删除,该修改属于未知兼容。如果只是删除了一路Sink,但是TEMPORARY TABLE DDL未更新,该修改属于完全兼容。
--原始SQL --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --源表2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --结果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --结果表2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query BEGIN STATEMENT SET; INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10; END; --删除了一路Sink的同时,将Sink或Source的TEMPORARY TABLE DDL也进行了修改或删除,该修改属于未知兼容。 --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --结果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; --如果只是删除了一路Sink,但是TEMPORARY TABLE DDL未更新,当前修改属于完全兼容。 --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- 源表2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --结果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --结果表2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;