
Realtime Compute for Apache Flink: Modify a source table

Last Updated: Aug 12, 2024

This topic describes whether a job remains compatible with the state data used to start it after you modify a source table that is defined in the SQL statements of the job.

Background information

If you modify the schema in the DDL statement that creates a source table, the system determines compatibility based on how the modification affects the entire query that uses the table. If you modify the WITH clause, the compatibility check detects only changes to the connector parameter.
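As a minimal sketch of the WITH clause rule, the following pair of DDL statements changes only the rows-per-second option of the datagen connector. The option and its values are chosen for illustration only. Because the connector parameter stays datagen, this change is not one that the compatibility check detects. A change that the check does not detect can still affect state, as the MySQL chunk key column case under Compatible modifications shows.

```sql
-- Original SQL statement (rows-per-second value is illustrative):
create table MyTable (
  a int,
  b bigint,
  c varchar
) with (
  'connector'='datagen',
  'rows-per-second'='10'
);

-- Modified: only a non-connector option in the WITH clause changes.
-- The connector parameter is still datagen, so the compatibility check does not detect this change.
create table MyTable (
  a int,
  b bigint,
  c varchar
) with (
  'connector'='datagen',
  'rows-per-second'='100'
);

select a, sum(b), max(c) from MyTable group by a;
```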

Compatible modifications

  • Fully or partially compatible: Modify a field without affecting the state compatibility of downstream operators.

    -- Original SQL statement: 
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- Adding the d field is a fully compatible change. 
    -- This is because the d field is not used in the SELECT statement. 
    create table MyTable (
      a int,
      b bigint,
      c varchar,
      d int
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- Renaming b to d and adding the computed column b as d + 1 is a partially compatible modification.
    -- The state of max(c) is retained because column c is unchanged. Because column b is modified, the original sum(b) is treated as deleted and its state data is discarded.
    -- The new sum(b) is treated as added: its state starts empty and accumulates from the time the job starts.
    create table MyTable (
      a int,
      d bigint,
      c varchar,
      b as d + 1
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
  • Fully compatible: Modify the watermark interval.

  • If you modify the primary key, the state compatibility of downstream operators may be affected. For example, a modification that changes the upsert key may cause compatibility issues.

  • If you modify the primary key, or the column used for sharding that is specified in the WITH clause, the state compatibility of the source table may be affected. For example, if you modify the scan.incremental.snapshot.chunk.key-column parameter of the MySQL connector, the source state recorded during the full scan phase may become unusable.

  • If a change to the changelog mode alters the types of events sent to downstream operators, such as DELETE, UPDATE_BEFORE, and UPDATE_AFTER, the state compatibility of downstream operators may be affected. Modifications that may change the event types include: (1) modifying the cdcMode parameter in the WITH clause of the Hologres connector, which specifies whether to read binary logs in Change Data Capture (CDC) mode; (2) changing whether the MongoDB or PostgreSQL connector reads change data in UPSERT mode.
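The watermark case can be sketched as follows. This sketch assumes that "watermark interval" refers to the delay defined in the WATERMARK clause of the DDL statement; the ts column and the 5-second and 10-second delays are hypothetical values chosen for illustration.

```sql
-- Original SQL statement (hypothetical event-time column ts with a 5-second delay):
create table MyTable (
  a int,
  b bigint,
  ts timestamp(3),
  watermark for ts as ts - interval '5' second
) with (
  'connector'='datagen'
);

select a, sum(b) from MyTable group by a;

-- Fully compatible: only the watermark delay changes, from 5 seconds to 10 seconds.
create table MyTable (
  a int,
  b bigint,
  ts timestamp(3),
  watermark for ts as ts - interval '10' second
) with (
  'connector'='datagen'
);

select a, sum(b) from MyTable group by a;
```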

Incompatible modifications

  • Modify the connector type.

    -- Original SQL statement: 
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- Incompatible: Change the connector type from datagen to kafka. 
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='kafka', 
      ...
    );
    
    select a, sum(b), max(c) from MyTable group by a;
  • Modify the table name.

    -- Original SQL statement: 
    create table MyTable (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable group by a;
    
    
    -- Incompatible: Change the table name from MyTable to MyTable2. 
    create table MyTable2 (
      a int,
      b bigint,
      c varchar
    ) with (
      'connector'='datagen'
    );
    
    select a, sum(b), max(c) from MyTable2 group by a;
  • Modify a field in a way that affects the state compatibility of downstream operators.

    -- Original SQL statement: 
    create table MyTable1 (
      a int,
      b bigint
    ) with (
      'connector'='datagen'
    );
    
    create table MyTable2 (
      c int,
      d bigint
    ) with (
      'connector'='datagen'
    );
    
    select * from MyTable1 join MyTable2 on a = c;
    
    
    -- Incompatible: Add the e field to the MyTable2 table.
    -- Because the query uses select *, the e field becomes an input field of the join operator.
    -- This changes the input fields of the join operation.
    create table MyTable1 (
      a int,
      b bigint
    ) with (
      'connector'='datagen'
    );
    
    create table MyTable2 (
      c int,
      d bigint,
      e varchar
    ) with (
      'connector'='datagen'
    );
    
    -- The SELECT statement is unchanged:
    select * from MyTable1 join MyTable2 on a = c;
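For contrast, the following hypothetical variant names the join output columns explicitly instead of using select *. By analogy with the d field example under Compatible modifications, where adding a field that the query does not use is fully compatible, adding e here would be expected to leave the join input unchanged. This is an inference from that example, not a documented guarantee; the column list and join condition are illustrative.

```sql
-- Hypothetical variant: the query lists its columns explicitly, both before and after the change.
create table MyTable1 (
  a int,
  b bigint
) with (
  'connector'='datagen'
);

create table MyTable2 (
  c int,
  d bigint,
  e varchar  -- newly added field; not referenced anywhere in the query
) with (
  'connector'='datagen'
);

-- Same SELECT statement as before the e field was added.
select a, b, c, d from MyTable1 join MyTable2 on a = c;
```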