本文为您介绍CEP中规则的JSON格式相关信息。
目标人群
- 客户风控平台开发人员:对Flink CEP较熟悉的平台研发人员应能快速学习本格式,并根据自身平台需求判断是否需要进一步封装。
- 客户风控策略人员:只熟悉具体策略但缺乏Java经验的同学,在熟悉CEP概念的基础上,也可快速上手本格式的使用来编写新规则,使其在上线的风控作业中应用。
JSON格式定义
- 节点(Node)定义一个节点(Node)即一个完整的模式(Pattern),它包含如下属性。
字段名 描述 类型 是否必填 备注 name Pattern名称。 string 是 一个唯一的字符串。 说明 不同节点的名称不能重复。type 该Node类型。 enum(string) 是 - 对于包含子Pattern的节点,该字段值为COMPOSITE。
- 对于无子Pattern的节点,该字段值为ATOMIC。
quantifier 量词,用于描述如何匹配该Pattern,例如只匹配一次。 dict 是 请参见本文量词(Quantifier)定义。 condition 条件。 dict 否 请参见本文条件(Condition)定义。 - 量词(Quantifier)定义量词的作用是描述对于满足该Pattern的事件要如何匹配。例如模式
"A*"
对应的量词properties为LOOPING,该Pattern内部的事件选择策略为SKIP_TILL_ANY。字段名 描述 类型 是否必填 备注 consumingStrategy 事件选择策略。 enum(string) 是 仅支持以下取值: - STRICT
- SKIP_TILL_NEXT
- SKIP_TILL_ANY
取值及含义请参见本文连续性定义。
times 用于描述该Pattern需要匹配多少次。 dict 否 取值示例如下。"times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } },
其中from和to的数据类型均为integer,windowTime的单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。说明 windowTime可以设为null,即"windowTime": null
。properties 描述该量词所具有的属性。 array of enumString 是 取值及含义请参见本文量词属性含义。 untilCondition 停止条件。 说明 仅可在LOOPING量词修饰的Pattern后使用。dict 否 取值及含义请参见本文条件(Condition)定义。 - 条件(Condition)定义条件用于筛选符合某些要求的事件。例如要筛选浏览时长超过5分钟的客户,浏览时长超过5分钟即为一个条件。
字段名 描述 类型 是否必填 备注 type 条件类型。 enum(string) 是 条件类型取值如下: - CLASS:对应用户自定义的条件。
- AVIATOR:对应基于AVIATOR表达式的条件。
- GROOVY:对应基于GROOVY表达式的条件。
... 其他可序列化的自定义字段。 否 ... 目前我们支持以下几种Condition:- Class类型Condition
字段名 描述 类型 是否必填 备注 type 条件类型。 enum(string) 是 固定值为Class。 className 类名。 string 是 该class完整类名,例如 com.alibaba.ververica.cep.demo.StartCondition
。 - 包含自定义参数的Condition
用户在使用普通的Class类型Condition时,只能传入类名(className),而无法动态地传入参数。在动态CEP支持中,为了提供更丰富的Condition表达能力,我们设计并实现了包含自定义参数的Condition(即CustomArgsCondition),从而允许用户在JSON中通过字符串数组来设置CustomArgsCondition所需参数, 进而动态构造CustomArgsCondition实例。这一特性允许用户动态更新Condition的参数,而无需修改Java代码。
字段名 描述 类型 是否必填 备注 type 条件类型。 enum(string) 是 固定值为Class。 className 类名。 string 是 该class完整类名,例如 com.alibaba.ververica.cep.demo.CustomMiddleCondition
。args 自定义参数。 array of string 是 一个字符串数组。 - 基于Aviator表达式的ConditionAviator是一个表达式求值引擎,可以动态地将表达式编译成字节码(详情请参见aviatorscript)。因此我们可以在作业中使用基于Aviator表达式的Condition,使得条件的阈值也可以动态修改,而无需修改Java代码重新编译运行。
字段名 描述 类型 是否必填 备注 type 类名。 string 是 固定值为AVIATOR。 expression 表达式字符串。 string 是 形如price > 10这样的表达式字符串(price变量名来自于Java代码中定义的字段)。 您可以将该字符串在数据库中的值进行修改。例如修改为price > 20,Flink CEP作业会动态加载price > 20构造新的AviatorCondition来处理之后的事件。
- 基于Groovy表达式的ConditionGroovy是一个基于JVM平台的动态语言(Groovy语法可以参见syntax)。动态CEP支持使用Groovy表达式来定义条件(Condition),从而允许动态修改条件的阈值。
字段名 描述 类型 是否必填 备注 type 类名。 string 是 固定值为GROOVY。 expression 表达式字符串。 string 是 形如price > 5.0 && name.contains("mid")这样的表达式字符串(price、name等变量名来自于Java代码中定义的字段)。您可以将该字符串在数据库中的值进行修改。例如修改为price > 20 && name.contains("end"),Flink CEP作业会动态加载新的Groovy字符串并构造新的GroovyCondition来处理之后的事件。
- 边(Edge)定义
字段名 描述 类型 是否必填 备注 source 源模式名称。 string 是 无。 target 目标模式名称。 string 是 无。 type 事件选择策略。 dict 是 支持以下取值: - STRICT
- SKIP_TILL_NEXT
- SKIP_TILL_ANY
- NOT_FOLLOW
- NOT_NEXT
取值及含义请参见本文连续性定义。
- 图(GraphNode extends Node)定义
一个图(GraphNode)代表一个完整的Pattern序列,它的节点(nodes)是各个独立的Pattern,边(edges)代表如何从一类Pattern的匹配转移到另一类Pattern的匹配。
为了支持Pattern的嵌套(即GroupPattern),我们将一个GraphNode看作是Node的子类,即一个GraphNode可以作为一个更大的GraphNode中的Node。GraphNode相比于基础Node,额外多了以下2类字段:- 描述图的结构的nodes字段与edges字段。
- 描述图内时间窗口策略的window字段与事件匹配后的跳出策略afterMatchSkipStrategy字段。
GraphNode的字段详情请参见下表。字段名 描述 类型 是否必填 备注 name 该复合Pattern名称。 String 是 一个唯一的字符串。 说明 不同Graph名称不能重复。type 该Node类型。 enum(string) 是 固定值为COMPOSITE。 version 该Graph使用的JSON格式的版本号。 Int 是 默认值为1。 nodes 该Pattern内嵌套的子Pattern。 array of Node 是 不可以为空的array。 edges 嵌套的子Pattern的连接关系。 array of Edge 是 可以为空的array。 window - 当类型为FIRST_AND_LAST:代表该复合Pattern一次完整匹配之间的最大时间间隔。
- 当类型为PREVIOUS_AND_CURRENT:代表该相邻2个子Pattern匹配之间的最大时间间隔。
dict 否 取值示例如下。 "window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } }
单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。数据类型为Long或Integer。
afterMatchSkipStrategy 该图内所有事件匹配后的跳过策略。 dict 是 请参见本文事件匹配后的跳过策略(AfterMatchSkipStrategy)定义。 quantifier 量词,用于描述如何匹配该Pattern,例如只匹配一次。 dict 是 请参见本文量词(Quantifier)定义。 - 事件匹配后的跳过策略(AfterMatchSkipStrategy)定义
字段名 描述 类型 是否必填 备注 type 策略类型。 enum(string) 是 参数取值如下:- NO_SKIP(默认值):每个成功的匹配都会被输出。
- SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配。
- SKIP_PAST_LAST_EVENT:丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
- SKIP_TO_FIRST:丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
- SKIP_TO_LAST:丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。
详情请参见匹配后跳过策略
patternName 策略针对的模式的名称。 string 否 一个唯一的字符串。 - 连续性定义
物理值 含义 STRICT 严格连续。 所有匹配的事件中间没有任何不匹配的事件。 SKIP_TILL_NEXT 松散连续。允许匹配的事件之间出现不匹配的事件,不匹配的事件会被忽略。 SKIP_TILL_ANY 不确定松散连续。更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。 NOT_NEXT 紧接着的后续事件不能是某指定事件。 NOT_FOLLOW 某指定事件后续不出现。 相关示例请参见事件处理(CEP)文档。
- 量词属性含义
取值 含义 SINGLE 代表该模式只出现一次。 LOOPING 代表该模式为循环模式,可能出现多次,类比正则表达式中的*与+。 TIMES 代表该模式会出现指定次数。 GREEDY 代表在匹配该模式时,会采用贪婪匹配策略,尽可能多地匹配。 OPTIONAL 代表该模式为可选模式。
示例一:普通Pattern示例
- 领取了某会场的优惠券。
- 在购物车中添加了超过3次的商品。
- 但最后没有结账付款。
Pattern<Event, Event> pattern =
Pattern.<Event>begin("start")
.where(new StartCondition())
.optional()
.followedBy("middle")
.where(new MiddleCondition())
.timesOrMore(3)
.notFollowedBy("end")
.where(new EndCondition())
.within(Time.minutes(10));
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE",
"OPTIONAL"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
"type": "CLASS"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "middle",
"target": "end",
"type": "NOT_FOLLOW"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"window": {
"type": "FIRST_AND_LAST",
"time": {
"unit": "MINUTES",
"size": 10
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}
示例二:在Pattern中使用包含自定义参数的Condition
例如在实时营销场景中,假设我们给用户打上了一个人群标签,之后会根据用户所属的标签采取不同的营销策略,例如对于A类用户我们发送营销短信,对于B类用户我们发送优惠券等,而对于其他用户,我们不采取营销措施。针对上述需求,我们可以定义一个普通的Class类型Condition来解决,但当我们想调整策略,针对C类用户也发送优惠券时,如果使用的是普通的Class类型Condition,那么我们必须改写代码,重新编译并运行作业。这种情况下,我们可以使用包含自定义参数的Condition,在代码中定义好如何根据传入的参数进行策略的调整之后,我们只需要在数据库中修改传入的参数(即包含自定义参数的Condition的args字段的值),例如由["A", "B"] 改为["A", "B", "C"],即可实现营销策略的动态更新。
"condition": {
"args": [
"A", "B"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
"condition": {
"args": [
"A", "B", "C"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
关于该类Condition在具体业务场景的使用示例,详情请参见Demo。