本文为您介绍实时计算Flink全托管的复杂事件处理(CEP)语句的详情。
背景信息
相较于Apache Flink的CEP SQL,实时计算Flink版在其基础上进行了增强。例如,支持输出超时匹配、支持松散连接(followedBy)、支持指定事件之间的连续性等。关于Apache Flink CEP SQL的基本能力,详情请参见Pattern Recognition。
使用限制
仅实时计算引擎vvr-6.0.2-flink-1.15及以上版本支持CEP SQL扩展语法。
仅实时计算引擎vvr-6.0.5-flink-1.15及以上版本支持组合模式和NO SKIP语法。
输出超时匹配
假如有以下输入事件序列:
+----+------+------------------+
| id | type | rowtime |
+----+------+------------------+
| 1 | A | 2022-09-19 12:00 |
| 2 | B | 2022-09-19 12:01 |
| 3 | A | 2022-09-19 12:02 |
| 4 | B | 2022-09-19 12:05 |
+----+------+------------------+
对于模式A B
,如果我们需要限制整个模式的匹配时间跨度在2分钟之内,则可以在PATTERN语句之后声明WITHIN INTERVAL '2' MINUTES
,代码示例如下。
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS T
如果不考虑WITHIN限制,我们可以得到两个匹配id=1, id=2
、id=3, id=4
。加入WITHIN描述后,第二条匹配结果中A和B事件之间的时间间隔为3分钟,超过了WITHIN限制中定义的2分钟,该SQL最终得到的结果中只包含了前一个匹配结果,如下所示。
+-----+-----+------------------+------------------+
| aid | bid | atime | btime |
+-----+-----+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
+-----+-----+------------------+------------------+
当定义WITHIN语句后,超过时间限制的部分匹配序列会被作为匹配失败的事件序列丢弃。如果需要输出超时的事件匹配序列(在时间限制内未完全匹配的事件序列),则可以使用ONE ROW PER MATCH SHOW TIMEOUT MATCHES
语句,例如可以使用如下SQL语句来输出超时序列。
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
A.id AS aid,
B.id AS bid,
A.rowtime AS atime,
B.rowtime AS btime
ONE ROW PER MATCH SHOW TIMEOUT MATCHES
PATTERN (A B) WITHIN INTERVAL '2' MINUTES
DEFINE
A AS type = 'A',
B AS type = 'B'
) AS T
该语句将输出未匹配的序列,输出结果如下。
+-----+--------+------------------+------------------+
| aid | bid | atime | btime |
+-----+--------+------------------+------------------+
| 1 | 2 | 2022-09-19 12:00 | 2022-09-19 12:01 |
| 3 | <NULL> | 2022-09-19 12:00 | <NULL> |
+-----+--------+------------------+------------------+
由于id=4的B事件已经超过了WITHIN时间限制,并不会包括在匹配序列中,因此得到的bid和btime均为空值。
事件之间的连续性
开源Flink CEP Java API中支持通过严格连续(next()
)、松散连续(followedBy()
)、非确定性松散连续(followedByAny()
)、严格非连续(notNext()
)、松散非连续(notFollowedBy()
)等方式定义事件之间的连续性策略。
在Flink CEP SQL中默认使用的严格连续策略,即满足模式匹配的相邻事件之间需要严格连续,连续的输入事件之间不能存在非匹配的事件。例如上述例子中的模式 (A B) 指定A和B事件之间需要紧密相连。阿里云实时计算Flink版对此进行了扩展,支持与Java API完全对等的表达能力。
例如对于a1, b1, a2, a3, b2, b3
输入事件序列,不同模式对应的匹配序列如下表所示。
匹配过程中,After Match子句使用的是SKIP TO NEXT ROW策略,策略详情请参见匹配后的策略。
Java API | SQL | 策略 | 匹配序列 |
|
| 严格连续:期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。 |
|
|
其中C为DEFINE中未定义的符号,表示任意匹配。 | 松散连续:忽略匹配事件之间的不匹配事件。 |
|
|
其中C为DEFINE中未定义的符号,表示任意匹配。 | 非确定性松散连续:更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。 |
说明 本示例的结果基于CEP SQL默认的SKIP TO NEXT ROW策略,而Flink CEP Java API的默认策略为NO SKIP。如果您要使用NO SKIP,详情请参见AFTER MATCH NO SKIP策略。 |
|
| 严格非连续:期望事件之后不紧接出现另一事件。 |
|
|
其中C为DEFINE中未定义的符号,表示任意匹配。 说明 在模式末尾使用notFollowedBy语法需要指定WITHIN条件。 | 松散非连续:期望一个事件不出现在两个事件之间的任何地方,在模式末尾配合WITHIN条件使用表示一段时间之内没有任何某类事件到来。 | 无匹配 |
循环模式中的连续性以及贪婪匹配
目前CEP SQL暂不支持循环模式中使用非确定性松散连续。
开源Flink CEP Java API支持定义循环模式匹配的连续性和贪婪策略,Apache Flink CEP SQL中默认使用严格连续且贪婪策略。例如A+匹配的多个A事件之间不允许有其他事件存在,并且匹配尽可能多的A事件。通过在循环量词(例如*、+、{3, })后增加?符号来指定连续性和贪婪策略。
例如有事件序列a1, b1, a2, a3, c1,其中条件为A AS type = 'a', C AS type = 'a' or type = 'c'。针对不同的模式,对应的匹配序列如下表所示。
匹配过程中,After Match子句使用的是SKIP TO NEXT ROW策略,策略详情请参见匹配后的策略。
标识符 | 连续性 | 贪婪性 | 示例模式 | 等效语义 | 匹配序列 |
无 | 严格连续 | 贪婪 |
|
|
|
? | 严格连续 | 非贪婪 |
|
|
|
?? | 松散连续 | 贪婪 |
|
|
|
??? | 松散连续 | 非贪婪 |
|
|
|
循环模式指定停止条件(Until)
开源Flink CEP Java API支持使用函数until(condition)
来为循环模式指定其停止条件,其效果是在匹配循环模式的过程中,若当前事件符合until条件,则立刻终止当前循环模式的匹配,并从当前事件开始匹配后续模式。在阿里云实时计算Flink版的SQL作业中可以在循环量词如+
、*
、{3, }
后使用{ CONDITION }
语法来表达until语义。
例如针对事件序列a1, d1, a2, b1, a3, c1
,有匹配条件DEFINE A AS A.type = 'a' OR A.type = 'b', B AS B.type = 'b', C AS C.type = 'c'
,不同的模式对应的匹配序列如下表所示:
匹配过程中,使用AFTER MATCH SKIP TO NEXT ROW
策略,策略详情请参见匹配后的策略。
模式 | 等效语义 | 匹配序列 | 说明 |
|
|
| 以a或b开头的事件都能匹配A模式,A模式内部和AC之间为严格连续。由于a1、a2之间存在d1,无法从a1开始匹配 |
|
|
| A循环模式增加了until B条件,AC之间仍为严格连续。由于a2开始的循环模式需要在b1处结束,无法满足与c1之间的严格连续要求。 |
|
|
| AC之间为松散连续,以a2开始的循环模式在b1处结束,并跳过b1、a3匹配c1。 |
|
|
| 循环模式A内部为松散连续,可跳过d1并结束于b1,匹配a1、a2。 |
组合模式(Group Pattern)
开源 Flink CEP Java API支持组合模式(group pattern),将多个模式组合为一个整体用在next()
、followedBy()
和followedByAny()
函数中,并支持整体的循环。在阿里云实时计算Flink版的SQL作业中使用SQL标准中的语法(...)
来定义组合模式,支持使用循环量词如+
、*
、{3, }
等。
例如对于模式PATTERN (A (B C*)+? D)
,其中(B C*)
为一个组合模式,并指定该组合循环出现一次以上,?
表明为非贪婪匹配,其对应Java代码如下:
Pattern.<String>begin("A").where(...)
.next(
Pattern.<String>begin("B").where(...)
.next("C").where(...).oneOrMore().optional().greedy().consecutive())
.oneOrMore().consecutive()
.next("D").where(...)
匹配成功时MEASURES
可以提取匹配结果中的某些部分信息来作为结果的一部分。例如某组合模式匹配到的结果为b1
,b2 c1
,b3 c2 c3
,则MEASURES
可以只获取其中的一部分信息进行输出(下文中的SQL所示),如果只关注事件b,则通过FIRST(B.id)
可以拿到第一组匹配结果的b,FIRST(B.id,1)
可以拿到第二组结果的b,以此类推,输出的结果为b1,b2,b3
。
SELECT *
FROM MyTable MATCH_RECOGNIZE (
ORDER BY rowtime
MEASURES
FIRST(B.id) AS b1_id,
FIRST(B.id,1) AS b2_id,
FIRST(B.id,2) AS b3_id
PATTERN (A (B C*)+? D)
DEFINE
A AS type = 'A',
B AS type = 'B',
C AS type = 'C',
D AS type = 'D'
) AS T
需要注意的是,组合模式与其前一个模式之间的连续性声明作用于组合模式中的第一个模式而非整个组合模式。例如在PATTERN (A {- X*? -} (B C))
中模式A
和组合模式(B C)
之间使用了followedBy关系,则实际为声明了A和B之间的followedBy关系,其执行效果为A
和(B C)
之间可以存在若干不匹配B
的事件,而非若干不匹配(B C)
的事件。如序列a1 b1 d1 b2 c1
对于该模式无输出,因为b1
出现后匹配过程立刻进入组合模式(B C)
,而d1
无法匹配C
模式导致序列匹配失败。
循环组合模式不支持贪婪匹配,例如
PATTERN ((A B)+)
。不支持在until和notNext的语法中使用组合模式,例如
PATTERN (A+{(B C)})
、PATTERN (A [^(B C)])
。组合模式的首模式不支持声明为可选(optional),例如
PATTERN (A (B? C))
。
AFTER MATCH NO SKIP策略
Flink CEP Java API中After Match策略默认为NO_SKIP,CEP SQL中的默认策略则为SKIP_TO_NEXT_ROW。阿里云实时计算Flink版扩展了SQL标准中的AFTER MATCH语句,可通过AFTER MATCH NO SKIP
语句来声明NO_SKIP策略,NO_SKIP策略在完成一条序列的匹配时,不会终止或丢弃其他已经开始的匹配过程。
NO_SKIP策略的一种常用场景为结合followedByAny跳过一些匹配事件进行更松散的匹配。例如对于序列a1 b1 b2 b3 c1
,PATTERN (A {- X* -} B {- Y*? -} C)
(等价于Pattern.begin("A").followedByAny("B").followedBy("C")
)在使用默认的AFTER MATCH SKIP TO NEXT ROW
时得到结果a1 b1 c1
,因为当a1 b1 c1
完成匹配时所有以a1
开头的序列都会被丢弃,而使用AFTER MATCH NO SKIP
则能得到所有匹配序列a1 b1 c1
、a1 b2 c1
和a1 b3 c1
。