Realtime Compute for Apache Flink: Queries

Last Updated: Sep 02, 2024

This topic describes the queries supported by Realtime Compute for Apache Flink.

Realtime Compute for Apache Flink is compatible with the native queries of Apache Flink. The following Backus-Naur Form (BNF) describes a superset of supported streaming and batch SQL queries:

query:
    values
  | WITH withItem [ , withItem ]* query
  | {
        select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

withItem:
    name
    [ '(' column [, column ]* ')' ]
    AS '(' query ')'

orderItem:
    expression [ ASC | DESC ]

select:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }
    FROM tableExpression
    [ WHERE booleanExpression ]
    [ GROUP BY { groupItem [, groupItem ]* } ]
    [ HAVING booleanExpression ]
    [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }

projectItem:
    expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
    tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
    tablePrimary
    [ matchRecognize ]
    [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
    [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | [ LATERAL ] '(' query ')'
  | UNNEST '(' expression ')'

tablePath:
    [ [ catalogName . ] databaseName . ] tableName

systemTimePeriod:
    FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
    /*+ OPTIONS(key=val [, key=val]*) */

key:
    stringLiteral

val:
    stringLiteral

values:
    VALUES expression [, expression ]*

groupItem:
    expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
    MATCH_RECOGNIZE '('
    [ PARTITION BY expression [, expression ]* ]
    [ ORDER BY orderItem [, orderItem ]* ]
    [ MEASURES measureColumn [, measureColumn ]* ]
    [ ONE ROW PER MATCH ]
    [ AFTER MATCH
      ( SKIP TO NEXT ROW
      | SKIP PAST LAST ROW
      | SKIP TO FIRST variable
      | SKIP TO LAST variable
      | SKIP TO variable )
    ]
    PATTERN '(' pattern ')'
    [ WITHIN intervalLiteral ]
    DEFINE variable AS condition [, variable AS condition ]*
    ')'

measureColumn:
    expression AS alias

pattern:
    patternTerm [ '|' patternTerm ]*

patternTerm:
    patternFactor [ patternFactor ]*

patternFactor:
    variable [ patternQuantifier ]

patternQuantifier:
    '*'
  | '*?'
  | '+'
  | '+?'
  | '?'
  | '??'
  | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  | '{' repeat '}'
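
For reference, the following query is a minimal sketch that combines several of the constructs above: a WITH item, a dynamic table options hint, and a grouped aggregation. The table Orders, its columns, and the hint key are hypothetical and used only for illustration; an actual hint key must be an option supported by the table's connector.

-- Hypothetical table Orders(currency, amount); the OPTIONS key shown is illustrative only.
WITH big_orders AS (
    SELECT currency, amount
    FROM Orders /*+ OPTIONS('scan.startup.mode'='latest-offset') */
    WHERE amount > 100
)
SELECT currency, COUNT(*) AS cnt, SUM(amount) AS total_amount
FROM big_orders
GROUP BY currency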

Identifiers

Similar to Java, Flink SQL uses the following lexical policies for identifiers, such as table names, column names, and function names:

  • The case of an identifier is preserved regardless of whether the identifier is enclosed in backticks (`).

  • Identifiers are matched case-sensitively.

Unlike Java, Flink SQL allows identifiers that are enclosed in backticks to contain non-alphanumeric characters. Example:

SELECT a AS `my field` FROM t
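
For example, because matching is case-sensitive, the following statements assume a hypothetical table t whose column is declared as myField; the last statement fails because no column named myfield exists:

SELECT myField FROM t     -- matches the column myField
SELECT `myField` FROM t   -- backticks preserve the same case, so this also matches myField
SELECT myfield FROM t     -- fails: identifiers are matched case-sensitively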

String constants

String constants must be enclosed in single quotation marks (') rather than double quotation marks ("). Example:

SELECT 'Hello World' 

To escape a single quotation mark (') within a string constant, duplicate it. Example:

Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
|      EXPR$0 |  EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set

Unicode values are supported in string constants. To include Unicode values in a string constant, use one of the following methods:

  • Use the default escape character by adding a backslash (\) before the Unicode value. Example:

    SELECT U&'\263A'

  • Use a custom escape character. Example:

    SELECT U&'#263A' UESCAPE '#' -- A number sign (#) is used as the escape character.

The following table describes the queries supported by Apache Flink 1.15 and provides corresponding references.

Note

If you want to view the references for another version of Apache Flink, switch to that version on the Apache Flink website.

Query                        References
Hints                        SQL Hints
WITH clause                  WITH clause
SELECT and WHERE clauses     SELECT & WHERE clause
SELECT DISTINCT              SELECT DISTINCT
Window functions             Windowing table-valued functions (Windowing TVFs)
Window aggregation           Window Aggregation
Group aggregation            Group Aggregation
Over aggregation             Over Aggregation
Join                         Joins
Window join                  Window Join
Set operations               Set Operations
ORDER BY clause              ORDER BY clause
LIMIT clause                 LIMIT clause
Top-N                        Top-N
Window Top-N                 Window Top-N
Deduplication                Deduplication
Window deduplication         Window Deduplication
Pattern recognition          Pattern Recognition
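
As an illustration of the windowing TVF and window aggregation entries above, the following is a minimal sketch that aggregates rows into 10-minute tumbling windows. The table Orders and its columns are hypothetical; order_time must be declared as an event-time attribute.

-- Hypothetical table Orders(order_time, price); order_time is an event-time attribute.
SELECT window_start, window_end, SUM(price) AS total_price
FROM TABLE(
    TUMBLE(TABLE Orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end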

Query execution

In streaming mode, input streams can be categorized into update streams and non-update streams. Non-update streams contain only INSERT-type events, whereas update streams also contain other types of events. For example, a change data capture (CDC) source produces an update stream from an external system. Some operations within Flink, such as group aggregation and Top-N computation, also generate update events. In most cases, operations that generate update events are performed by stateful operators, which use state data to manage the updates. However, not all stateful operators can consume update streams. For example, the operators for over aggregation and interval join do not support update streams as inputs.
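
As an illustration, the following grouped aggregation produces an update stream: each new row for a user causes the previously emitted count for that user to be updated. The table clicks and its columns are hypothetical and used only for illustration.

-- Hypothetical table clicks(user_id, url). Each incoming click updates the count for its user,
-- so the result of this query is an update stream rather than an insert-only stream.
SELECT user_id, COUNT(*) AS click_cnt
FROM clicks
GROUP BY user_id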

The following list describes the runtime information of each supported query, including the name of the runtime operator, whether the operator uses state data, whether the operator can consume update streams, and whether the operator generates update events. The information applies to Ververica Runtime (VVR) 6.0.X and later.

SELECT and WHERE
  • Runtime operator: Calc
  • Uses state data: No
  • Consumes update streams: Yes
  • Generates update events: No

Lookup Join
  • Runtime operator: LookupJoin
  • Uses state data: No*
  • Consumes update streams: Yes
  • Generates update events: No
  • Note: In VVR 8.0.1 or later, if you set the table.optimizer.non-deterministic-update.strategy parameter to TRY_RESOLVE and non-deterministic update problems are detected in the query, state data is automatically used to resolve the problems. If you want to disable the use of state data, set the table.optimizer.non-deterministic-update.strategy parameter to IGNORE. Note that changing the value of this parameter may cause incompatibility; in this case, you must re-execute the query. A configuration sketch is provided at the end of this topic.

Table Function
  • Runtime operator: Correlate
  • Uses state data: No
  • Consumes update streams: Yes
  • Generates update events: No

SELECT DISTINCT
  • Runtime operator: GroupAggregate
  • Uses state data: Yes
  • Consumes update streams: Yes
  • Generates update events: Yes

Group aggregation
  • Runtime operators: GroupAggregate, LocalGroupAggregate, GlobalGroupAggregate, IncrementalGroupAggregate
  • Uses state data: Yes*
  • Consumes update streams: Yes
  • Generates update events: Yes
  • Note: The pre-aggregation operator LocalGroupAggregate does not use state data.

Over aggregation
  • Runtime operator: OverAggregate
  • Uses state data: Yes
  • Consumes update streams: No
  • Generates update events: No

Window aggregation
  • Runtime operators: GroupWindowAggregate, WindowAggregate, LocalWindowAggregate, GlobalWindowAggregate
  • Uses state data: Yes*
  • Consumes update streams: Yes*
  • Generates update events: No*
  • Notes:
      • The pre-aggregation operator LocalWindowAggregate does not use state data.
      • Support for consuming update streams varies based on whether you use VVR or Apache Flink. For more information, see the "Comparison of the support for update streams" section of the Window aggregation topic.
      • If the early fire or late fire feature (experimental) is enabled, update events are generated. Otherwise, no update events are generated.

Join on two streams (Regular join)
  • Runtime operator: Join
  • Uses state data: Yes
  • Consumes update streams: Yes
  • Generates update events: Yes*
  • Note: If you use an outer join, such as LEFT JOIN, RIGHT JOIN, or FULL OUTER JOIN, update events are generated.

Interval Join
  • Runtime operator: IntervalJoin
  • Uses state data: Yes
  • Consumes update streams: No
  • Generates update events: No

Temporal Join
  • Runtime operator: TemporalJoin
  • Uses state data: Yes
  • Consumes update streams: Yes
  • Generates update events: No

Window Join
  • Runtime operator: WindowJoin
  • Uses state data: Yes
  • Consumes update streams: No
  • Generates update events: No

Top-N
  • Runtime operator: Rank
  • Uses state data: Yes
  • Consumes update streams: Yes
  • Generates update events: Yes
  • Note: Top-N queries do not support ranking based on the processing time. You can rank based on other built-in functions instead, such as CURRENT_TIMESTAMP. A sketch follows this list.
  • Warning: If you specify a processing time field in the ORDER BY clause of a Top-N query, data errors may occur. This issue is not reported during the syntax check in VVR 8.0.7 or earlier. We recommend that you use other built-in functions, such as CURRENT_TIMESTAMP.

Window Top-N
  • Runtime operator: WindowRank
  • Uses state data: Yes
  • Consumes update streams: No
  • Generates update events: No

Deduplication
  • Runtime operator: Deduplicate
  • Uses state data: Yes
  • Consumes update streams: No
  • Generates update events: Yes*
  • Note: If you use the Deduplicate Keep FirstRow policy to deduplicate data based on the processing time (proctime), no update events are generated.

Window deduplication
  • Runtime operator: WindowDeduplicate
  • Uses state data: Yes
  • Consumes update streams: No
  • Generates update events: No
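
As referenced in the Top-N entry above, the following is a minimal sketch of a Top-N query that ranks rows by a regular column instead of a processing-time field. The table ShopSales and its columns are hypothetical and used only for illustration.

-- Hypothetical table ShopSales(product_id, category, sales).
SELECT product_id, category, sales
FROM (
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS row_num
    FROM ShopSales)
WHERE row_num <= 3  -- keep the top 3 products per category; sales, not a processing-time field, drives the ranking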

Note

Stateless operators pass event types through unchanged, so each output event has the same type as the input event from which it is derived. As a result, stateless operators do not generate update events of their own, regardless of whether the input is an update stream.
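
As noted in the Lookup Join entry above, the handling of non-deterministic updates is controlled by the table.optimizer.non-deterministic-update.strategy parameter. The following is a minimal sketch that sets it with a SQL SET statement; whether you set it this way or through your deployment's configuration depends on your environment.

-- Detect non-deterministic update problems and resolve them with state data (VVR 8.0.1 or later).
SET 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';

-- Alternatively, disable the use of state data for this purpose.
-- SET 'table.optimizer.non-deterministic-update.strategy' = 'IGNORE';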