This topic describes the limits of modifying the SQL statements for a job before you start the job based on a checkpoint or savepoint, excluding the limits of modifying a query, a source table, and a sink table for the job.
The job you want to start must use the same Apache Flink version as the job for which the checkpoint or savepoint is generated. Otherwise, the system cannot determine the compatibility.
The dependencies of the job you want to start must be compatible with the dependencies of the job for which the checkpoint or savepoint is generated. If you modify custom connectors or the dependencies of user-defined functions (UDFs), you must ensure compatibility after the modification because the system cannot identify compatibility issues that may occur in this case.
If you modify multiple items before a compatibility check, the system cannot determine the compatibility. An item refers to an aggregate function, a sink table, or a WHERE clause that affects stateful computation.
-- Original SQL statements: CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a; -- -- Example of changes that result in unknown compatibility: Rename the sink table as MySink2 and change the aggregate function from max(c) to min(c). CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a; -- -- Example of changes that result in unknown compatibility: Add a WHERE clause that contains the a > 10 condition, set the table.optimizer.state-compatibility.ignore-filter parameter to true, and change the aggregate function from max(c) to min(c). INSERT INTO MySink SELECT a, sum(b), min(c) FROM ( SELECT * FROM MyTable where a > 10 ) GROUP BY a;
If you add a query that generates new state data, a compatibility issue occurs.
-- Original SQL statements: CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, b, c FROM MyTable; -- Adding a group aggregate query results in an incompatible change. INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
If you delete a sink table and modify or delete the TEMPORARY TABLE statement for the sink table or the corresponding source table, the system cannot determine the compatibility before you start the job. If you delete a sink table without modifying the corresponding TEMPORARY TABLE statement, compatibility is not affected.
-- Original SQL statements -- Source table 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Source table 2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Sink table 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); -- Sink table 2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query BEGIN STATEMENT SET; INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10; END; -- Example of a change that results in unknown compatibility: Delete a sink table and modify or delete the TEMPORARY TABLE statement for the sink table or the corresponding source table. -- Source table 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Sink table 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; -- Example of a fully compatible change: Delete a sink table without modifying the corresponding TEMPORARY TABLE statement. -- Source table 1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Source table 2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- Sink table 1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); -- Sink table 2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;