本文为您介绍CEP中规则的JSON格式相关信息。

目标人群

  • 客户风控平台开发人员:对Flink CEP较熟悉的平台研发人员应能快速学习本格式,并根据自身平台需求判断是否需要进一步封装。
  • 客户风控策略人员:只熟悉具体策略但缺乏Java经验的同学,在熟悉CEP概念的基础上,也可快速上手本格式的使用来编写新规则,使其在上线的风控作业中应用。

JSON格式定义

对于一个事件序列(Event Sequence)中的模式(Pattern),我们可以将其看作一个图(Graph),图中节点(Node)为针对某些事件(Event)的模式,节点之间的边(Edge)为事件选择策略(Event Selection Strategy),即如何从一类模式的匹配转移到另一类模式的匹配。每个图也可以看作一个更大的图的子节点,从而允许模式的嵌套。基于以上考虑,阿里云实时计算Flink定义了一套基于JSON的规范来描述CEP中的规则,进而方便规则的存储与修改,该规范中各个字段的含义如下。
  • 节点(Node)定义
    一个节点(Node)即一个完整的模式(Pattern),它包含如下属性。
    字段名描述类型是否必填备注
    namePattern名称。string一个唯一的字符串。
    说明 不同节点的名称不能重复。
    type该Node类型。enum(string)
    • 对于包含子Pattern的节点,该字段值为COMPOSITE。
    • 对于无子Pattern的节点,该字段值为ATOMIC。
    quantifier量词,用于描述如何匹配该Pattern,例如只匹配一次。dict请参见本文量词(Quantifier)定义。
    condition条件。dict请参见本文条件(Condition)定义。
  • 量词(Quantifier)定义
    量词的作用是描述对于满足该Pattern的事件要如何匹配。例如模式 "A*" 对应的量词properties为LOOPING,该Pattern内部的事件选择策略为SKIP_TILL_ANY。
    字段名描述类型是否必填备注
    consumingStrategy事件选择策略。enum(string)仅支持以下取值:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY

    取值及含义请参见本文连续性定义。

    times用于描述该Pattern需要匹配多少次。dict
    取值示例如下。
    "times": {
              "from": 3,
              "to": 3,
              "windowTime": {
              "unit": "MINUTES",
              "size": 12
              }
            },
    其中from和to的数据类型均为integer,windowTime的单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。
    说明 windowTime可以设为null,即"windowTime": null
    properties描述该量词所具有的属性。array of enumString取值及含义请参见本文量词属性含义。
    untilCondition 停止条件。
    说明 仅可在LOOPING量词修饰的Pattern后使用。
    dict 取值及含义请参见本文条件(Condition)定义。
  • 条件(Condition)定义
    条件用于筛选符合某些要求的事件。例如要筛选浏览时长超过5分钟的客户,浏览时长超过5分钟即为一个条件。
    字段名描述类型是否必填备注
    type条件类型。enum(string)条件类型取值如下:
    • CLASS:对应用户自定义的条件。
    • AVIATOR:对应基于AVIATOR表达式的条件。
    • GROOVY:对应基于GROOVY表达式的条件。
    ...其他可序列化的自定义字段。...
    目前我们支持以下几种Condition:
    • Class类型Condition
      字段名描述类型是否必填备注
      type条件类型。enum(string)固定值为Class。
      className类名。string该class完整类名,例如com.alibaba.ververica.cep.demo.StartCondition
    • 包含自定义参数的Condition

      用户在使用普通的Class类型Condition时,只能传入类名(className),而无法动态地传入参数。在动态CEP支持中,为了提供更丰富的Condition表达能力,我们设计并实现了包含自定义参数的Condition(即CustomArgsCondition),从而允许用户在JSON中通过字符串数组来设置CustomArgsCondition所需参数, 进而动态构造CustomArgsCondition实例。这一特性允许用户动态更新Condition的参数,而无需修改Java代码。

      字段名描述类型是否必填备注
      type条件类型。enum(string)固定值为Class。
      className类名。string该class完整类名,例如com.alibaba.ververica.cep.demo.CustomMiddleCondition
      args自定义参数。array of string一个字符串数组。
    • 基于Aviator表达式的Condition
      Aviator是一个表达式求值引擎,可以动态地将表达式编译成字节码(详情请参见aviatorscript)。因此我们可以在作业中使用基于Aviator表达式的Condition,使得条件的阈值也可以动态修改,而无需修改Java代码重新编译运行。
      字段名描述类型是否必填备注
      type类名。string固定值为AVIATOR。
      expression表达式字符串。string形如price > 10这样的表达式字符串(price变量名来自于Java代码中定义的字段)。

      您可以将该字符串在数据库中的值进行修改。例如修改为price > 20,Flink CEP作业会动态加载price > 20构造新的AviatorCondition来处理之后的事件。

    • 基于Groovy表达式的Condition
      Groovy是一个基于JVM平台的动态语言(Groovy语法可以参见syntax)。动态CEP支持使用Groovy表达式来定义条件(Condition),从而允许动态修改条件的阈值。
      字段名描述类型是否必填备注
      type类名。string固定值为GROOVY。
      expression表达式字符串。string形如price > 5.0 && name.contains("mid")这样的表达式字符串(pricename等变量名来自于Java代码中定义的字段)。您可以将该字符串在数据库中的值进行修改。例如修改为price > 20 && name.contains("end"),Flink CEP作业会动态加载新的Groovy字符串并构造新的GroovyCondition来处理之后的事件。
  • 边(Edge)定义
    字段名描述类型是否必填备注
    source源模式名称。string无。
    target目标模式名称。string无。
    type事件选择策略。dict支持以下取值:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY
    • NOT_FOLLOW
    • NOT_NEXT

    取值及含义请参见本文连续性定义。

  • 图(GraphNode extends Node)定义

    一个图(GraphNode)代表一个完整的Pattern序列,它的节点(nodes)是各个独立的Pattern,边(edges)代表如何从一类Pattern的匹配转移到另一类Pattern的匹配。

    为了支持Pattern的嵌套(即GroupPattern),我们将一个GraphNode看作是Node的子类,即一个GraphNode可以作为一个更大的GraphNode中的Node。GraphNode相比于基础Node,额外多了以下2类字段:
    • 描述图的结构的nodes字段与edges字段。
    • 描述图内时间窗口策略的window字段与事件匹配后的跳出策略afterMatchSkipStrategy字段。
    GraphNode的字段详情请参见下表。
    字段名描述类型是否必填备注
    name该复合Pattern名称。String一个唯一的字符串。
    说明 不同Graph名称不能重复。
    type该Node类型。enum(string)固定值为COMPOSITE。
    version该Graph使用的JSON格式的版本号。Int默认值为1。
    nodes该Pattern内嵌套的子Pattern。array of Node不可以为空的array。
    edges嵌套的子Pattern的连接关系。array of Edge可以为空的array。
    window
    • 当类型为FIRST_AND_LAST:代表该复合Pattern一次完整匹配之间的最大时间间隔。
    • 当类型为PREVIOUS_AND_CURRENT:代表该相邻2个子Pattern匹配之间的最大时间间隔。
    dict取值示例如下。
    "window": {
       "type": "FIRST_AND_LAST",
       "time": {
       "unit": "DAYS",
       "size": 1
       }
    }

    单位可以为DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。数据类型为Long或Integer。

    afterMatchSkipStrategy该图内所有事件匹配后的跳过策略。dict请参见本文事件匹配后的跳过策略(AfterMatchSkipStrategy)定义。
    quantifier量词,用于描述如何匹配该Pattern,例如只匹配一次。dict请参见本文量词(Quantifier)定义。
  • 事件匹配后的跳过策略(AfterMatchSkipStrategy)定义
    字段名描述类型是否必填备注
    type策略类型。enum(string)
    参数取值如下:
    • NO_SKIP(默认值):每个成功的匹配都会被输出。
    • SKIP_TO_NEXT:丢弃以相同事件开始的所有部分匹配。
    • SKIP_PAST_LAST_EVENT:丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
    • SKIP_TO_FIRST:丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
    • SKIP_TO_LAST:丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

    详情请参见匹配后跳过策略

    patternName策略针对的模式的名称。string一个唯一的字符串。
  • 连续性定义
    物理值含义
    STRICT严格连续。 所有匹配的事件中间没有任何不匹配的事件。
    SKIP_TILL_NEXT松散连续。允许匹配的事件之间出现不匹配的事件,不匹配的事件会被忽略。
    SKIP_TILL_ANY不确定松散连续。更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。
    NOT_NEXT紧接着的后续事件不能是某指定事件。
    NOT_FOLLOW某指定事件后续不出现。

    相关示例请参见事件处理(CEP)文档。

  • 量词属性含义
    取值含义
    SINGLE代表该模式只出现一次。
    LOOPING代表该模式为循环模式,可能出现多次,类比正则表达式中的*与+。
    TIMES代表该模式会出现指定次数。
    GREEDY代表在匹配该模式时,会采用贪婪匹配策略,尽可能多地匹配。
    OPTIONAL代表该模式为可选模式。

