全部產品
Search
文件中心

Realtime Compute for Apache Flink:複雜事件處理(CEP)語句

更新時間:Aug 08, 2024

本文為您介紹Realtime ComputeFlink全託管的複雜事件處理(CEP)語句的詳情。

背景資訊

相較於Apache Flink的CEP SQL,Realtime ComputeFlink版在其基礎上進行了增強。例如,支援輸出逾時匹配、支援鬆散串連(followedBy)、支援指定事件之間的連續性等。關於Apache Flink CEP SQL的基本能力,詳情請參見Pattern Recognition

使用限制

  • 僅Realtime Compute引擎vvr-6.0.2-flink-1.15及以上版本支援CEP SQL擴充文法。

  • 僅Realtime Compute引擎vvr-6.0.5-flink-1.15及以上版本支援組合模式和NO SKIP文法。

輸出逾時匹配

假如有以下輸入事件序列:

+----+------+------------------+
| id | type |          rowtime |
+----+------+------------------+
|  1 |    A | 2022-09-19 12:00 |
|  2 |    B | 2022-09-19 12:01 |
|  3 |    A | 2022-09-19 12:02 |
|  4 |    B | 2022-09-19 12:05 |
+----+------+------------------+

對於模式A B,如果我們需要限制整個模式的匹配時間跨度在2分鐘之內,則可以在PATTERN語句之後聲明WITHIN INTERVAL '2' MINUTES,程式碼範例如下。

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

如果不考慮WITHIN限制,我們可以得到兩個匹配id=1, id=2id=3, id=4。加入WITHIN描述後,第二條匹配結果中A和B事件之間的時間間隔為3分鐘,超過了WITHIN限制中定義的2分鐘,該SQL最終得到的結果中只包含了前一個匹配結果,如下所示。

+-----+-----+------------------+------------------+
| aid | bid |            atime |            btime |
+-----+-----+------------------+------------------+
|   1 |   2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+

當定義WITHIN語句後,超過時間限制的部分匹配序列會被作為匹配失敗的事件序列丟棄。如果需要輸出逾時的事件匹配序列(在時間限制內未完全符合的事件序列),則可以使用ONE ROW PER MATCH SHOW TIMEOUT MATCHES語句,例如可以使用如下SQL語句來輸出逾時序列。

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    A.rowtime AS atime,
    B.rowtime AS btime
  ONE ROW PER MATCH SHOW TIMEOUT MATCHES
  PATTERN (A B) WITHIN INTERVAL '2' MINUTES
  DEFINE
    A AS type = 'A',
    B AS type = 'B'
) AS T

該語句將輸出未匹配的序列,輸出結果如下。

+-----+--------+------------------+------------------+
| aid |    bid |            atime |            btime |
+-----+--------+------------------+------------------+
|   1 |      2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
|   3 | <NULL> | 2022-09-19 12:00 |           <NULL> |
+-----+--------+------------------+------------------+
說明

由於id=4的B事件已經超過了WITHIN時間限制,並不會包括在匹配序列中,因此得到的bid和btime均為空白值。

事件之間的連續性

開源Flink CEP Java API中支援通過嚴格連續(next())、鬆散連續(followedBy())、非確定性鬆散連續(followedByAny())、嚴格非連續(notNext())、鬆散非連續(notFollowedBy())等方式定義事件之間的連續性策略。

在Flink CEP SQL中預設使用的嚴格連續策略,即滿足模式比對的相鄰事件之間需要嚴格連續,連續的輸入事件之間不能存在非匹配的事件。例如上述例子中的模式 (A B) 指定A和B事件之間需要緊密相連。阿里雲Realtime ComputeFlink版對此進行了擴充,支援與Java API完全對等的表達能力。

例如對於a1, b1, a2, a3, b2, b3輸入事件序列,不同模式對應的匹配序列如下表所示。

說明

匹配過程中,After Match子句使用的是SKIP TO NEXT ROW策略,策略詳情請參見匹配後的策略

Java API

SQL

策略

匹配序列

A.next(B)

(A B)

嚴格連續:期望所有匹配的事件嚴格的一個接一個出現,中間沒有任何不匹配的事件。

{a1 b1}
{a3 b2}

A.followedBy(B)

(A {- C*? -} B)

