
Realtime Compute for Apache Flink: Window TVF

Last Updated: Aug 14, 2024

This topic describes how the compatibility between a deployment and state data is affected after you modify a window table-valued function (TVF) in an SQL statement for the deployment.

Modifications that do not affect or partially affect the compatibility

  • Add, delete, or modify a non-distinct statistical metric. Statistical metrics are calculated by using aggregate functions.

    • If you add an aggregate function for a statistical metric to the SQL statement for a deployment, the deployment becomes partially compatible with the state data. The added statistical metric starts to be calculated from its initial value when the deployment starts.

    • If you delete a statistical metric from the SQL statement for a deployment, the deployment remains fully compatible with the state data. The state data of the deleted statistical metric is discarded.

    • If you add a statistical metric to the SQL statement for a deployment and delete another statistical metric from it, the deployment becomes partially compatible with the state data. The added statistical metric starts to be calculated from its initial value when the deployment starts, and the state data of the deleted statistical metric is discarded.

    • If you modify a statistical metric in the SQL statement for a deployment, the original statistical metric is treated as deleted and a new statistical metric is treated as added. The deployment becomes partially compatible with the state data. The added statistical metric starts to be calculated from its initial value when the deployment starts, and the state data of the deleted statistical metric is discarded.

    Note

    For statistical metrics that you do not modify, the results that the deployment calculates after it reuses the state data are the same as the results that it would have calculated if it had continued to run on the historical data without the modification.

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- Add the statistical metric count(c). After this modification, the deployment becomes partially compatible with the state data. 
    -- The calculation results of sum(b) and max(c) are not affected. count(c) starts to be counted from 0 when the deployment starts. 
    select a, sum(b), max(c), count(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- Delete the statistical metric sum(b). After this modification, the deployment remains fully compatible with the state data. 
    -- The calculation result of max(c) is not affected. 
    select a, max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- Change the statistical metric max(c) to min(c). After this modification, the deployment becomes partially compatible with the state data. 
    -- The calculation result of sum(b) is not affected. max(c) is considered deleted and its state data is discarded. 
    -- min(c) is considered a new statistical metric, and its value starts to be calculated when the deployment starts. 
    select a, sum(b), min(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • Change the position of a non-distinct statistical metric. After this modification, the deployment remains fully compatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- Change the positions of statistical metrics sum(b) and max(c). After this modification, the deployment remains fully compatible with the state data. 
    -- The calculation results of statistical metrics sum(b) and max(c) are not affected. 
    select a, max(c), sum(b)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • If the computing logic of a field in a non-distinct statistical metric changes, the statistical metric is considered modified. After this modification, the deployment becomes partially compatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
    
    -- Change the statistical metric max(c) to max(substring(c, 1, 5)). After this modification, the deployment becomes partially compatible with the state data. 
    -- The calculation result of sum(b) is not affected. max(c) is considered deleted and its state data is discarded. 
    -- max(substring(c, 1, 5)) is considered a new statistical metric and starts to be calculated from its initial value when the deployment starts. 
    create temporary view MyView as select a, b, substring(c, 1, 5) as c, ts from MyTable;
    select a, sum(b), max(c)
    from table (tumble(table MyView, descriptor(ts), interval '1' minute))
    group by a, window_start, window_end;
  • Add or delete a window attribute field. After this modification, the deployment remains fully compatible with the state data.

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- Add the window end attribute. After this modification, the deployment remains fully compatible with the state data. 
    select a,
      sum(b),
      max(c),
      window_start,
      window_end
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- Delete the window start attribute. After this modification, the deployment remains fully compatible with the state data. 
    select a,
      sum(b),
      max(c)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
  • If no statistical metrics are included in the SQL statement before and after a deployment is modified, the deployment remains fully compatible with the state data.

  • Change the sequence of group keys. If only the positions of the group keys related to the window function (window_start and window_end) change and the relative order of the other group keys remains unchanged, the deployment remains fully compatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, c, window_end, window_start;
    
    -- Change the sequence of group keys related to the window function. After this modification, the deployment remains fully compatible with the state data. 
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, window_end, c, window_start;
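
The case in which no statistical metrics are used before or after a modification has no example above. The following is a minimal sketch that reuses MyTable from the preceding examples. The specific modification, which adds the window_start attribute field and keeps the group keys unchanged, is an illustrative choice. Changing the group keys would instead cause incompatibility, as described in the next section.

```sql
-- Original SQL statement: no statistical metrics are used.
select a, b
from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
group by a, b, window_start, window_end;

-- Add the window_start attribute field. No statistical metrics exist before or after this modification,
-- and the group keys are unchanged. The deployment is therefore expected to remain fully compatible with the state data.
select a, b, window_start
from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
group by a, b, window_start, window_end;
```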

Modifications that cause full incompatibility

  • If you modify window-related attributes, such as the window type, window size, or time attribute, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- Change the window type from TUMBLE to HOP (slide of 1 minute, size of 2 minutes). After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      window_start
    from table (hop(table MyTable, descriptor(ts), interval '1' minute, interval '2' minute))
      group by a, window_start, window_end;
    
    -- Change the window size from 1 minute to 2 minutes. After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '2' minute))
      group by a, window_start, window_end;
    
    -- Change the time attribute from ts to proctime. After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(proctime), interval '1' minute))
      group by a, window_start, window_end;
  • If you add, delete, or modify a group key or change the calculation logic of a field that is used as a group key in the SQL statement for a deployment, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- Add the group key d. After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, d, window_start, window_end;
    
    -- For other examples, see the example of changing a group key in "Aggregate function that is used with GROUP BY."
  • If you add, delete, or modify a distinct statistical metric or change the calculation logic of a field that is related to a distinct statistical metric in the SQL statement for a deployment, the deployment becomes incompatible with the state data. Distinct statistical metrics are calculated by using distinct aggregate functions.

    -- Original SQL statement: 
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- Delete the distinct statistical metric count(distinct b). After this modification, the deployment becomes incompatible with the state data. 
    select a,
      sum(b),
      max(c),
      count(distinct c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- For other examples, see the example of modifying a distinct statistical metric in "Aggregate function that is used with GROUP BY."
  • If you delete all statistical metrics from the SQL statement for a deployment, the deployment becomes incompatible with the state data. The state data of all statistical metrics is discarded and no state data is reused.

    -- Original SQL statement: 
    select a,
      sum(b),
      count(distinct b),
      max(c),
      count(distinct c),
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
    
    -- Delete all statistical metrics: sum(b), count(distinct b), max(c), and count(distinct c). After this modification, the deployment becomes incompatible with the state data. 
    select a,
      window_start
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      group by a, window_start, window_end;
  • Change the sequence of group keys. If the relative order of the group keys other than those related to the window function (window_start and window_end) changes, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, c, window_end, window_start;
    
    -- Change the sequence of group keys other than the keys related to the window function. After this modification, the deployment becomes incompatible with the state data. 
    select a, sum(b), max(c)
      from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by c, a, window_end, window_start;
  • If you add a statistical metric to the SQL statement for a deployment that does not originally include statistical metrics, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    select a, b
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, b, window_end, window_start;
    
    -- Add a new statistical metric. After this modification, the deployment becomes incompatible with the state data. 
    select a, b, count(a)
    from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
    group by a, b, window_end, window_start;
  • If only one statistical metric exists in the SQL statement and the calculation logic of the statistical metric is changed, the deployment becomes incompatible with the state data.

    -- Original SQL statement: 
    insert into MySink select a, sum(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, window_start, window_end;
    
    -- Only one statistical metric exists in the SQL statement for a deployment and the calculation logic of the statistical metric is changed. After this modification, the deployment becomes incompatible with the state data. 
    create temporary view MyView as select a, b + 1 as b, ts from MyTable; 
    insert into MySink select a, sum(b) from
        table(tumble(table MyView, descriptor(ts), interval '1' second)) 
    group by a, window_start, window_end;
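  • If the statistical metrics before and after a deployment is modified are completely different, for example, when the only statistical metric in the SQL statement is replaced with a different one, the deployment becomes incompatible with the state data.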

    -- Original SQL statement:
    insert into MySink select a, sum(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, c, window_start, window_end;
    
    -- The statistical metrics before and after a deployment is modified are different. After this modification, the deployment becomes incompatible with the state data. 
    insert into MySink select a, min(b) from
        table(tumble(table MyTable, descriptor(ts), interval '1' second)) 
    group by a, c, window_start, window_end;
  • Add or delete a clause that calculates the window_start, window_end, or window_time field between the window function and the GROUP BY clause.

    • If you add a clause for field calculation to the SQL statement for a deployment, the deployment becomes incompatible with the state data.

    • If you delete a clause for field calculation from the SQL statement for a deployment, the deployment becomes incompatible with the state data.

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- Add a clause for field calculation. 
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start + (INTERVAL '1' SECOND) as window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
  • Change whether the GROUP BY clause contains both the window_start and window_end fields.

    • If the GROUP BY clause of a deployment contains both the window_start and window_end fields before a modification and does not contain both the fields after the modification, the deployment becomes incompatible with the state data.

    • If the GROUP BY clause of a deployment does not contain both the window_start and window_end fields before a modification and contains both the fields after the modification, the deployment becomes incompatible with the state data.

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start
      from (select a, b, c, window_start  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start;
      
      -- The GROUP BY clause contains both the window_start and window_end fields.
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
  • GROUPING SETS, CUBE, or ROLLUP is used in the GROUP BY clause. In this case, data is separately grouped by the window_start and the window_end fields.

    • If GROUPING SETS, CUBE, or ROLLUP is added to the GROUP BY clause of a deployment, the deployment becomes incompatible with the state data.

    • If GROUPING SETS, CUBE, or ROLLUP is deleted from the GROUP BY clause of a deployment after a modification, the deployment becomes incompatible with the state data.

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- Add GROUPING SETS to the SQL statement. 
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by GROUPING sets((a), (window_start), (window_end));
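
The earlier field-calculation example in this section shows only the case in which a clause is added. The following sketch shows the reverse case under the same assumptions, reusing MyTable and the window_start + (INTERVAL '1' SECOND) clause from that example. According to the rule stated for that case, deleting an existing clause that calculates window_start also makes the deployment incompatible with the state data.

```sql
-- Original SQL statement: a clause calculates window_start between the window function and the GROUP BY clause.
select a, sum(b), max(c), window_start, window_end
from (select a, b, c, window_start + (INTERVAL '1' SECOND) as window_start, window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
group by a, window_start, window_end;

-- Delete the clause for field calculation. After this modification, the deployment becomes incompatible with the state data.
select a, sum(b), max(c), window_start, window_end
from (select a, b, c, window_start, window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
group by a, window_start, window_end;
```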

Modifications that cause unknown compatibility

  • Use a clause that calculates the window_start, window_end, or window_time field between the window function and the GROUP BY clause.

    If the clause for field calculation exists in the SQL statement for a deployment before and after a modification, the compatibility between the deployment and the state data is unknown.

  • Filter the window_start, window_end, and window_time fields between the window function and the GROUP BY clause.

    • If a filter clause is added to the SQL statement for a deployment, the compatibility between the deployment and the state data is unknown.

    • If a filter clause is deleted from the SQL statement for a deployment, the compatibility between the deployment and the state data is unknown.

    • If a filter clause exists in the SQL statement for a deployment before and after a modification, the compatibility between the deployment and the state data is unknown.

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- Add a filter clause to the SQL statement. 
      select a, sum(b), max(c), window_start,window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
          WHERE window_start >= TIMESTAMP '2024-04-15 08:06:00.000')
      group by a, window_start, window_end;
  • The window function is used together with a user-defined table function (UDTF).

    • If a UDTF is added to the SQL statement for a deployment after a modification, the compatibility between the deployment and the state data is unknown.

    • If a UDTF is deleted from the SQL statement for a deployment after a modification, the compatibility between the deployment and the state data is unknown.

    • If a UDTF exists in the SQL statement for a deployment before and after a modification, the compatibility between the deployment and the state data is unknown.

      -- Original SQL statement: 
      select a, sum(b), length(c), window_start,window_end from (
        select a, b, c, window_start,window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a,c, window_start,window_end;
      
      -- Add a UDTF. 
      select a, sum(b), length(c), window_start, window_end, c1, c2
      from (select a, b, c, window_start, window_end, c1, c2  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)), LATERAL TABLE(split(c)) as T(c1, c2))
      group by a,c, window_start, window_end, c1, c2;
  • A Python user-defined aggregate function (UDAF) is used.

    • If a Python UDAF is added to the SQL statement for a deployment after a modification, the compatibility between the deployment and the state data is unknown.

    • If a Python UDAF is deleted from the SQL statement for a deployment after a modification, the compatibility between the deployment and the state data is unknown.

    • If a Python UDAF exists in the SQL statement for a deployment before and after a modification, the compatibility between the deployment and the state data is unknown.

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start
      from (select a, b, c, window_start  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start;
      
      -- Add a Python UDAF. 
      select a, sum(b), c, window_start
      from (select a, b, weighted_avg(c) as c, window_start from table (tumble(table MyTable, descriptor(ts), interval '1' minute))
      GROUP by a,b,window_start)
      group by a,c, window_start;
  • Perform aggregation on the window_start, window_end, and window_time fields by using an aggregate function.

    • If the aggregation is added to the SQL statement for a deployment after a modification, the compatibility between the deployment and the state data is unknown.

    • If the aggregation is deleted from the SQL statement for a deployment after a modification, the compatibility between the deployment and the state data is unknown.

    • If the aggregation exists in the SQL statement for a deployment before and after a modification, the compatibility between the deployment and the state data is unknown.

      -- Original SQL statement: 
      select a, sum(b), max(c), window_start, window_end
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
      -- Add an aggregate function on the window_start field. 
      select a, sum(b), max(c), MAX(window_start) as ag, window_end 
      from (select a, b, c, window_start, window_end  from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
      group by a, window_start, window_end;
      
  • GROUPING SETS, CUBE, or ROLLUP is used in the GROUP BY clause. In this case, data is separately grouped by the window_start and the window_end fields.

    If GROUPING SETS, CUBE, or ROLLUP is used in the GROUP BY clause of a deployment before and after a modification, the compatibility between the deployment and the state data is unknown.
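
Two cases above have no example of their own: a field-calculation clause that exists both before and after the modification, and GROUPING SETS, CUBE, or ROLLUP used both before and after the modification. The following sketch is only illustrative. It reuses MyTable, keeps the clause (first pair) or GROUPING SETS (second pair) in place, and changes only the statistical metric max(c) to min(c). In both cases, the compatibility between the deployment and the state data is unknown.

```sql
-- Original SQL statement: a clause calculates window_start.
select a, sum(b), max(c), window_start, window_end
from (select a, b, c, window_start + (INTERVAL '1' SECOND) as window_start, window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
group by a, window_start, window_end;

-- Change max(c) to min(c) and keep the clause for field calculation. The compatibility is unknown.
select a, sum(b), min(c), window_start, window_end
from (select a, b, c, window_start + (INTERVAL '1' SECOND) as window_start, window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
group by a, window_start, window_end;

-- Original SQL statement: GROUPING SETS is used.
select a, sum(b), max(c), window_start, window_end
from (select a, b, c, window_start, window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
group by GROUPING SETS ((a), (window_start), (window_end));

-- Change max(c) to min(c) and keep GROUPING SETS. The compatibility is unknown.
select a, sum(b), min(c), window_start, window_end
from (select a, b, c, window_start, window_end from table (tumble(table MyTable, descriptor(ts), interval '1' minute)))
group by GROUPING SETS ((a), (window_start), (window_end));
```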