示例一:普通Pattern示例

例如在电商大促的实时营销场景中,要找到在大促前10分钟时间窗口内满足指定条件的客户,来使用Flink 动态CEP规则针对性地调整营销策略。这些客户需要满足的条件如下:
  • 领取了某会场的优惠券。
  • 在购物车中添加了超过3次的商品。
  • 但最后没有结账付款。
为此,我们将领取某会场的优惠券定义为StartCondition,添加商品到购物车定义为MiddleCondition,结账定义为EndCondition。抽象出的模式为在大促前10分钟的时间窗口内,满足StartCondition的事件可以发生也可以不发生,满足MiddleCondition的事件发生了大于等于3次,但最后没有1个满足EndCondition的事件。它对应的Pattern用Java代码描述如下。
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()
            .followedBy("middle")
            .where(new MiddleCondition())
            .timesOrMore(3)
            .notFollowedBy("end")
            .where(new EndCondition())
            .within(Time.minutes(10));
其按本文档描述的JSON格式表达如下。
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

示例二:在Pattern中使用包含自定义参数的Condition

例如在实时营销场景中,假设我们给用户打上了一个人群标签,之后会根据用户所属的标签采取不同的营销策略,例如对于A类用户我们发送营销短信,对于B类用户我们发送优惠券等,而对于其他用户,我们不采取营销措施。针对上述需求,我们可以定义一个普通的Class类型Condition来解决,但当我们想调整策略,针对C类用户也发送优惠券时,如果使用的是普通的Class类型Condition,那么我们必须改写代码,重新编译并运行作业。这种情况下,我们可以使用包含自定义参数的Condition,在代码中定义好如何根据传入的参数进行策略的调整之后,我们只需要在数据库中修改传入的参数(即包含自定义参数的Condition的args字段的值),例如由["A", "B"] 改为["A", "B", "C"],即可实现营销策略的动态更新。

即假设初始Pattern中定义的Condition如下:
"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
我们可将其修改为:
"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
关于该类Condition在具体业务场景的使用示例,详情请参见Demo
说明 本文中aviatorscriptDemo属于第三方搭建的网站,访问时可能会存在无法打开或访问延迟的问题。

相关文档

Flink动态CEP快速入门