This topic describes the queries supported by Realtime Compute for Apache Flink.
Realtime Compute for Apache Flink is compatible with the native queries of Apache Flink. The following Backus-Naur Form (BNF) describes a superset of supported streaming and batch SQL queries:
query:
    values
  | WITH withItem [ , withItem ]* query
  | {
        select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
    name
    [ '(' column [, column ]* ')' ]
    AS '(' query ')'
orderItem:
    expression [ ASC | DESC ]
select:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }
    FROM tableExpression
    [ WHERE booleanExpression ]
    [ GROUP BY { groupItem [, groupItem ]* } ]
    [ HAVING booleanExpression ]
    [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }
projectItem:
    expression [ [ AS ] columnAlias ]
  | tableAlias . *
tableExpression:
    tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'
tableReference:
    tablePrimary
    [ matchRecognize ]
    [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
    [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | [ LATERAL ] '(' query ')'
  | UNNEST '(' expression ')'
tablePath:
    [ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
    FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
    /*+ OPTIONS(key=val [, key=val]*) */
key:
    stringLiteral
val:
    stringLiteral
values:
    VALUES expression [, expression ]*
groupItem:
    expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
    windowName
  | windowSpec
windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'
matchRecognize:
    MATCH_RECOGNIZE '('
    [ PARTITION BY expression [, expression ]* ]
    [ ORDER BY orderItem [, orderItem ]* ]
    [ MEASURES measureColumn [, measureColumn ]* ]
    [ ONE ROW PER MATCH ]
    [ AFTER MATCH
      ( SKIP TO NEXT ROW
      | SKIP PAST LAST ROW
      | SKIP TO FIRST variable
      | SKIP TO LAST variable
      | SKIP TO variable )
    ]
    PATTERN '(' pattern ')'
    [ WITHIN intervalLiteral ]
    DEFINE variable AS condition [, variable AS condition ]*
    ')'
measureColumn:
    expression AS alias
pattern:
    patternTerm [ '|' patternTerm ]*
patternTerm:
    patternFactor [ patternFactor ]*
patternFactor:
    variable [ patternQuantifier ]
patternQuantifier:
    '*'
  | '*?'
  | '+'
  | '+?'
  | '?'
  | '??'
  | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  | '{' repeat '}'
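In the grammar, dynamicTableOptions is a hint that follows the tablePath, and both key and val are string literals. The following minimal sketch assumes you generate Flink SQL text from Python and shows one way to render such a hint. The table name orders is hypothetical, and scan.startup.mode is used only as an example option; which options are valid depends on the connector of the table.

```python
from typing import Mapping


def sql_string(value: str) -> str:
    """Render value as a Flink SQL string literal (single quotes doubled)."""
    return "'" + value.replace("'", "''") + "'"


def options_hint(options: Mapping[str, str]) -> str:
    """Build a dynamicTableOptions hint: /*+ OPTIONS(key=val [, key=val]*) */.

    Per the grammar, both key and val are string literals.
    """
    if not options:
        raise ValueError("OPTIONS requires at least one key=val pair")
    pairs = ", ".join(f"{sql_string(k)}={sql_string(v)}" for k, v in options.items())
    return f"/*+ OPTIONS({pairs}) */"


# Hypothetical table and option; the hint is placed directly after the table path.
query = "SELECT * FROM orders " + options_hint({"scan.startup.mode": "earliest-offset"})
print(query)
```

Because keys and values are emitted as string literals, the same quote-doubling rule that applies to ordinary string constants applies inside the hint.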
Identifiers
Similar to Java, Flink SQL applies the following lexical rules to identifiers, such as table names, column names, and function names:
The case of an identifier is preserved regardless of whether the identifier is enclosed in backticks (`).
Identifiers are matched case-sensitively.
Unlike Java, Flink SQL allows identifiers that are enclosed in backticks to contain non-alphanumeric characters. Example:
SELECT a AS `my field` FROM t
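If you build queries programmatically, quoting every identifier with backticks is one way to apply the backtick rule described above. The following is a minimal plain-Python sketch; the helper names quote_identifier and qualified are hypothetical. It leaves the case unchanged because identifiers are matched case-sensitively, and it rejects names that contain a backtick because this sketch does not assume any escaping rule for that character.

```python
def quote_identifier(name: str) -> str:
    """Wrap an identifier in backticks so that it may contain spaces or
    other non-alphanumeric characters. The case is kept as-is because
    Flink SQL matches identifiers case-sensitively."""
    if not name:
        raise ValueError("identifier must not be empty")
    if "`" in name:
        # Escaping an embedded backtick is out of scope for this sketch.
        raise ValueError("identifier must not contain a backtick")
    return f"`{name}`"


def qualified(*parts: str) -> str:
    """Join catalog, database, and table names into a quoted tablePath."""
    return ".".join(quote_identifier(p) for p in parts)


statement = f"SELECT a AS {quote_identifier('my field')} FROM {qualified('t')}"
print(statement)
```

Quoting a name such as MyField produces `MyField`, which matches a column named MyField but not one named myfield.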
String constants
String constants must be enclosed in single quotation marks (') rather than double quotation marks ("). Example:
SELECT 'Hello World'
To include a single quotation mark (') in a string constant, escape it by entering two single quotation marks (''). Example:
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
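The escaping rule above is easy to apply when string constants are generated by code. The following minimal plain-Python sketch (the helper name sql_string is hypothetical) wraps a value in single quotation marks and doubles every single quotation mark inside it, reproducing the statement shown in the example.

```python
def sql_string(value: str) -> str:
    """Render value as a Flink SQL string constant.

    The constant is wrapped in single quotation marks, and each single
    quotation mark inside the value is escaped by duplicating it.
    """
    return "'" + value.replace("'", "''") + "'"


statement = "SELECT " + ", ".join(sql_string(s) for s in ["Hello World", "It's me"])
print(statement)
```

Double quotation marks need no escaping because they have no special meaning inside a string constant.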
Unicode characters are supported in string constants. To include a Unicode character in a string constant, use one of the following methods:
Use the default escape character: add a backslash (\) before the hexadecimal code point of the character. Example:
SELECT U&'\263A'
Use a custom escape character. Example:
SELECT U&'#263A' UESCAPE '#' -- A number sign (#) is used as the escape character.
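The following minimal plain-Python sketch (the helper name unicode_literal is hypothetical) generates the U& form shown above. It assumes the SQL-standard rules that this syntax is based on: a character is written as the escape character followed by four hexadecimal digits, the escape character itself is written twice, and single quotation marks are doubled as usual. It handles only code points up to U+FFFF, and it is worth verifying the output against your VVR version.

```python
# Characters the SQL standard does not allow as a custom escape character.
DISALLOWED_ESCAPES = set("0123456789abcdefABCDEF+'\" \t\n")


def unicode_literal(text: str, escape: str = "\\") -> str:
    """Render text as a U&'...' string constant with an optional UESCAPE."""
    if len(escape) != 1 or escape in DISALLOWED_ESCAPES:
        raise ValueError(f"invalid escape character: {escape!r}")
    parts = []
    for ch in text:
        cp = ord(ch)
        if ch == "'":
            parts.append("''")                  # quote doubled, as in plain constants
        elif ch == escape:
            parts.append(escape * 2)            # the escape character itself is doubled
        elif 0x20 <= cp < 0x7F:
            parts.append(ch)                    # printable ASCII is written as-is
        elif cp <= 0xFFFF:
            parts.append(f"{escape}{cp:04X}")   # escape + four hex digits
        else:
            raise ValueError(f"U+{cp:X} needs more than four hex digits")
    body = "".join(parts)
    suffix = "" if escape == "\\" else f" UESCAPE '{escape}'"
    return f"U&'{body}'{suffix}"


print(unicode_literal("\u263a"))        # default backslash escape
print(unicode_literal("\u263a", "#"))   # custom escape character
```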
The following table describes the queries supported by Apache Flink 1.15 and provides corresponding references.
To view the references for another Apache Flink version, switch to that version on the Apache Flink website.
Query | References |
Hints | |
WITH clause | |
SELECT and WHERE clauses | |
SELECT DISTINCT | |
Window functions | |
Window aggregation | |
Group aggregation | |
Over aggregation | |
Join | |
Window join | |
Set operations | |
ORDER BY clause | |
LIMIT clause | |
Top-N | |
Window Top-N | |
Deduplication | |
Window deduplication | |
Pattern recognition | |
Query execution
In streaming mode, input streams are either update streams or non-update streams. A non-update stream contains only INSERT events, whereas an update stream can also contain other types of events, such as UPDATE_BEFORE, UPDATE_AFTER, and DELETE. For example, a change data capture (CDC) source reads an update stream from an external system. Some operations in Flink, such as group aggregation and Top-N, also generate update events. In most cases, operations that generate update events are performed by stateful operators, which use state to manage the updates. However, not all stateful operators can consume update streams. For example, the over aggregation and interval join operators do not accept update streams as input.
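To illustrate how a stateful operator turns an insert-only input into an update stream, the following plain-Python sketch models a query such as SELECT key, COUNT(*) FROM src GROUP BY key. It is a simplified model, not Flink code: the per-key counts play the role of operator state, and row kinds use Flink's short notation (+I, -U, +U). The real operator can differ in details; for example, it may skip UPDATE_BEFORE events when no downstream operator needs them.

```python
from collections import defaultdict

# Row kinds in Flink's short notation.
INSERT, UPDATE_BEFORE, UPDATE_AFTER = "+I", "-U", "+U"


def group_count(keys):
    """Simulate COUNT(*) ... GROUP BY key over an insert-only input."""
    counts = defaultdict(int)  # operator state: last emitted count per key
    for key in keys:
        old = counts[key]
        counts[key] = old + 1
        if old == 0:
            yield (INSERT, key, 1)                 # first result for this key
        else:
            yield (UPDATE_BEFORE, key, old)        # retract the previous result
            yield (UPDATE_AFTER, key, old + 1)     # emit the new result


for event in group_count(["a", "b", "a"]):
    print(event)
```

The input contains only inserts, yet the third record produces an UPDATE_BEFORE/UPDATE_AFTER pair, which is why downstream operators such as over aggregation cannot be placed directly after a group aggregation.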
The following table describes the runtime information of each supported query, including the name of the operator, whether the operator is stateful, whether the operator can consume update streams, and whether update events are generated. The information applies to Ververica Runtime (VVR) 6.0.X and later.
Query | Runtime operator | Uses state | Consumes update streams | Generates update events | Notes |
SELECT and WHERE | Calc | No | Yes | No | N/A |
Lookup Join | LookupJoin | No* | Yes | No | In VVR 8.0.1 or later, if you set the table.optimizer.non-deterministic-update.strategy parameter to TRY_RESOLVE and non-deterministic update problems are detected in the query, state data is automatically used to resolve them. To disable the use of state data, set the parameter to IGNORE. Changing the value of this parameter may cause incompatibility, in which case you must re-execute the query. |
Table Function | Correlate | No | Yes | No | N/A |
SELECT DISTINCT | GroupAggregate | Yes | Yes | Yes | N/A |
Group aggregation | GroupAggregate LocalGroupAggregate GlobalGroupAggregate IncrementalGroupAggregate | Yes* | Yes | Yes | The pre-aggregation operator LocalGroupAggregate does not use state data. |
Over aggregation | OverAggregate | Yes | No | No | N/A |
Window aggregation | GroupWindowAggregate WindowAggregate LocalWindowAggregate GlobalWindowAggregate | Yes* | Yes* | No* | |
Regular join (join on two streams) | Join | Yes | Yes | Yes* | If you use an outer join, such as LEFT JOIN, RIGHT JOIN, or FULL OUTER JOIN, update events are generated. |
Interval Join | IntervalJoin | Yes | No | No | N/A |
Temporal Join | TemporalJoin | Yes | Yes | No | N/A |
Window Join | WindowJoin | Yes | No | No | N/A |
Top-N | Rank | Yes | Yes | Yes | Top-N queries do not support ranking by processing time. Warning: If you specify a processing time field in the ORDER BY clause of a Top-N query, data errors may occur. In VVR 8.0.7 or earlier, this issue is not reported during the syntax check. Use another built-in function, such as CURRENT_TIMESTAMP, for ranking instead. |
Window Top-N | WindowRank | Yes | No | No | N/A |
Deduplication | Deduplicate | Yes | No | Yes* | If you use the keep-first-row policy to deduplicate data based on processing time (proctime), no update events are generated. |
Window deduplication | WindowDeduplicate | Yes | No | No | N/A |
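The Deduplication row notes that keeping the first row by processing time generates no update events. The following plain-Python sketch models why, under the assumption that rows arrive in processing-time order; it is a simplified illustration, not the Deduplicate operator itself. The dictionary of kept rows stands in for operator state. Keeping the first row only ever inserts, whereas keeping the last row must retract the previously emitted row whenever a newer one arrives.

```python
def deduplicate(rows, keep_first: bool):
    """rows: iterable of (key, value) pairs in processing-time arrival order."""
    kept = {}  # operator state: the row currently kept for each key
    for key, value in rows:
        if key not in kept:
            kept[key] = value
            yield ("+I", key, value)
        elif not keep_first:
            yield ("-U", key, kept[key])   # retract the older row
            kept[key] = value
            yield ("+U", key, value)       # emit the newer row
        # With keep_first, later rows for a known key are dropped silently.


orders = [("o1", 10), ("o1", 20), ("o2", 5)]
print(list(deduplicate(orders, keep_first=True)))
print(list(deduplicate(orders, keep_first=False)))
```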
Stateless operators pass event types through unchanged: each output event has the same type as the input event that produced it. Therefore, stateless operators never generate new update events themselves. If the input is an update stream, they forward its update events as they are.
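The following plain-Python sketch models the pass-through behavior of a stateless operator such as Calc (SELECT and WHERE). It is an illustration under simplifying assumptions, not Flink code: each event is a (row kind, row) pair, every row is filtered and projected on its own without any state, and the row kind is copied to the output as-is.

```python
def calc(events, predicate, project):
    """Model of a stateless Calc operator (WHERE filter + SELECT projection)."""
    for kind, row in events:
        if predicate(row):                 # WHERE: evaluated per row, no state
            yield (kind, project(row))     # SELECT: row kind copied unchanged


changelog = [
    ("+I", {"k": "a", "cnt": 1}),
    ("-U", {"k": "a", "cnt": 1}),
    ("+U", {"k": "a", "cnt": 2}),
    ("+I", {"k": "b", "cnt": 1}),
]
out = list(calc(changelog, lambda r: r["k"] == "a", lambda r: {"cnt_x10": r["cnt"] * 10}))
print(out)
```

The -U and +U events in the output come straight from the input; the operator adds no update events of its own.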