This topic describes which Sink changes are compatible and which are incompatible.
Compatible changes
Deleting one Sink from a job that writes to multiple Sinks is a fully compatible change.
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink1 (a int, b bigint, c varchar);
CREATE TABLE MySink2 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;

-- Delete the query for MySink1: a fully compatible change.
-- The state of the group aggregate in that query is discarded.
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;

-- Delete the query for MySink2: a fully compatible change.
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
Adding a Sink together with a stateless query is a fully compatible change.
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink1 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Add a stateless query: a fully compatible change.
CREATE TABLE MySink2 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
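By default, the Sink is treated as a stateless operator, because most Sink connectors hold no state. Changing the Sink table name, connector type, or WITH properties is therefore a compatible change.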
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Change the table name, connector type, and so on: a fully compatible change.
CREATE TABLE MySink2 (a int, b bigint, c varchar) WITH ('connector' = 'kafka', ...);
INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
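The example above changes the table name and connector type. As a further sketch of the WITH-property case, assuming table.optimizer.state-compatibility.ignore-sink keeps its default value of true, changing only a Sink option should also be a compatible change. The print connector's print-identifier option is used here purely for illustration; the value 'v2' is hypothetical.

```sql
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Only a WITH property of the Sink changes ('print-identifier' is illustrative).
-- With ignore-sink at its default of true, this is expected to be a compatible change.
CREATE TABLE MySink (a int, b bigint, c varchar) WITH (
  'connector' = 'print',
  'print-identifier' = 'v2'
);
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
```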
Incompatible changes
Adding a Sink together with a stateful query is an incompatible change.
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink1 (a int, b bigint, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Add a stateful query: an incompatible change.
CREATE TABLE MySink2 (b bigint, a int, c varchar);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
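If table.optimizer.state-compatibility.ignore-sink is set to false, the Sink is treated as a stateful operator, and changing the Sink table name or connector type becomes an incompatible change.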
-- Original SQL.
CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- With table.optimizer.state-compatibility.ignore-sink=false, the Sink is treated
-- as a stateful operator, so changing the table name is an incompatible change.
CREATE TABLE MySink2 (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- With table.optimizer.state-compatibility.ignore-sink=false, the Sink is treated
-- as a stateful operator, so changing the connector type (for example, from print
-- to blackhole) is an incompatible change.
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'blackhole', ...);
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
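The examples above assume that table.optimizer.state-compatibility.ignore-sink has already been set to false. A minimal sketch of one way to do that, assuming the job accepts standard Flink SQL SET statements (if it does not, set the same key as a job configuration parameter instead):

```sql
-- Treat the Sink as a stateful operator during the state compatibility check.
SET 'table.optimizer.state-compatibility.ignore-sink' = 'false';

CREATE TABLE MyTable (a int, b bigint, c varchar);
CREATE TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
```

Because switching this option from true to false is itself an incompatible change, it is safer to decide on the value before the job's first start than to flip it on a running job.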
Deleting one Sink while also modifying or deleting the TEMPORARY TABLE DDL statement of a Sink or Source is an incompatible change. For details, see Other limits.
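A minimal sketch of this case, assuming a job with one temporary Source and two temporary Sinks: the new version drops the MySink2 query and, in the same change, modifies the Source's TEMPORARY TABLE DDL (here by adding a column d, chosen only for illustration). Dropping the query alone would be compatible; combining it with the DDL change is what makes it incompatible.

```sql
-- Original SQL.
CREATE TEMPORARY TABLE MyTable (a int, b bigint, c varchar) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE MySink1 (a int, b bigint, c varchar) WITH ('connector' = 'print');
CREATE TEMPORARY TABLE MySink2 (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;

-- Delete the MySink2 query AND modify the Source's TEMPORARY TABLE DDL
-- (column d is illustrative): an incompatible change.
CREATE TEMPORARY TABLE MyTable (a int, b bigint, c varchar, d bigint) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE MySink1 (a int, b bigint, c varchar) WITH ('connector' = 'print');
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
```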
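In general, the Sink is a stateless operator (by default, table.optimizer.state-compatibility.ignore-sink is true, so the Sink is ignored during the compatibility check). However, when data may arrive out of order, the framework generates a stateful SinkMaterializer node for the Sink to eliminate the disorder and guarantee correct results. For details, see Principles of handling out-of-order Changelog events in Flink SQL. As a result, the compatibility check at job startup may report a change as fully compatible even though it is actually incompatible. For example, changing the Source's primary key can change the upsert key upstream of the Sink.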
-- Original query.
CREATE TEMPORARY TABLE MyTable (
  a int primary key not enforced,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  watermark for ts AS ts - interval '1' second
) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint primary key not enforced) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, b, c FROM MyTable;

-- Move the primary keys to different columns (Source: a to c; Sink: c to b).
CREATE TEMPORARY TABLE MyTable (
  a int,
  b bigint,
  c bigint primary key not enforced,
  d bigint,
  ts timestamp(3),
  proctime as proctime(),
  watermark for ts AS ts - interval '1' second
) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) WITH ('connector' = 'print');
INSERT INTO MySink SELECT a, b, c FROM MyTable;
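Because the startup check may not flag this situation, one way to see whether a query actually gets a SinkMaterializer node is to inspect its plan with EXPLAIN before and after the change. This is a sketch based on open-source Flink, where the Sink node in the optimized physical plan is annotated with upsertMaterialize=[true] when the node is generated; the exact plan text may vary across versions, so compare both versions' plans rather than relying on a fixed string.

```sql
-- Run after the CREATE TEMPORARY TABLE statements of the version being checked.
-- In the output, look at the Sink node: upsertMaterialize=[true] indicates that a
-- stateful SinkMaterializer is placed in front of the Sink.
EXPLAIN
INSERT INTO MySink SELECT a, b, c FROM MyTable;
```

If the annotation appears in either version, or its key columns differ between versions, treat the change as potentially incompatible even when the startup check reports full compatibility.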
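Changing table.optimizer.state-compatibility.ignore-sink from true to false (which treats the Sink as a stateful operator and includes it in the state compatibility check) is an incompatible change.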