全部產品
Search
文件中心

Realtime Compute for Apache Flink:Queries語句

更新時間:Aug 03, 2024

本文為您介紹Flink全託管支援的Queries語句詳情。

Flink全託管相容Apache Flink的Queries語句。以下BNF-grammar描述了支援的流批SQL特性的超集。

query:
    values
  | WITH withItem [ , withItem ]* query
  | {
        select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

withItem:
    name
    [ '(' column [, column ]* ')' ]
    AS '(' query ')'

orderItem:
    expression [ ASC | DESC ]

select:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }
    FROM tableExpression
    [ WHERE booleanExpression ]
    [ GROUP BY { groupItem [, groupItem ]* } ]
    [ HAVING booleanExpression ]
    [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
    { * | projectItem [, projectItem ]* }

projectItem:
    expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
    tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
    tablePrimary
    [ matchRecognize ]
    [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
    [ TABLE ] tablePath [ dynamicTableOptions ] [systemTimePeriod] [[AS] correlationName]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | [ LATERAL ] '(' query ')'
  | UNNEST '(' expression ')'

tablePath:
    [ [ catalogName . ] databaseName . ] tableName

systemTimePeriod:
    FOR SYSTEM_TIME AS OF dateTimeExpression

dynamicTableOptions:
    /*+ OPTIONS(key=val [, key=val]*) */

key:
    stringLiteral

val:
    stringLiteral

values:
    VALUES expression [, expression ]*

groupItem:
    expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
    windowName
  | windowSpec

windowSpec:
    [ windowName ]
    '('
    [ ORDER BY orderItem [, orderItem ]* ]
    [ PARTITION BY expression [, expression ]* ]
    [
        RANGE numericOrIntervalExpression {PRECEDING}
      | ROWS numericExpression {PRECEDING}
    ]
    ')'

matchRecognize:
    MATCH_RECOGNIZE '('
    [ PARTITION BY expression [, expression ]* ]
    [ ORDER BY orderItem [, orderItem ]* ]
    [ MEASURES measureColumn [, measureColumn ]* ]
    [ ONE ROW PER MATCH ]
    [ AFTER MATCH
      ( SKIP TO NEXT ROW
      | SKIP PAST LAST ROW
      | SKIP TO FIRST variable
      | SKIP TO LAST variable
      | SKIP TO variable )
    ]
    PATTERN '(' pattern ')'
    [ WITHIN intervalLiteral ]
    DEFINE variable AS condition [, variable AS condition ]*
    ')'

measureColumn:
    expression AS alias

pattern:
    patternTerm [ '|' patternTerm ]*

patternTerm:
    patternFactor [ patternFactor ]*

patternFactor:
    variable [ patternQuantifier ]

patternQuantifier:
    '*'
  | '*?'
  | '+'
  | '+?'
  | '?'
  | '??'
  | '{' { [ minRepeat ], [ maxRepeat ] } '}' ['?']
  | '{' repeat '}'

標識符

對於標識符(表名,列名,函數名),Flink 採用了和Java相似的文法策略:

  • 不管標識符是否被反引號標識,該標識符是大小寫敏感的。

  • 標識符的匹配是大小寫敏感的。

和Java不同的是,Flink SQL支援標識符包含非英文或數位字元,例如,以下是符合標準的。

SELECT a AS `my field` FROM t

字串常量

Flink SQL使用單引號來表示字串常量,而非使用雙引號來表示,例如:

SELECT 'Hello World' 

為了在字串表示單引號,您可以使用兩個單引號來轉義。例如:

Flink SQL> SELECT 'Hello World', 'It''s me';
+-------------+---------+
|      EXPR$0 |  EXPR$1 |
+-------------+---------+
| Hello World | It's me |
+-------------+---------+
1 row in set

Flink SQL支援在字串常量中包含unicode值,您可以通過以下方式聲明:

  • 使用反斜線作為預設轉義符

    SELECT U&'\263A' 
  • 使用自訂符號作為轉義符

    SELECT U&'#263A' UESCAPE '#' -- 使用'#'作為轉義符

Apache Flink V1.15 Queries語句詳情如下表所示。

說明

如果您需要查看其它版本Queries語句,請注意切換到對應版本。

Queries語句

相關文檔

Hints

SQL Hints

WITH子句

WITH clause

SELECT與WHERE子句

SELECT & WHERE clause

SELECT DISTINCT

SELECT DISTINCT

視窗函數

Windowing table-valued functions (Windowing TVFs)

視窗彙總

Window Aggregation

分組彙總

Group Aggregation

Over彙總

Over Aggregation

Join

Joins

視窗關聯

Window Join

集合操作

Set Operations

ORDER BY語句

ORDER BY clause

LIMIT語句

LIMIT clause

Top-N

Top-N

視窗Top-N

Window Top-N

去重

Deduplication

視窗去重

Window Deduplication

模式檢測

Pattern Recognition

Query操作運行時資訊說明

在流模式下,我們根據是否包含更新訊息將處理的流資料分為更新流(包含更新訊息)和非更新流(只包含INSERT類型訊息的稱為非更新流),例如CDC源就是Flink整合自外部的更新流,另外Query內部的一些操作也可能產生更新資料,如分組彙總(Group Aggregation)、Top-N計算等。能產生更新事件的操作通常會使用狀態(State),我們一般將這類操作稱為狀態運算元。值得注意的是,並非所有的狀態運算元都支援處理更新流,例如,Over彙總(Over Aggregation)和Interval Join目前還不支援將更新流作為輸入。

以下表格資訊基於VVR-6.0.x 及以上版本整理,包括了Query操作對應的運行時運算元名稱、運算元是否使用了狀態(State)、是否支援處理更新流、是否產生更新。

Query操作

對應運行時運算元名稱

是否使用狀態(State)

是否支援更新流

是否產生更新

說明

SELECT與WHERE

Calc

無。

Lookup Join

LookupJoin

否*

在VVR-8.0.1及以上版本中設定作業參數‘table.optimizer.non-deterministic-update.strategy’為‘TRY_RESOLVE’且引擎檢測到當前作業存在非確定性更新風險時,會自動啟用狀態(State)來消除非確定性,可以通過設定該參數為'IGNORE'強制關閉使用狀態,注意修改該參數改變運算元是否使用狀態時,會導致作業狀態不相容,需要無狀態啟動作業。

Table Function

Correlate

無。

SELECT DISTINCT

GroupAggregate

無。

分組彙總(Group Aggregation)

GroupAggregate

LocalGroupAggregate

GlobalGroupAggregate

IncrementalGroupAggregate

是*

LocalGroupAggregate預彙總運算元不會使用狀態(State)。

Over彙總(Over Aggregation)

OverAggregate

無。

視窗彙總(Window Aggregation)

GroupWindowAggregate

WindowAggregate

LocalWindowAggregate

GlobalWindowAggregate

是*

是*

否*

  • LocalWindowAggregate預彙總運算元不會使用狀態(State)。

  • 在更新流的支援上和社區版本不同,詳情請參見新老文法對更新流的支援情況

  • 當開啟Early或Late Fire實驗特性時會產生更新訊息,否則不會產生更新。

雙流Join(Regular Join)

Join

是*

當使用外連線類型時,例如LEFT、RIGHT、FULL OUTER Join會產生更新。

Interval Join

IntervalJoin

無。

Temporal Join

TemporalJoin

無。

視窗關聯(Window Join)

WindowJoin

無。

Top-N

Rank

Top-N不支援使用Processing Time欄位作為排序鍵之一,請使用CURRENT_TIMESTAMP等其他內建函數進行排序。

警告

使用Processing Time欄位作為Top-N的排序鍵之一會有資料錯誤問題。Realtime Compute引擎VVR 8.0.7及以前版本文法檢測不會報錯,請您使用CURRENT_TIMESTAMP等其他內建函數替代。

視窗Top-N

WindowRank

無。

去重(Deduplication)

Deduplicate

是*

基於處理時間(Proctime)使用first row去重時不會產生更新。

視窗去重(Window Deduplication)

WindowDeduplicate

無。

說明

非狀態運算元僅會透傳訊息類型,並不會主動產生更新訊息,即輸出的訊息類型和輸入的訊息類型保持一致;產生更新是指當輸入為非更新流時也可能產生更新訊息。