This topic describes how the compatibility between a deployment and the state data is affected after you change a sink connector in an SQL statement for the deployment.
Changes that do not affect or partially affect the compatibility
Delete one of the sink connectors that are connected to the same source connector. After this change, the deployment remains fully compatible with the state data.
-- Original SQL statement: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); CREATE TABLE MySink2 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10; -- Delete the SELECT statement for MySink1. After this change, the deployment remains fully compatible with the state data. -- The state data that corresponds to the aggregate functions in the SELECT statement is discarded. INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10; -- Delete the SELECT statement for MySink2. After this change, the deployment remains fully compatible with the state data. INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
Add a sink connector and a stateless query. After this change, the deployment remains fully compatible with the state data.
-- Original SQL statement: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- Add a stateless query. After this change, the deployment remains fully compatible with the state data. CREATE TABLE MySink2 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
Most sink connectors are stateless. Therefore, a sink is considered a stateless operator by default. If the table name, connector type, and attributes in the WITH clause for a deployment are changed, the deployment remains fully compatible with the state data.
-- Original SQL statement: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- Change configurations such as table name and connector type in the SQL statement for a deployment. After this change, the deployment remains fully compatible with the state data. CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'kafka', ... ); INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
Changes that cause incompatibility
Add a sink connector and a stateful query. After this change, the deployment becomes incompatible with the state data.
-- Original SQL statement: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink1 ( a int, b bigint, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- Add a stateful query. After this change, the deployment becomes incompatible with the state data. CREATE TABLE MySink2 ( b bigint, a int, c varchar ); INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
Set the table.optimizer.state-compatibility.ignore-sink parameter for a sink to false and change the table name or connector type for a deployment. After this change, the deployment becomes incompatible with the state data. After you set the table.optimizer.state-compatibility.ignore-sink parameter to false, the sink is considered a stateful operator.
-- Original SQL statement: CREATE TABLE MyTable ( a int, b bigint, c varchar ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- Set the table.optimizer.state-compatibility.ignore-sink parameter for a sink to false. -- After you set the table.optimizer.state-compatibility.ignore-sink parameter to false, the sink is considered a stateful operator. Change the table name. After this change, the deployment becomes incompatible with the state data. CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a; -- Set the table.optimizer.state-compatibility.ignore-sink parameter for a sink to false. -- After you set the table.optimizer.state-compatibility.ignore-sink parameter to false, the sink is considered a stateful operator. Change the connector type. For example, you can change the connector type from print to blackhole. After this change, the deployment becomes incompatible with the state data. create table MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'blackhole', ... ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
If you delete a sink and modify or delete the TEMPORARY TABLE of a sink or source for a deployment, the deployment becomes incompatible with the state data. For more information, see Other limits.
By default, the table.optimizer.state-compatibility.ignore-sink parameter for a sink is set to true to ignore the sink during the compatibility check. Therefore, a sink is considered a stateless operator in most cases. If out-of-order data exists, a stateful SinkMaterializer operator is generated based on the sink to handle out-of-order data and ensure data accuracy. For more information, see Handle out-of-order changelog events in Flink SQL. In this case, if the result of the compatibility check that you performed when you start a deployment is Full Compatible, the deployment may become incompatible with the state data after a change. For example, if you change the primary key of a source, the upstream upsert key of the sink changes. In this case, the preceding compatibility issue may occur.
-- Original query CREATE TEMPORARY TABLE MyTable ( a int primary key not enforced, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint primary key not enforced) with ('connector'='print'); INSERT INTO MySink SELECT a, b, c FROM MyTable; -- Change the column that is used as the primary key. CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint primary key not enforced, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) with ('connector'='print'); INSERT INTO MySink SELECT a, b, c FROM MyTable;
Set the table.optimizer.state-compatibility.ignore-sink parameter for a sink from true to false. The sink is changed from a stateless operator to a stateful operator and is involved in the compatibility check. After this change, the deployment becomes incompatible with the state data.