Realtime ComputeFlink版支援通過DataStream作業的方式運行支援規則動態更新的Flink CEP作業。本文結合即時營銷中的反作弊情境,為您介紹如何基於Flink全託管快速構建一個動態載入最新規則來處理上遊Kafka資料的Flink CEP作業。
背景資訊
在電商平台投放廣告時,廣告主通常有預算限制。例如對於按點擊次數計算費用的廣告而言,如果有黑灰產構造虛假流量,攻擊廣告主,則會很快消耗掉正常廣告主的預算,使得廣告內容被提前下架。在這種情況下,廣告主的利益受到了損害,容易導致後續的投訴與糾紛。
為了應對上述作弊情境,我們需要快速辨識出惡意流量,採取針對性措施(例如限制惡意使用者、向廣告主發送警示等)來保護使用者權益。同時考慮到可能有意外因素(例如達人推薦、熱時間點事件引流)導致流量驟變,我們也需要動態調整用於識別惡意流量的規則,避免損害正常使用者的利益。
本文為您示範如何使用Flink動態CEP解決上述問題。我們假設客戶的行為日誌會被存放入訊息佇列Kafka中,Flink CEP作業會消費Kafka資料,同時會去輪詢RDS資料庫中的規則表,拉取策略人員添加到資料庫的最新規則,並用最新規則去匹配事件。針對匹配到的事件,Flink CEP作業會發出警示或將相關資訊寫入其他資料存放區中。樣本中整體資料鏈路如下圖所示。
實際示範中,我們會先啟動Flink CEP作業,然後插入規則1:連續3條action為0的事件發生後,下一條事件的action仍非1,其業務含義為連續3次訪問該產品後最後沒有購買。在匹配到相應事件並進行處理後,我們會動態更新規則1內容為連續5條action為0或2的事件發生後,下一條事件的action仍非1,來應對流量整體增加的情境,同時插入一條規則2,它將和規則1的初始規則一樣,用於輔助展示多規則支援等功能。當然,您也可以添加一個全新規則。
前提條件
如果您使用RAM使用者或RAM角色等身份訪問,需要確認已具有Flink控制台相關許可權,詳情請參見許可權管理。
已建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版。
上下遊儲存
已建立RDS MySQL執行個體,詳情請參見建立RDS MySQL執行個體。
已建立訊息佇列Kafka執行個體,詳情請參見概述。
僅Realtime Compute引擎VVR 6.0.2及以上版本支援動態CEP功能。
操作流程
本文為您介紹如何編寫Flink CEP作業檢測行為日誌中的例外狀況事件序列去發現惡意流量,並示範如何進行規則的動態更新。具體的操作流程如下:
步驟一:準備測試資料
準備上遊Kafka Topic
建立一個名稱為demo_topic的Topic,存放類比的使用者行為日誌。
操作詳情請參見步驟一:建立Topic。
準備RDS資料庫
在Data Management控制台上,準備RDS MySQL的測試資料。
使用高許可權帳號登入RDS MySQL。
建立rds_demo規則表,用來記錄Flink CEP作業中需要應用的規則。
在已登入的SQLConsole視窗,輸入如下命令後,單擊執行。
CREATE DATABASE cep_demo_db; USE cep_demo_db; CREATE TABLE rds_demo ( `id` VARCHAR(64), `version` INT, `pattern` VARCHAR(4096), `function` VARCHAR(512) );
每行代表一條規則,包含id、version等用於區分不同規則與每個規則不同版本的欄位、描述CEP API中的模式對象的pattern欄位,以及描述如何處理匹配模式的事件序列的function欄位。
步驟二:配置IP白名單
為了讓Flink能訪問RDS MySQL執行個體,您需要將Flink全託管工作空間的網段添加到在RDS MySQL的白名單中。
擷取Flink全託管工作空間的VPC網段。
在目標工作空間右側操作列,選擇 。
在工作空間詳情對話方塊,查看Flink全託管虛擬交換器的網段資訊。
在RDS MySQL的IP白名單中,添加Flink全託管網段資訊。
操作步驟詳情請參見設定IP白名單。
步驟三:開發並啟動Flink CEP作業
本文中所有代碼都可以在Github倉庫下載。本文檔接下來會描述重點部分實現,方便您參考。
配置Maven專案中的pom.xml檔案所使用的倉庫。
pom.xml檔案的配置詳情,請參見Kafka DataStream Connector。
在作業的Maven POM檔案中添加flink-cep作為專案依賴。
<dependency> <groupId>com.alibaba.ververica</groupId> <artifactId>flink-cep</artifactId> <version>1.15-vvr-6.0.2-api</version> <scope>provided</scope> </dependency>
開發作業代碼。
構建Kafka Source。
代碼編寫詳情,請參見Kafka DataStream Connector。
構建CEP.dynamicPatterns()。
為支援CEP規則動態變更與多規則匹配,阿里雲Realtime ComputeFlink版定義了CEP.dynamicPatterns() API。該API定義代碼如下。
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns( DataStream<T> input, PatternProcessorDiscovererFactory<T> discovererFactory, TimeBehaviour timeBehaviour, TypeInformation<R> outTypeInfo)
使用該API時,所需參數如下。您可以跟進實際使用方式,更新相應的參數取值。
參數
說明
DataStream<T> input
輸入事件流。
PatternProcessorDiscovererFactory<T> discovererFactory
工廠對象。工廠對象負責構造一個探查器(PatternProcessorDiscoverer),探查器負責擷取最新規則,即構造一個PatternProcessor介面。
TimeBehaviour timeBehaviour
描述Flink CEP作業如何處理事件的時間屬性。參數取值如下:
TimeBehaviour.ProcessingTime:代表按照Processing Time處理事件。
TimeBehaviour.EventTime:代表按照Event Time處理事件。
TypeInformation<R> outTypeInfo
描述輸出資料流的類型資訊。
關於DataStream、TimeBehaviour、TypeInformation等Flink作業常見概念詳情,請參見DataStream、TimeBehaviour和TypeInformation。
這裡重點介紹PatternProcessor介面,一個PatternProcessor包含一個確定的模式(Pattern)用於描述如何去匹配事件,以及一個PatternProcessFunction用於描述如何處理一個匹配(例如發送警報)。除此之外,還包含id與version等用於標識PatternProcessor的資訊。因此一個PatternProcessor既包含規則本身,又指明了規則引發時,Flink作業應如何響應。更多背景請參見提案。
而patternProcessorDiscovererFactory用於構造一個探查器去擷取最新的PatternProcessor,我們在範例程式碼中提供了一個預設的周期性掃描外部儲存的抽象類別。它描述了如何啟動一個Timer去定時輪詢外部儲存拉取最新的PatternProcessor。
public abstract class PeriodicPatternProcessorDiscoverer<T> implements PatternProcessorDiscoverer<T> { ... @Override public void discoverPatternProcessorUpdates( PatternProcessorManager<T> patternProcessorManager) { // Periodically discovers the pattern processor updates. timer.schedule( new TimerTask() { @Override public void run() { if (arePatternProcessorsUpdated()) { List<PatternProcessor<T>> patternProcessors = null; try { patternProcessors = getLatestPatternProcessors(); } catch (Exception e) { e.printStackTrace(); } patternProcessorManager.onPatternProcessorsUpdated(patternProcessors); } } }, 0, intervalMillis); } ... }
Realtime ComputeFlink版提供了JDBCPeriodicPatternProcessorDiscoverer的實現,用於從支援JDBC協議的資料庫(例如RDS或者Hologres等)中拉取最新的規則。在使用時,您需要指定如下參數。
參數
說明
jdbcUrl
資料庫JDBC串連地址。
jdbcDriver
資料庫驅動類類名。
tableName
資料庫表名。
initialPatternProcessors
當資料庫的規則表為空白時,使用的預設PatternProcessor。
intervalMillis
輪詢資料庫的時間間隔。
在實際代碼中您可以按如下方式使用,作業將會匹配到的規則列印到Flink TaskManager的輸出中。
// import ...... public class CepDemo { public static void main(String[] args) throws Exception { ...... // DataStream Source DataStreamSource<Event> source = env.fromSource( kafkaSource, WatermarkStrategy.<Event>forMonotonousTimestamps() .withTimestampAssigner((event, ts) -> event.getEventTime()), "Kafka Source"); env.setParallelism(1); // keyBy userId and productionId // Notes, only events with the same key will be processd to see if there is a match KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream = source.keyBy( new KeySelector<Event, Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> getKey(Event value) throws Exception { return Tuple2.of(value.getId(), value.getProductionId()); } }); SingleOutputStreamOperator<String> output = CEP.dynamicPatterns( keyedStream, new JDBCPeriodicPatternProcessorDiscovererFactory<>( JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS), TimeBehaviour.ProcessingTime, TypeInformation.of(new TypeHint<String>() {})); output.print(); // Compile and submit the job env.execute("CEPDemo"); } }
說明為了方便示範,我們在Demo代碼裡將輸入資料流按照id和product id做了一步keyBy,再與
CEP.dynamicPatterns()
串連使用。這意味著只有具有相同id和product id的事件會被納入到規則匹配的考慮中,Key不同的事件之間不會產生匹配。
在Realtime Compute控制台上,上傳JAR包並部署JAR作業,具體操作詳情請參見部署作業。
為了讓您可以快速測試使用,您需要下載Realtime ComputeFlink版測試JAR包。部署時需要配置的參數填寫說明如下表所示。
說明由於目前我們上遊的Kafka Source暫無資料,並且資料庫中的規則表為空白。因此作業運行起來之後,暫時會沒有輸出。
配置項
說明
部署模式
選擇為流模式。
部署名稱
填寫對應的JAR作業名稱。
引擎版本
從VVR 3.0.3版本(對應Flink 1.12版本)開始,VVP支援同時運行多個不同引擎版本的JAR作業。如果您的作業已使用了Flink 1.12及更早版本的引擎,您需要按照以下情況進行處理:
Flink 1.12版本:停止後啟動作業,系統將自動將引擎升級為vvr-3.0.3-flink-1.12版本。
Flink 1.11或Flink 1.10版本:手動將作業引擎版本升級到vvr-3.0.3-flink-1.12或vvr-4.0.8-flink-1.13版本後重啟作業,否則會在啟動作業時逾時報錯。
JAR URL
上傳打包好的JAR包,或者直接上傳我們提供的測試JAR包。
Entry Point Class
填寫為
com.alibaba.ververica.cep.demo.CepDemo
。Entry Point Main Arguments
如果您是自己開發的作業,已經配置了相關上下遊儲存的資訊,則此處可以不填寫。但是,如果您是使用的我們提供的測試JAR包,則需要配置該參數。代碼資訊如下。
--kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000
其中涉及的參數及含義如下:
kafkaBrokers:Kafka Broker地址。
inputTopic:Kafka Topic名稱。
inputTopicGroup:Kafka消費組。
jdbcUrl:資料庫JDBC串連地址。
說明本樣本所使用的JDBC URL中對應的帳號和密碼需要為普通帳號和密碼,且密碼裡僅支援英文字母和數字。在實際情境中,您可根據您的需求在作業中使用不同的鑒權方式。
tableName:目標表名稱。
jdbcIntervalMs:輪詢資料庫的時間間隔。
說明需要將以上參數的取值修改為您實際業務上下遊儲存的資訊。
參數資訊長度不要大於1024,且不建議用來傳複雜參數,複雜參數指包括了換行、空格或者其他特殊字元的參數(僅支援英文字母和數字)。如果您需要傳入複雜參數,請使用附加依賴檔案來傳輸。
在部署詳情頁簽中的其他配置,添加如下作業運行參數。
kubernetes.application-mode.classpath.include-user-jar: 'true' classloader.resolve-order: parent-first
運行參數配置步驟詳情請參見運行參數配置。
在 頁面,單擊目標作業操作列下的啟動。
作業啟動參數配置詳情請參見作業啟動。
步驟四:插入規則
啟動Flink CEP作業,然後插入規則1:連續3條action為0的事件發生後,下一條事件的action仍非1,其業務含義為連續3次訪問該產品後最後沒有購買。
使用普通帳號登入RDS MySQL。
插入動態更新規則。
將JSON字串與id、version、function類名等拼接後插入到RDS中。
INSERT INTO rds_demo ( `id`, `version`, `pattern`, `function` ) values( '1', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}', 'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction') ;
為了方便您使用並提高資料庫中的Pattern欄位的可讀性,Realtime ComputeFlink版定義了一套JSON格式的規則描述,詳情請參見動態CEP中規則的JSON格式定義。上述SQL語句中的pattern欄位的值就是按照JSON格式的規則,給出的序列化後的pattern字串。它的物理意義是去匹配這樣的模式:連續3條action為0的事件發生後,下一條事件的action仍非1。
說明在下文的EndCondition對應的代碼中,定義了action仍非1。
對應的CEP API描述如下。
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3) .followedBy("end") .where(new EndCondition());
對應的JSON字串如下。
{ "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": null, "nodes": [ { "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": { "className": "com.alibaba.ververica.cep.demo.condition.EndCondition", "type": "CLASS" }, "type": "ATOMIC" }, { "name": "start", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "LOOPING" ], "times": { "from": 3, "to": 3, "windowTime": null }, "untilCondition": null }, "condition": { "expression": "action == 0", "type": "AVIATOR" }, "type": "ATOMIC" } ], "edges": [ { "source": "start", "target": "end", "type": "SKIP_TILL_NEXT" } ], "window": null, "afterMatchStrategy": { "type": "SKIP_PAST_LAST_EVENT", "patternName": null }, "type": "COMPOSITE", "version": 1 }
通過Kafka Client向demo_topic中發送訊息。
在本Demo中,您也可以使用訊息佇列Kafka提供的快速體驗訊息收發頁面發送測試訊息。
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022778000 1,Ken,0,1,1662022779000 1,Ken,0,1,1662022780000
demo_topic欄位說明如下表所示。
欄位
說明
id
使用者ID。
username
使用者名稱。
action
使用者動作,取值如下:
0代表瀏覽操作。
1代表購買動作。
2代表分享操作。
product_id
商品ID。
event_time
該行為發生的事件時間。
查看JobManager日誌中列印的最新規則和TaskManager日誌中列印的匹配。
在JobManager日誌中,通過JDBCPeriodicPatternProcessorDiscoverer關鍵詞搜尋,查看最新規則。
在TaskManager中以.out結尾的記錄檔中,通過
A match for Pattern of (id, version): (1, 1)
關鍵詞搜尋,查看日誌中列印的匹配。
步驟五:更新匹配規則,並查看更新的規則是否生效
在匹配到相應事件並進行處理後,動態更新規則1內容為連續5條action為0或為2的事件發生後,下一條事件的action仍非1,來應對流量整體增加的情境,同時插入一條規則2,它將和規則1的初始規則一樣,用於輔助展示多規則支援等功能。
使用在RDS控制台上,更新匹配規則。
使用普通帳號登入RDS MySQL。
將StartCondition中的action == 0修改為action == 0 || action == 2,並且我們將重複出現的次數從>=3改為>=5,對應SQL語句如下。
INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('1', 2, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":5,"to":5,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0 || action == 2","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
再插入一條記錄的id為2新規則。
它和規則1的版本1一樣,其StartCondition仍為action == 0且重複出現的次數為>=3。
INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('2', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
在Kafka控制台上發送8條簡單的訊息,來觸發匹配。
8條簡單的訊息樣本如下。
1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,2,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,0,1,1662022777000 1,Ken,2,1,1662022777000
在TaskManager中以.out結尾的記錄檔中,查看匹配結果。
如果要搜規則1在更新為版本2之後的匹配,可以通過
A match for Pattern of (id, version): (1, 2)
關鍵詞,查匹配結果。如果要搜規則2在版本為1的匹配,可以通過
A match for Pattern of (id, version): (2, 1)
關鍵詞,查匹配結果。
如上圖中藍框內結果所示,Flink CEP作業按照id為1,version為2的規則匹配到1次5個action為0或2的事件+1個action非1的1個事件的事件序列後輸出結果,代表動態修改的規則成功生效;而對於id為2,version為1的規則,如上圖中橙色框內結果所示,Flink CEP作業匹配到2次3個action為0的事件+1個action非1的1個事件的事件序列後輸出結果,代表動態新增的規則也在作業中被採用。