全部產品
Search
文件中心

Realtime Compute for Apache Flink:動態CEP中規則的JSON格式定義

更新時間:Jul 13, 2024

本文為您介紹CEP中規則的JSON格式相關資訊。

目標人群

  • 客戶風控平台開發人員:對Flink CEP較熟悉的平台研發人員應能快速學習本格式,並根據自身平台需求判斷是否需要進一步封裝。
  • 客戶風控策略人員:只熟悉具體策略但缺乏Java經驗的同學,在熟悉CEP概念的基礎上,也可快速上手本格式的使用來編寫新規則,使其在上線的風控作業中應用。

JSON格式定義

對於一個事件序列(Event Sequence)中的模式(Pattern),我們可以將其看作一個圖(Graph),圖中節點(Node)為針對某些事件(Event)的模式,節點之間的邊(Edge)為事件選擇策略(Event Selection Strategy),即如何從一類模式的匹配轉移到另一類模式的匹配。每個圖也可以看作一個更大的圖的子節點,從而允許模式的嵌套。基於以上考慮,阿里雲Realtime ComputeFlink定義了一套基於JSON的規範來描述CEP中的規則,進而方便規則的儲存與修改,該規範中各個欄位的含義如下。
  • 節點(Node)定義
    一個節點(Node)即一個完整的模式(Pattern),它包含如下屬性。
    欄位名描述類型是否必填備忘
    namePattern名稱。string一個唯一的字串。
    說明 不同節點的名稱不能重複。
    type該Node類型。enum(string)
    • 對於包含子Pattern的節點,該欄位值為COMPOSITE。
    • 對於無子Pattern的節點,該欄位值為ATOMIC。
    quantifier量詞,用於描述如何匹配該Pattern,例如只匹配一次。dict請參見本文量詞(Quantifier)定義。
    condition條件。dict請參見本文條件(Condition)定義。
  • 量詞(Quantifier)定義
    量詞的作用是描述對於滿足該Pattern的事件要如何匹配。例如模式 "A*" 對應的量詞properties為LOOPING,該Pattern內部的事件選擇策略為SKIP_TILL_ANY。
    欄位名描述類型是否必填備忘
    consumingStrategy事件選擇策略。enum(string)僅支援以下取值:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY

    取值及含義請參見本文連續性定義。

    times用於描述該Pattern需要匹配多少次。dict
    取值樣本如下。
    "times": {
              "from": 3,
              "to": 3,
              "windowTime": {
              "unit": "MINUTES",
              "size": 12
              }
            },
    其中from和to的資料類型均為integer,windowTime的單位可以為DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。
    說明 windowTime可以設為null,即"windowTime": null
    properties描述該量詞所具有的屬性。array of enumString取值及含義請參見本文量詞屬性含義。
    untilCondition 停止條件。
    說明 僅可在LOOPING量詞修飾的Pattern後使用。
    dict 取值及含義請參見本文條件(Condition)定義。
  • 條件(Condition)定義
    條件用於篩選符合某些要求的事件。例如要篩選瀏覽時間長度超過5分鐘的客戶,瀏覽時間長度超過5分鐘即為一個條件。
    欄位名描述類型是否必填備忘
    type條件類型。enum(string)條件類型取值如下:
    • CLASS:對應使用者自訂的條件。
    • AVIATOR:對應基於AVIATOR運算式的條件。
    • GROOVY:對應基於GROOVY運算式的條件。
    ...其他可序列化的自訂欄位。...
    目前我們支援以下幾種Condition:
    • Class類型Condition
      欄位名描述類型是否必填備忘
      type條件類型。enum(string)固定值為Class。
      className類名。string該class完整類名,例如com.alibaba.ververica.cep.demo.StartCondition
    • 包含自訂參數的Condition

      使用者在使用普通的Class類型Condition時,只能傳入類名(className),而無法動態地傳入參數。在動態CEP支援中,為了提供更豐富的Condition表達能力,我們設計並實現了包含自訂參數的Condition(即CustomArgsCondition),從而允許使用者在JSON中通過字串數組來設定CustomArgsCondition所需參數, 進而動態構造CustomArgsCondition執行個體。這一特性允許使用者動態更新Condition的參數,而無需修改Java代碼。

      欄位名描述類型是否必填備忘
      type條件類型。enum(string)固定值為Class。
      className類名。string該class完整類名,例如com.alibaba.ververica.cep.demo.CustomMiddleCondition
      args自訂參數。array of string一個字串數組。
    • 基於Aviator運算式的Condition
      Aviator是一個運算式求值引擎,可以動態地將運算式編譯成位元組碼(詳情請參見aviatorscript)。因此我們可以在作業中使用基於Aviator運算式的Condition,使得條件的閾值也可以動態修改,而無需修改Java代碼重新編譯運行。
      欄位名描述類型是否必填備忘
      type類名。string固定值為AVIATOR。
      expression運算式字串。string形如price > 10這樣的運算式字串(price變數名來自於Java代碼中定義的欄位)。

      您可以將該字串在資料庫中的值進行修改。例如修改為price > 20,Flink CEP作業會動態載入price > 20構造新的AviatorCondition來處理之後的事件。

    • 基於Groovy運算式的Condition
      Groovy是一個基於JVM平台的動態語言(Groovy文法可以參見syntax)。動態CEP支援使用Groovy運算式來定義條件(Condition),從而允許動態修改條件的閾值。
      欄位名描述類型是否必填備忘
      type類名。string固定值為GROOVY。
      expression運算式字串。string形如price > 5.0 && name.contains("mid")這樣的運算式字串(pricename等變數名來自於Java代碼中定義的欄位)。您可以將該字串在資料庫中的值進行修改。例如修改為price > 20 && name.contains("end"),Flink CEP作業會動態載入新的Groovy字串並構造新的GroovyCondition來處理之後的事件。
  • 邊(Edge)定義
    欄位名描述類型是否必填備忘
    source源模式名稱。string無。
    target目標模式名稱。string無。
    type事件選擇策略。dict支援以下取值:
    • STRICT
    • SKIP_TILL_NEXT
    • SKIP_TILL_ANY
    • NOT_FOLLOW
    • NOT_NEXT

    取值及含義請參見本文連續性定義。

  • 圖(GraphNode extends Node)定義

    一個圖(GraphNode)代表一個完整的Pattern序列,它的節點(nodes)是各個獨立的Pattern,邊(edges)代表如何從一類Pattern的匹配轉移到另一類Pattern的匹配。

    為了支援Pattern的嵌套(即GroupPattern),我們將一個GraphNode看作是Node的子類,即一個GraphNode可以作為一個更大的GraphNode中的Node。GraphNode相比於基礎Node,額外多了以下2類欄位:
    • 描述圖的結構的nodes欄位與edges欄位。
    • 描述圖內時間視窗策略的window欄位與事件匹配後的跳出策略afterMatchSkipStrategy欄位。
    GraphNode的欄位詳情請參見下表。
    欄位名描述類型是否必填備忘
    name該複合Pattern名稱。String一個唯一的字串。
    說明 不同Graph名稱不能重複。
    type該Node類型。enum(string)固定值為COMPOSITE。
    version該Graph使用的JSON格式的版本號碼。Int預設值為1。
    nodes該Pattern內嵌套的子Pattern。array of Node不可以為空白的array。
    edges嵌套的子Pattern的串連關係。array of Edge可以為空白的array。
    window
    • 當類型為FIRST_AND_LAST:代表該複合Pattern一次完整匹配之間的最大時間間隔。
    • 當類型為PREVIOUS_AND_CURRENT:代表該相鄰2個子Pattern匹配之間的最大時間間隔。
    dict取值樣本如下。
    "window": {
       "type": "FIRST_AND_LAST",
       "time": {
       "unit": "DAYS",
       "size": 1
       }
    }

    單位可以為DAYS、HOURS、MINUTES、SECONDS和MILLISECONDS。資料類型為Long或Integer。

    afterMatchSkipStrategy該圖內所有事件匹配後的跳過策略。dict請參見本文事件匹配後的跳過策略(AfterMatchSkipStrategy)定義。
    quantifier量詞,用於描述如何匹配該Pattern,例如只匹配一次。dict請參見本文量詞(Quantifier)定義。
  • 事件匹配後的跳過策略(AfterMatchSkipStrategy)定義
    欄位名描述類型是否必填備忘
    type策略類型。enum(string)
    參數取值如下:
    • NO_SKIP(預設值):每個成功的匹配都會被輸出。
    • SKIP_TO_NEXT:丟棄以相同事件開始的所有部分匹配。
    • SKIP_PAST_LAST_EVENT:丟棄起始在這個匹配的開始和結束之間的所有部分匹配。
    • SKIP_TO_FIRST:丟棄起始在這個匹配的開始和第一個出現的名稱為PatternName事件之間的所有部分匹配。
    • SKIP_TO_LAST:丟棄起始在這個匹配的開始和最後一個出現的名稱為PatternName事件之間的所有部分匹配。

    詳情請參見匹配後跳過策略

    patternName策略針對的模式的名稱。string一個唯一的字串。
  • 連續性定義
    物理值含義
    STRICT嚴格連續。 所有匹配的事件中間沒有任何不匹配的事件。
    SKIP_TILL_NEXT鬆散連續。允許匹配的事件之間出現不匹配的事件,不匹配的事件會被忽略。
    SKIP_TILL_ANY不確定鬆散連續。更進一步的鬆散連續,允許忽略掉一些匹配事件的附加匹配。
    NOT_NEXT緊接著的後續事件不能是某指定事件。
    NOT_FOLLOW某指定事件後續不出現。

    相關樣本請參見事件處理(CEP)文檔。

  • 量詞屬性含義
    取值含義
    SINGLE代表該模式只出現一次。
    LOOPING代表該模式為迴圈模式,可能出現多次,類比Regex中的*與+。
    TIMES代表該模式會出現指定次數。
    GREEDY代表在匹配該模式時,會採用貪婪匹配策略,儘可能多地匹配。
    OPTIONAL代表該模式為可選模式。

