目標人群
- 客戶風控平台開發人員:對Flink CEP較熟悉的平台研發人員應能快速學習本格式,並根據自身平台需求判斷是否需要進一步封裝。
- 客戶風控策略人員:只熟悉具體策略但缺乏Java經驗的同學,在熟悉CEP概念的基礎上,也可快速上手本格式的使用來編寫新規則,使其在上線的風控作業中應用。
JSON格式定義
對於一個事件序列(Event Sequence)中的模式(Pattern),我們可以將其看作一個圖(Graph),圖中節點(Node)為針對某些事件(Event)的模式,節點之間的邊(Edge)為事件選擇策略(Event Selection Strategy),即如何從一類模式的匹配轉移到另一類模式的匹配。每個圖也可以看作一個更大的圖的子節點,從而允許模式的嵌套。基於以上考慮,阿里雲Realtime ComputeFlink定義了一套基於JSON的規範來描述CEP中的規則,進而方便規則的儲存與修改,該規範中各個欄位的含義如下。
樣本一:普通Pattern樣本
例如在電商大促的即時營銷情境中,要找到在大促前10分鐘時間視窗內滿足指定條件的客戶,來使用Flink 動態CEP規則針對性地調整營銷策略。這些客戶需要滿足的條件如下:
- 領取了某會場的優惠券。
- 在購物車中添加了超過3次的商品。
- 但最後沒有結賬付款。
為此,我們將領取某會場的優惠券定義為StartCondition,添加商品到購物車定義為MiddleCondition,結賬定義為EndCondition。抽象出的模式為在大促前10分鐘的時間視窗內,滿足StartCondition的事件可以發生也可以不發生,滿足MiddleCondition的事件發生了大於等於3次,但最後沒有1個滿足EndCondition的事件。它對應的Pattern用Java代碼描述如下。
Pattern<Event, Event> pattern =
Pattern.<Event>begin("start")
.where(new StartCondition())
.optional()
.followedBy("middle")
.where(new MiddleCondition())
.timesOrMore(3)
.notFollowedBy("end")
.where(new EndCondition())
.within(Time.minutes(10));
其按本文檔描述的JSON格式表達如下。
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": null,
"nodes": [
{
"name": "end",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "middle",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"LOOPING"
],
"times": {
"from": 3,
"to": 3,
"windowTime": null
},
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
"type": "CLASS"
},
"type": "ATOMIC"
},
{
"name": "start",
"quantifier": {
"consumingStrategy": "SKIP_TILL_NEXT",
"properties": [
"SINGLE",
"OPTIONAL"
],
"times": null,
"untilCondition": null
},
"condition": {
"className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
"type": "CLASS"
},
"type": "ATOMIC"
}
],
"edges": [
{
"source": "middle",
"target": "end",
"type": "NOT_FOLLOW"
},
{
"source": "start",
"target": "middle",
"type": "SKIP_TILL_NEXT"
}
],
"window": {
"type": "FIRST_AND_LAST",
"time": {
"unit": "MINUTES",
"size": 10
}
},
"afterMatchStrategy": {
"type": "NO_SKIP",
"patternName": null
},
"type": "COMPOSITE",
"version": 1
}
樣本二:在Pattern中使用包含自訂參數的Condition
例如在即時營銷情境中,假設我們給使用者打上了一個人群標籤,之後會根據使用者所屬的標籤採取不同的營銷策略,例如對於A類使用者我們發送營銷簡訊,對於B類使用者我們發送優惠券等,而對於其他使用者,我們不採取營銷措施。針對上述需求,我們可以定義一個普通的Class類型Condition來解決,但當我們想調整策略,針對C類使用者也發送優惠券時,如果使用的是普通的Class類型Condition,那麼我們必須改寫代碼,重新編譯並運行作業。這種情況下,我們可以使用包含自訂參數的Condition,在代碼中定義好如何根據傳入的參數進行策略的調整之後,我們只需要在資料庫中修改傳入的參數(即包含自訂參數的Condition的args欄位的值),例如由["A", "B"] 改為["A", "B", "C"],即可實現營銷策略的動態更新。
即假設初始Pattern中定義的Condition如下:
"condition": {
"args": [
"A", "B"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
我們可將其修改為:
"condition": {
"args": [
"A", "B", "C"
],
"className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
"type": "CLASS"
}
關於該類Condition在具體業務情境的使用樣本,詳情請參見
Demo。