本文為您介紹Flink全託管支援的Queries語句詳情。
Flink全託管相容Apache Flink的Queries語句。以下BNF-grammar描述了支援的流批SQL特性的超集。
query:
values
| WITH withItem [ , withItem ]* query
| {
select
| selectWithoutFrom
| query UNION [ ALL ] query
| query EXCEPT query
| query INTERSECT query
}
[ ORDER BY orderItem [, orderItem ]* ]
[ LIMIT { count | ALL } ]
[ OFFSET start { ROW | ROWS } ]
[ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
withItem:
name
[ '(' column [, column ]* ')' ]
AS '(' query ')'
orderItem:
expression [ ASC | DESC ]
select:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
FROM tableExpression
[ WHERE booleanExpression ]
[ GROUP BY { groupItem [, groupItem ]* } ]
[ HAVING booleanExpression ]
[ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
selectWithoutFrom:
SELECT [ ALL | DISTINCT ]
{ * | projectItem [, projectItem ]* }
projectItem:
expression [ [ AS ] columnAlias ]
| tableAlias . *
tableExpression:
tableReference [, tableReference ]*
| tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference:
tablePrimary
[ matchRecognize ]
[ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
tablePrimary:
[ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
| LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
| [ LATERAL ] '(' query ')'
| UNNEST '(' expression ')'
tablePath:
[ [ catalogName . ] databaseName . ] tableName
systemTimePeriod:
FOR SYSTEM_TIME AS OF dateTimeExpression
dynamicTableOptions:
/*+ OPTIONS(key=val [, key=val]*) */
key:
stringLiteral
val:
stringLiteral
values:
VALUES expression [, expression ]*
groupItem:
expression
| '(' ')'
| '(' expression [, expression ]* ')'
| CUBE '(' expression [, expression ]* ')'
| ROLLUP '(' expression [, expression ]* ')'
| GROUPING SETS '(' groupItem [, groupItem ]* ')'
windowRef:
windowName
| windowSpec
windowSpec:
[ windowName ]
'('
[ ORDER BY orderItem [, orderItem ]* ]
[ PARTITION BY expression [, expression ]* ]
[
RANGE numericOrIntervalExpression {PRECEDING}
| ROWS numericExpression {PRECEDING}
]
')'
matchRecognize:
MATCH_RECOGNIZE '('
[ PARTITION BY expression [, expression ]* ]
[ ORDER BY orderItem [, orderItem ]* ]
[ MEASURES measureColumn [, measureColumn ]* ]
[ ONE ROW PER MATCH ]
[ AFTER MATCH
( SKIP TO NEXT ROW
| SKIP PAST LAST ROW
| SKIP TO FIRST variable
| SKIP TO LAST variable
| SKIP TO variable )
]
PATTERN '(' pattern ')'
[ WITHIN intervalLiteral ]
DEFINE variable AS condition [, variable AS condition ]*
')'
measureColumn:
expression AS alias
pattern:
patternTerm [ '|' patternTerm ]*
patternTerm:
patternFactor [ patternFactor ]*
patternFactor:
variable [ patternQuantifier ]
patternQuantifier:
'*'
| '*?'
| '+'
| '+?'
| '?'
| '??'
| '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
| '{' repeat '}'
標識符
對於標識符(表名,列名,函數名),Flink 採用了和Java相似的文法策略:
不管標識符是否被反引號標識,該標識符是大小寫敏感的。
標識符的匹配是大小寫敏感的。
和Java不同的是,Flink SQL支援標識符包含非英文或數位字元,例如,以下是符合標準的。
SELECT a AS `my field` FROM t
字串常量
Flink SQL使用單引號來表示字串常量,而非使用雙引號來表示,例如:
SELECT 'Hello World'
為了在字串表示單引號,您可以使用兩個單引號來轉義。例如:
Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
| EXPR$0 | EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set
Flink SQL支援在字串常量中包含unicode值,您可以通過以下方式聲明:
使用反斜線作為預設轉義符
SELECT U&'\263A'
使用自訂符號作為轉義符
SELECT U&'#263A' UESCAPE '#' -- 使用'#'作為轉義符
Apache Flink V1.15 Queries語句詳情如下表所示。
如果您需要查看其它版本Queries語句,請注意切換到對應版本。
Queries語句 | 相關文檔 |
Hints | |
WITH子句 | |
SELECT與WHERE子句 | |
SELECT DISTINCT | |
視窗函數 | |
視窗彙總 | |
分組彙總 | |
Over彙總 | |
Join | |
視窗關聯 | |
集合操作 | |
ORDER BY語句 | |
LIMIT語句 | |
Top-N | |
視窗Top-N | |
去重 | |
視窗去重 | |
模式檢測 |
Query操作運行時資訊說明
在流模式下,我們根據是否包含更新訊息將處理的流資料分為更新流(包含更新訊息)和非更新流(只包含INSERT類型訊息的稱為非更新流),例如CDC源就是Flink整合自外部的更新流,另外Query內部的一些操作也可能產生更新資料,如分組彙總(Group Aggregation)、Top-N計算等。能產生更新事件的操作通常會使用狀態(State),我們一般將這類操作稱為狀態運算元。值得注意的是,並非所有的狀態運算元都支援處理更新流,例如,Over彙總(Over Aggregation)和Interval Join目前還不支援將更新流作為輸入。
以下表格資訊基於VVR-6.0.x 及以上版本整理,包括了Query操作對應的運行時運算元名稱、運算元是否使用了狀態(State)、是否支援處理更新流、是否產生更新。
Query操作 | 對應運行時運算元名稱 | 是否使用狀態(State) | 是否支援更新流 | 是否產生更新 | 說明 |
SELECT與WHERE | Calc | 否 | 是 | 否 | 無。 |
Lookup Join | LookupJoin | 否* | 是 | 否 | 在VVR-8.0.1及以上版本中設定作業參數‘table.optimizer.non-deterministic-update.strategy’為‘TRY_RESOLVE’且引擎檢測到當前作業存在非確定性更新風險時,會自動啟用狀態(State)來消除非確定性,可以通過設定該參數為'IGNORE'強制關閉使用狀態,注意修改該參數改變運算元是否使用狀態時,會導致作業狀態不相容,需要無狀態啟動作業。 |
Table Function | Correlate | 否 | 是 | 否 | 無。 |
SELECT DISTINCT | GroupAggregate | 是 | 是 | 是 | 無。 |
分組彙總(Group Aggregation) | GroupAggregate LocalGroupAggregate GlobalGroupAggregate IncrementalGroupAggregate | 是* | 是 | 是 | LocalGroupAggregate預彙總運算元不會使用狀態(State)。 |
Over彙總(Over Aggregation) | OverAggregate | 是 | 否 | 否 | 無。 |
視窗彙總(Window Aggregation) | GroupWindowAggregate WindowAggregate LocalWindowAggregate GlobalWindowAggregate | 是* | 是* | 否* |
|
雙流Join(Regular Join) | Join | 是 | 是 | 是* | 當使用外連線類型時,例如LEFT、RIGHT、FULL OUTER Join會產生更新。 |
Interval Join | IntervalJoin | 是 | 否 | 否 | 無。 |
Temporal Join | TemporalJoin | 是 | 是 | 否 | 無。 |
視窗關聯(Window Join) | WindowJoin | 是 | 否 | 否 | 無。 |
Top-N | Rank | 是 | 是 | 是 | Top-N不支援使用Processing Time欄位作為排序鍵之一,請使用CURRENT_TIMESTAMP等其他內建函數進行排序。 警告 使用Processing Time欄位作為Top-N的排序鍵之一會有資料錯誤問題。Realtime Compute引擎VVR 8.0.7及以前版本文法檢測不會報錯,請您使用CURRENT_TIMESTAMP等其他內建函數替代。 |
視窗Top-N | WindowRank | 是 | 否 | 否 | 無。 |
去重(Deduplication) | Deduplicate | 是 | 否 | 是* | 基於處理時間(Proctime)使用first row去重時不會產生更新。 |
視窗去重(Window Deduplication) | WindowDeduplicate | 是 | 否 | 否 | 無。 |
非狀態運算元僅會透傳訊息類型,並不會主動產生更新訊息,即輸出的訊息類型和輸入的訊息類型保持一致;產生更新是指當輸入為非更新流時也可能產生更新訊息。