樣本一:普通Pattern樣本

例如在電商大促的即時營銷情境中,要找到在大促前10分鐘時間視窗內滿足指定條件的客戶,來使用Flink 動態CEP規則針對性地調整營銷策略。這些客戶需要滿足的條件如下:
  • 領取了某會場的優惠券。
  • 在購物車中添加了超過3次的商品。
  • 但最後沒有結賬付款。
為此,我們將領取某會場的優惠券定義為StartCondition,添加商品到購物車定義為MiddleCondition,結賬定義為EndCondition。抽象出的模式為在大促前10分鐘的時間視窗內,滿足StartCondition的事件可以發生也可以不發生,滿足MiddleCondition的事件發生了大於等於3次,但最後沒有1個滿足EndCondition的事件。它對應的Pattern用Java代碼描述如下。
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
            .where(new StartCondition())
            .optional()
            .followedBy("middle")
            .where(new MiddleCondition())
            .timesOrMore(3)
            .notFollowedBy("end")
            .where(new EndCondition())
            .within(Time.minutes(10));
其按本文檔描述的JSON格式表達如下。
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}

樣本二:在Pattern中使用包含自訂參數的Condition

例如在即時營銷情境中,假設我們給使用者打上了一個人群標籤,之後會根據使用者所屬的標籤採取不同的營銷策略,例如對於A類使用者我們發送營銷簡訊,對於B類使用者我們發送優惠券等,而對於其他使用者,我們不採取營銷措施。針對上述需求,我們可以定義一個普通的Class類型Condition來解決,但當我們想調整策略,針對C類使用者也發送優惠券時,如果使用的是普通的Class類型Condition,那麼我們必須改寫代碼,重新編譯並運行作業。這種情況下,我們可以使用包含自訂參數的Condition,在代碼中定義好如何根據傳入的參數進行策略的調整之後,我們只需要在資料庫中修改傳入的參數(即包含自訂參數的Condition的args欄位的值),例如由["A", "B"] 改為["A", "B", "C"],即可實現營銷策略的動態更新。

即假設初始Pattern中定義的Condition如下:
"condition": {
    "args": [
        "A", "B"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
我們可將其修改為:
"condition": {
    "args": [
        "A", "B", "C"
    ],
    "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
    "type": "CLASS"
}
關於該類Condition在具體業務情境的使用樣本,詳情請參見Demo
說明 本文中aviatorscriptDemo屬於第三方搭建的網站,訪問時可能會存在無法開啟或訪問延遲的問題。

相關文檔

Flink動態CEP快速入門