其中C為DEFINE中未定義的符號,表示任意匹配。

鬆散連續:忽略匹配事件之間的不匹配事件。

{a1 b1}
{a2 b2}
{a3 b2}

A.followedByAny(B)

(A {- C* -} B)

其中C為DEFINE中未定義的符號,表示任意匹配。

非確定性鬆散連續:更進一步的鬆散連續,允許忽略掉一些匹配事件的附加匹配。

{a1 b1}
{a2 b2}
{a3 b2}
說明

本樣本的結果基於CEP SQL預設的SKIP TO NEXT ROW策略,而Flink CEP Java API的預設策略為NO SKIP。如果您要使用NO SKIP,詳情請參見AFTER MATCH NO SKIP策略

A.notNext(B)

(A [^B])

嚴格非連續:期望事件之後不緊接出現另一事件。

{a2}

A.notFollowedBy(B)

(A {- C*? -} [^B]

其中C為DEFINE中未定義的符號,表示任意匹配。

說明

在模式末尾使用notFollowedBy文法需要指定WITHIN條件。

鬆散非連續:期望一個事件不出現在兩個事件之間的任何地方,在模式末尾配合WITHIN條件使用表示一段時間之內沒有任何某類事件到來。

無匹配

迴圈模式中的連續性以及貪婪匹配

重要

目前CEP SQL暫不支援迴圈模式中使用非確定性鬆散連續。

開源Flink CEP Java API支援定義迴圈模式比對的連續性和貪婪策略,Apache Flink CEP SQL中預設使用嚴格連續且貪婪策略。例如A+匹配的多個A事件之間不允許有其他事件存在,並且匹配儘可能多的A事件。通過在迴圈量詞(例如*、+、{3, })後增加?符號來指定連續性和貪婪策略。

例如有事件序列a1, b1, a2, a3, c1,其中條件為A AS type = 'a', C AS type = 'a' or type = 'c'。針對不同的模式,對應的匹配序列如下表所示。

說明

匹配過程中,After Match子句使用的是SKIP TO NEXT ROW策略,策略詳情請參見匹配後的策略

標識符

連續性

貪婪性

樣本模式

等效語義

匹配序列

嚴格連續

貪婪

A+ C

A.oneOrMore().greedy().consecutive().next(C)

{a2 a3 c1}
{a3 c1}

?

嚴格連續

非貪婪

A+? C

A.oneOrMore().consecutive().next(C)

{a2 a3}
{a3 c1}

??

鬆散連續

貪婪

A+?? C

A.oneOrMore().greedy().next(C)

{a1 a2 a3 c1}
{a2 a3 c1}
{a3 c1}

???

鬆散連續

非貪婪

A+??? C

A.oneOrMore().next(C)

{a1 a2 a3}
{a2 a3}
{a3 c1}

迴圈模式指定停止條件(Until)

開源Flink CEP Java API支援使用函數until(condition)來為迴圈模式指定其停止條件,其效果是在匹配迴圈模式的過程中,若當前事件符合until條件,則立刻終止當前迴圈模式的匹配,並從當前事件開始匹配後續模式。在阿里雲Realtime ComputeFlink版的SQL作業中可以在迴圈量詞如+*{3, }後使用{ CONDITION }文法來表達until語義。

例如針對事件序列a1, d1, a2, b1, a3, c1,有匹配條件DEFINE A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c',不同的模式對應的匹配序列如下表所示:

說明

匹配過程中,使用AFTER MATCH SKIP TO NEXT ROW策略,策略詳情請參見匹配後的策略

模式

等效語義

匹配序列

說明

A+ C

A.oneOrMore().consecutive().greedy().next(C)

a2 b1 a3 c1
b1 a3 c1
a3 c1

以a或b開頭的事件都能匹配A模式,A模式內部和AC之間為嚴格連續。由於a1、a2之間存在d1,無法從a1開始匹配

A+{B} C

A.oneOrMore().consecutive().greedy().until(B).next(C)

a3 c1

A迴圈模式增加了until B條件,AC之間仍為嚴格連續。由於a2開始的迴圈模式需要在b1處結束,無法滿足與c1之間的嚴格連續要求。

A+{B} {- X*? -} C

A.oneOrMore().consecutive().greedy().until(B).followedBy(C)

a2 c1
a3 c1

AC之間為鬆散連續,以a2開始的迴圈模式在b1處結束,並跳過b1、a3匹配c1。

A+??{B} {- X*? -} C

A.oneOrMore().greedy().until(B).followedBy(C)

a1 a2 c1
a2 c1
a3 c1

迴圈模式A內部為鬆散連續,可跳過d1並結束於b1,匹配a1、a2。

組合模式(Group Pattern)

開源 Flink CEP Java API支援組合模式(group pattern),將多個模式組合為一個整體用在next()followedBy()followedByAny()函數中,並支援整體的迴圈。在阿里雲Realtime ComputeFlink版的SQL作業中使用SQL標準中的文法(...)來定義組合模式,支援使用迴圈量詞如+*{3, }等。

例如對於模式PATTERN (A (B C*)+? D),其中(B C*)為一個組合模式,並指定該組合迴圈出現一次以上,?表明為非貪婪匹配,其對應Java代碼如下:

Pattern.<String>begin("A").where(...)
  .next(
  	Pattern.<String>begin("B").where(...)
  		.next("C").where(...).oneOrMore().optional().greedy().consecutive())
  .oneOrMore().consecutive()
  .next("D").where(...)

匹配成功時MEASURES可以提取匹配結果中的某些部分資訊來作為結果的一部分。例如某組合模式比對到的結果為b1b2 c1b3 c2 c3,則MEASURES可以只擷取其中的一部分資訊進行輸出(下文中的SQL所示),如果只關注事件b,則通過FIRST(B.id)可以拿到第一組匹配結果的b,FIRST(B.id,1)可以拿到第二組結果的b,以此類推,輸出的結果為b1,b2,b3

SELECT *
FROM MyTable MATCH_RECOGNIZE (
  ORDER BY rowtime
  MEASURES
    FIRST(B.id) AS b1_id,
    FIRST(B.id,1) AS b2_id,
    FIRST(B.id,2) AS b3_id
  PATTERN (A (B C*)+? D)
  DEFINE
    A AS type = 'A',
    B AS type = 'B',
    C AS type = 'C',
    D AS type = 'D'
) AS T

需要注意的是,組合模式與其前一個模式之間的連續性聲明作用於組合模式中的第一個模式而非整個組合模式。例如在PATTERN (A {- X*? -} (B C))中模式A和組合模式(B C)之間使用了followedBy關係,則實際為聲明了A和B之間的followedBy關係,其執行效果為A(B C)之間可以存在若干不匹配B的事件,而非若干不匹配(B C)的事件。如序列a1 b1 d1 b2 c1對於該模式無輸出,因為b1出現後匹配過程立刻進入組合模式(B C),而d1無法匹配C模式導致序列匹配失敗。

重要
  • 迴圈組合模式不支援貪婪匹配,例如PATTERN ((A B)+)

  • 不支援在until和notNext的文法中使用組合模式,例如PATTERN (A+{(B C)})PATTERN (A [^(B C)])

  • 組合模式的首模式不支援聲明為可選(optional),例如PATTERN (A (B? C))

AFTER MATCH NO SKIP策略

Flink CEP Java API中After Match策略預設為NO_SKIP,CEP SQL中的預設策略則為SKIP_TO_NEXT_ROW。阿里雲Realtime ComputeFlink版擴充了SQL標準中的AFTER MATCH語句,可通過AFTER MATCH NO SKIP語句來聲明NO_SKIP策略,NO_SKIP策略在完成一條序列的匹配時,不會終止或丟棄其他已經開始的匹配過程。

NO_SKIP策略的一種常用情境為結合followedByAny跳過一些匹配事件進行更鬆散的匹配。例如對於序列a1 b1 b2 b3 c1PATTERN (A {- X* -} B {- Y*? -} C)(等價於Pattern.begin("A").followedByAny("B").followedBy("C"))在使用預設的AFTER MATCH SKIP TO NEXT ROW時得到結果a1 b1 c1,因為當a1 b1 c1完成匹配時所有以a1開頭的序列都會被丟棄,而使用AFTER MATCH NO SKIP則能得到所有匹配序列a1 b1 c1a1 b2 c1a1 b3 c1