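This topic describes which changes to a Sink are compatible and which are incompatible.
Compatible changes
Deleting one of the Sinks in a multi-Sink job is a fully compatible change.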
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink1 (a int, b bigint, c varchar);
CREATE TABLE MySink2 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
-- Delete the query that writes to MySink1. This is a fully compatible change.
-- The state of the group aggregate in that query is discarded.
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
-- Delete the query that writes to MySink2. This is a fully compatible change.
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
Adding a Sink together with a query that has no state is a fully compatible change.
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink1 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
-- Add a stateless query. This is a fully compatible change.
CREATE TABLE MySink2 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
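By default, a Sink is treated as a stateless operator, because most Sink connectors keep no state. Therefore, changing the Sink table name, the connector type, or the WITH properties is a compatible change.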
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
-- Change the table name, the connector type, and so on. This is a fully compatible change.
CREATE TABLE MySink2 (a int, b bigint, c varchar) WITH ('connector' = 'kafka', ...);
INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
Incompatible changes
Adding a Sink together with a query that has state is an incompatible change.
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink1 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
-- Add a stateful query. This is an incompatible change.
CREATE TABLE MySink2 (b bigint, a int, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
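If table.optimizer.state-compatibility.ignore-sink is set to false, Sinks are treated as stateful operators. In that case, changing the Sink table name or the connector type is an incompatible change.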
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
-- With table.optimizer.state-compatibility.ignore-sink=false, Sinks are treated as
-- stateful operators, so changing the table name is an incompatible change.
CREATE TABLE MySink2 (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
-- With table.optimizer.state-compatibility.ignore-sink=false, Sinks are treated as
-- stateful operators, so changing the connector type (for example, from print to
-- blackhole) is an incompatible change.
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'blackhole', ...);
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
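Deleting one Sink while also modifying or deleting the TEMPORARY TABLE DDL statement of a Sink or Source is an incompatible change. For details, see Other limits.
In most cases, a Sink is a stateless operator: table.optimizer.state-compatibility.ignore-sink defaults to true, so the check ignores Sinks. However, when changelog events may arrive out of order, the framework generates a stateful SinkMaterializer node in front of the Sink. This node eliminates the disorder and keeps the results correct. For details, see How Flink SQL handles out-of-order changelog events. As a result, the compatibility check at job startup may report a change as fully compatible even though it is actually incompatible. One example is changing the Primary Key of the Source, which changes the upsert key upstream of the Sink.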
-- Original query.
CREATE TEMPORARY TABLE MyTable (
  a int primary key not enforced,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  watermark for ts AS ts - interval '1' second
) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint primary key not enforced) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, b, c FROM MyTable;
-- Move the primary keys to different columns
-- (Source: from a to c, plus a new column d; Sink: from c to b).
CREATE TEMPORARY TABLE MyTable (
  a int,
  b bigint,
  c bigint primary key not enforced,
  d bigint,
  ts timestamp(3),
  proctime as proctime(),
  watermark for ts AS ts - interval '1' second
) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, b, c FROM MyTable;
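The following minimal Python sketch shows why a key change can break a job that the check reports as compatible. It models a materializer that keeps per-key state in front of the Sink, as the SinkMaterializer described above does.

Assumptions behind the sketch:
- The class `ToySinkMaterializer` and the `+I`/`+U`/`-U`/`-D` string row kinds are hypothetical.
- The state is assumed to be keyed by the Sink's primary key.
- The "emit the latest live row per key" rule is loosely based on the idea behind open-source Flink's SinkUpsertMaterializer. It is not that operator's actual code.

The sketch shows the failure mode: state written under column c is invisible to a job that looks rows up by column b, so a retraction is silently dropped.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[int, int, int]  # (a, b, c), mirroring MySink's columns


class ToySinkMaterializer:
    """Hypothetical, simplified per-key materializer (not Flink's real code)."""

    def __init__(self, pk_index: int) -> None:
        self.pk_index = pk_index  # which column is the (assumed) state key
        self.state: Dict[int, List[Row]] = {}  # key -> live rows, arrival order

    def process(self, kind: str, row: Row) -> Optional[Tuple[str, Row]]:
        key = row[self.pk_index]
        if kind in ("+I", "+U"):
            rows = self.state.setdefault(key, [])
            out_kind = "+I" if not rows else "+U"
            rows.append(row)
            return (out_kind, row)
        # "-U" or "-D": retract the first live row equal to `row`.
        rows = self.state.get(key)
        if rows is None or row not in rows:
            return None  # no matching state: the retraction is dropped
        idx = rows.index(row)
        was_last = idx == len(rows) - 1
        del rows[idx]
        if not rows:
            del self.state[key]
            return ("-D", row)  # nothing left for this key
        if was_last:
            return ("+U", rows[-1])  # fall back to the previous live row
        return None  # an older row was retracted; visible row unchanged


if __name__ == "__main__":
    old = ToySinkMaterializer(pk_index=2)  # Sink PK on column c
    print(old.process("+I", (1, 10, 100)))  # ('+I', (1, 10, 100))
    new = ToySinkMaterializer(pk_index=1)  # Sink PK moved to column b
    new.state = old.state  # "restore" state that was keyed by c
    print(new.process("-D", (1, 10, 100)))  # None: the retraction is lost
```

In this model, the restored entry for key 100 stays in state forever, and the Sink never receives the delete. This is the kind of silent incorrectness that a "fully compatible" result from the startup check does not rule out.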
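Changing table.optimizer.state-compatibility.ignore-sink from true to false is an incompatible change. With false, Sinks are treated as stateful operators and included in the state compatibility check.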
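The Sink rules on this page can be summarized with a toy Python model, which works in three steps:
1. Treat a job as a list of operators.
2. Collect the operators whose state the check looks at. Sinks count only when ignore-sink is false.
3. Call a change compatible when every checked operator in the new job has matching state in the old job. Old state that is no longer used is discarded.

Everything here is hypothetical, including `Operator`, `checked_state`, `check`, and the operator signature strings. The model only reproduces the cases listed on this page; it is not the actual compatibility checker. Like the default check, it also cannot see SinkMaterializer state.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable

# Hypothetical toy model of the Sink-related rules on this page.
# It is NOT the real state compatibility checker.


@dataclass(frozen=True)
class Operator:
    signature: str          # identifies the operator and its state layout
    stateful: bool = False  # ignored for Sinks; see checked_state()
    is_sink: bool = False


def checked_state(ops: Iterable[Operator], ignore_sink: bool) -> FrozenSet[str]:
    """Signatures of the operators whose state the check considers."""
    result = set()
    for op in ops:
        if op.is_sink:
            # A Sink counts as stateful only when ignore-sink is false.
            if not ignore_sink:
                result.add(op.signature)
        elif op.stateful:
            result.add(op.signature)
    return frozenset(result)


def check(old_ops: Iterable[Operator], new_ops: Iterable[Operator],
          old_ignore_sink: bool = True, new_ignore_sink: bool = True) -> str:
    old_state = checked_state(old_ops, old_ignore_sink)
    new_state = checked_state(new_ops, new_ignore_sink)
    # Old state that no new operator uses is discarded (compatible).
    # A checked operator in the new job without matching old state has
    # nothing to restore from, which this page treats as incompatible.
    return "COMPATIBLE" if new_state <= old_state else "INCOMPATIBLE"


def sink(name: str, connector: str) -> Operator:
    return Operator(f"Sink({name}, connector={connector})", is_sink=True)


AGG_A = Operator("GroupAggregate(MyTable, keys=[a])", stateful=True)
AGG_B = Operator("GroupAggregate(MyTable, keys=[b])", stateful=True)
FILTER = Operator("Calc(MyTable, a > 10)")  # stateless WHERE a > 10

if __name__ == "__main__":
    job = [AGG_A, sink("MySink", "print")]
    # Rename the Sink and switch its connector (default ignore-sink=true).
    print(check(job, [AGG_A, sink("MySink2", "kafka")]))  # COMPATIBLE
    # Same kind of rename with ignore-sink=false on both sides.
    print(check(job, [AGG_A, sink("MySink2", "print")],
                old_ignore_sink=False, new_ignore_sink=False))  # INCOMPATIBLE
    # Add a stateful query that writes to a new Sink.
    print(check(job, job + [AGG_B, sink("MySink2", "print")]))  # INCOMPATIBLE
    # Flip ignore-sink from true to false without touching the SQL.
    print(check(job, job, old_ignore_sink=True,
                new_ignore_sink=False))  # INCOMPATIBLE
```

The toggle case falls out of the same rule in this model. A job that ran with ignore-sink=true recorded no Sink state, so a job that now checks Sinks has nothing to match them against.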