Realtime Compute for Apache Flink allows you to run Flink complex event processing (CEP) deployments that support dynamic rule updates by using DataStream tasks. This topic describes how to develop a Flink CEP deployment that dynamically loads the latest rules to process upstream Kafka data based on fully managed Flink.
Background information
Advertisers usually have budgets when they place advertisements on e-commerce platforms. For example, an advertiser places an advertisement that is charged based on the number of clicks on the advertisement. If the advertiser is attacked by false traffic from online fraud, the budget will quickly be consumed and the advertisement will be removed before the expected date. In this case, the interests of the advertiser are harmed, which may lead to subsequent complaints and disputes.
To deal with the preceding attack scenarios, you need to quickly identify malicious traffic and take effective measures, such as restricting malicious users and sending alert notifications to advertisers, to protect the rights and interests of users. In addition, unexpected factors, such as celebrity recommendations and trending topics, may cause sudden traffic changes. In this case, you also need to dynamically adjust the rules used to identify malicious traffic to avoid harming the interests of normal users.
This topic describes how to use dynamic Flink CEP to resolve the preceding issues. In this example, the behavior logs of users are stored in an Apsara MQ for Kafka instance. A Flink CEP deployment is developed to consume Kafka data. To do this, the Flink CEP deployment polls the rule table in an ApsaraDB RDS for MySQL database, pulls the latest rules that are added to the database by policy developers, and then uses the latest rules to match events. The Flink CEP deployment sends alert notifications or writes related information to other data stores based on the matched events. The following figure shows the overall data link in the example.
In this example, a Flink CEP deployment is started, and then Rule 1 is added. Rule 1 matches three consecutive events whose action value is 0, followed by an event whose action value is not 1. In business terms, a user views a product three times in a row without making a purchase. After events are matched based on Rule 1 and processed, the pattern of Rule 1 is changed to match five consecutive events whose action value is 0 or 2, followed by an event whose action value is not 1. This change is used to deal with an increase in traffic. Rule 2, whose pattern is the same as the initial pattern of Rule 1, is then added to help verify features such as support for multiple rules. You can also add other rules.
Prerequisites
If you want to use a RAM user or RAM role to access the console of fully managed Flink, make sure that the RAM user or RAM role has the required permissions. For more information, see Permission management.
A workspace is created. For more information, see Activate Realtime Compute for Apache Flink.
Upstream and downstream storage instances are created.
An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
An Apsara MQ for Kafka instance is created. For more information, see Overview.
Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.2 or later supports the dynamic Flink CEP feature.
Procedure
This topic shows you how to develop a Flink CEP deployment to detect malicious traffic from anomalous events in behavior logs and how to dynamically update rules. You can perform the following steps to use the dynamic Flink CEP feature:
Step 1: Prepare test data
Create an upstream Kafka topic
Log on to the ApsaraMQ for Kafka console.
Create a topic named demo_topic to store simulated user behavior logs.
For more information, see Step 1: Create a topic.
Prepare an ApsaraDB RDS for MySQL database
In the Data Management (DMS) console, prepare test data in an ApsaraDB RDS for MySQL database.
Log on to the ApsaraDB RDS for MySQL instance by using a privileged account.
For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
Create a rule table named rds_demo to record the rules that you want to use in your Flink CEP deployment.
On the SQLConsole tab, enter the following statements and click Execute:
CREATE DATABASE cep_demo_db;
USE cep_demo_db;

CREATE TABLE rds_demo (
  `id` VARCHAR(64),
  `version` INT,
  `pattern` VARCHAR(4096),
  `function` VARCHAR(512)
);
Each row of the rule table represents a rule, including the id and version fields that indicate the unique ID and version of the rule, the pattern field that is used to describe the pattern object defined by Flink CEP API, and the function field that is used to describe how to process a sequence of events that match the pattern.
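The relationship between the id and version fields can be sketched in plain Java. The following sketch is illustrative only: RuleRow and RuleTableSketch are hypothetical names that are not part of the demo code, and the assumption that the row with the highest version wins for each id is inferred from the way this topic later adds version 2 of Rule 1.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory view of one row of the rds_demo rule table. */
final class RuleRow {
    final String id;        // unique ID of the rule
    final int version;      // version of the rule
    final String pattern;   // serialized pattern (JSON string)
    final String function;  // class name of the function that processes matches

    RuleRow(String id, int version, String pattern, String function) {
        this.id = id;
        this.version = version;
        this.pattern = pattern;
        this.function = function;
    }
}

final class RuleTableSketch {
    /** Keeps, for each rule id, only the row with the highest version (an assumption). */
    static Map<String, RuleRow> latestById(List<RuleRow> rows) {
        Map<String, RuleRow> latest = new LinkedHashMap<>();
        for (RuleRow row : rows) {
            RuleRow current = latest.get(row.id);
            if (current == null || row.version > current.version) {
                latest.put(row.id, row);
            }
        }
        return latest;
    }
}
```

With rows (1, 1), (1, 2), and (2, 1), the sketch keeps version 2 of rule 1 and version 1 of rule 2.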
Step 2: Configure an IP address whitelist
To enable Flink to access the ApsaraDB RDS for MySQL instance, you must add the CIDR block of the fully managed Flink workspace to the IP address whitelist of the ApsaraDB RDS for MySQL instance.
Obtain the CIDR block of the vSwitch to which the fully managed Flink workspace belongs.
Log on to the Realtime Compute for Apache Flink console.
On the Fully Managed Flink tab, find the desired workspace and open the Workspace Details dialog box from the Actions column.
In the Workspace Details dialog box, view the CIDR block of the vSwitch of fully managed Flink.
Add the CIDR block of the vSwitch to which the fully managed Flink workspace is connected to the IP address whitelist of the ApsaraDB RDS for MySQL instance.
For more information, see Configure an IP address whitelist for an ApsaraDB RDS for MySQL instance.
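Before you start the deployment, it can help to confirm that the CIDR block you added actually covers the addresses you expect. The following plain-Java sketch checks whether an IPv4 address falls within a CIDR block. CidrSketch is a hypothetical helper and the addresses in the usage note are made-up examples, not values from your workspace.

```java
/** Hypothetical helper for checking IPv4 addresses against a CIDR block. */
final class CidrSketch {
    /** Converts a dotted IPv4 address such as "192.168.0.1" to its 32-bit value. */
    static long toLong(String ipv4) {
        String[] parts = ipv4.trim().split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipv4);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range in: " + ipv4);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Returns true if ipv4 lies inside the block given as "address/prefixLength". */
    static boolean contains(String cidr, String ipv4) {
        String[] parts = cidr.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a CIDR block: " + cidr);
        }
        int prefix = Integer.parseInt(parts[1]);
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Prefix length out of range in: " + cidr);
        }
        // Build a mask with the top `prefix` bits set.
        long mask = prefix == 0 ? 0L : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        return (toLong(parts[0]) & mask) == (toLong(ipv4) & mask);
    }
}
```

For example, CidrSketch.contains("192.168.0.0/24", "192.168.0.17") is true, whereas the same check for "192.168.1.17" is false.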
Step 3: Develop a Flink CEP draft and start the deployment for the draft
All the code in this topic can be downloaded from the GitHub repository. This section describes how to develop a Flink CEP draft and start the deployment for the draft.
Configure the repositories that need to be used by the pom.xml file in a Maven project.
For more information about how to configure the pom.xml file, see Kafka DataStream Connector.
Add flink-cep as a project dependency to the pom.xml file for your Flink CEP draft.
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-cep</artifactId>
    <version>1.15-vvr-6.0.2-api</version>
    <scope>provided</scope>
</dependency>
Develop the code of a Flink CEP draft.
Create a Kafka source.
For more information about how to use code to create a Kafka source, see Kafka DataStream Connector.
Configure the CEP.dynamicPatterns() method.
To support dynamic changes of CEP rules and matching based on multiple rules, Alibaba Cloud Realtime Compute for Apache Flink defines the CEP.dynamicPatterns() method. The following code shows the syntax of this method:
public static <T, R> SingleOutputStreamOperator<R> dynamicPatterns(
        DataStream<T> input,
        PatternProcessorDiscovererFactory<T> discovererFactory,
        TimeBehaviour timeBehaviour,
        TypeInformation<R> outTypeInfo)
The following table describes the parameters that are required when you call this method. You can configure and modify the parameters based on your business requirements.
Parameter: Description
DataStream<T> input: The input event stream.
PatternProcessorDiscovererFactory<T> discovererFactory: The factory object. The factory constructs a PatternProcessorDiscoverer, which obtains the most recent rules. Each rule is constructed as a PatternProcessor.
TimeBehaviour timeBehaviour: The time attribute that defines how the Flink CEP deployment processes events. Valid values:
TimeBehaviour.ProcessingTime: specifies that events are processed based on the processing time.
TimeBehaviour.EventTime: specifies that events are processed based on the event time.
TypeInformation<R> outTypeInfo: The type information of the output stream.
For more information about the common concepts of Flink deployments, such as DataStream, TimeBehaviour, and TypeInformation, see Flink DataStream API Programming Guide, Notions of Time: Event Time and Processing Time, and Class TypeInformation<T>.
This section describes the PatternProcessor interface. A PatternProcessor contains a Pattern that defines how events are matched and a PatternProcessFunction that defines how the matched events are processed, such as sending alert notifications. The interface also includes fields such as id and version to identify the pattern processor. In other words, a PatternProcessor defines a rule and specifies how the Flink CEP deployment responds when the rule is triggered. For more information, see FLIP-200: Support Multiple Rule and Dynamic Rule Changing (Flink CEP).
The PatternProcessorDiscovererFactory interface is used to construct a PatternProcessorDiscoverer interface to obtain pattern processor updates. In this example, a default abstract class is used to periodically scan external storage. The following sample code shows how to start a timer to regularly poll external storage to pull pattern processor updates:
public abstract class PeriodicPatternProcessorDiscoverer<T>
        implements PatternProcessorDiscoverer<T> {

    ...

    @Override
    public void discoverPatternProcessorUpdates(
            PatternProcessorManager<T> patternProcessorManager) {
        // Periodically discovers the pattern processor updates.
        timer.schedule(
                new TimerTask() {
                    @Override
                    public void run() {
                        if (arePatternProcessorsUpdated()) {
                            List<PatternProcessor<T>> patternProcessors = null;
                            try {
                                patternProcessors = getLatestPatternProcessors();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            patternProcessorManager.onPatternProcessorsUpdated(patternProcessors);
                        }
                    }
                },
                0,
                intervalMillis);
    }

    ...
}
```
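In the sample above, if getLatestPatternProcessors() throws an exception, the value that is forwarded to the manager is null. The following plain-Java sketch shows one possible way to structure the same polling loop so that a failed or unchanged poll keeps the previous rules. RulePollerSketch, fetchLatest, and onUpdate are hypothetical names. The sketch does not use the Flink CEP API and is not the implementation that Realtime Compute for Apache Flink ships.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical poller that notifies a listener only when the rules changed. */
final class RulePollerSketch<R> {
    private final Callable<List<R>> fetchLatest; // for example, a query on the rule table
    private final Consumer<List<R>> onUpdate;    // receives the new rules
    private List<R> lastSeen;                    // rules delivered by the last update

    RulePollerSketch(Callable<List<R>> fetchLatest, Consumer<List<R>> onUpdate) {
        this.fetchLatest = fetchLatest;
        this.onUpdate = onUpdate;
    }

    /** Runs one poll and returns true if the listener was notified. */
    synchronized boolean pollOnce() {
        List<R> latest;
        try {
            latest = fetchLatest.call();
        } catch (Exception e) {
            // Keep the previous rules instead of forwarding null to the listener.
            return false;
        }
        if (latest == null || latest.equals(lastSeen)) {
            return false;
        }
        lastSeen = latest;
        onUpdate.accept(latest);
        return true;
    }

    /** Polls immediately and then again after each interval; the caller shuts the scheduler down. */
    ScheduledExecutorService start(long intervalMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Keeping the poll logic in pollOnce() makes it possible to test the update behavior without waiting for a timer.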
Realtime Compute for Apache Flink provides the JDBCPeriodicPatternProcessorDiscoverer implementation, which pulls the most recent rules from a database that supports the Java Database Connectivity (JDBC) protocol, such as an ApsaraDB RDS database or a Hologres database. The following table describes the parameters that are required if you use this implementation.
Parameter: Description
jdbcUrl: The JDBC URL that you can use to connect to the database.
jdbcDriver: The name of the database driver class.
tableName: The name of the rule table in the database.
initialPatternProcessors: The default pattern processors. If the rule table in the database is empty, the default pattern processors are used.
intervalMillis: The interval at which the database is polled. Unit: milliseconds.
The following sample code provides an example. In this example, the events matched by the Flink CEP deployment are displayed in the TaskManager logs of Flink.
// import ......

public class CepDemo {

    public static void main(String[] args) throws Exception {

        ......

        // DataStream Source
        DataStreamSource<Event> source =
                env.fromSource(
                        kafkaSource,
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner((event, ts) -> event.getEventTime()),
                        "Kafka Source");

        env.setParallelism(1);

        // keyBy userId and productionId
        // Note: only events with the same key are processed to see if there is a match
        KeyedStream<Event, Tuple2<Integer, Integer>> keyedStream =
                source.keyBy(
                        new KeySelector<Event, Tuple2<Integer, Integer>>() {
                            @Override
                            public Tuple2<Integer, Integer> getKey(Event value) throws Exception {
                                return Tuple2.of(value.getId(), value.getProductionId());
                            }
                        });

        SingleOutputStreamOperator<String> output =
                CEP.dynamicPatterns(
                        keyedStream,
                        new JDBCPeriodicPatternProcessorDiscovererFactory<>(
                                JDBC_URL, JDBC_DRIVE, TABLE_NAME, null, JDBC_INTERVAL_MILLIS),
                        TimeBehaviour.ProcessingTime,
                        TypeInformation.of(new TypeHint<String>() {}));

        output.print();

        // Compile and submit the job
        env.execute("CEPDemo");
    }
}
Note: In the demo code, keyBy() is called before CEP.dynamicPatterns() to partition the input data stream based on the userId and productionId fields. This way, only events that have the same userId and productionId values are matched against the rules. Events that have different key values are not matched together.
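The effect of keying can be illustrated outside Flink. The following plain-Java sketch groups events by a "userId:productId" key so that each key has its own sequence of actions. This mirrors the idea that rules are evaluated per key. KeyedGroupingSketch and the int[] event layout are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of per-key event sequences. */
final class KeyedGroupingSketch {
    /**
     * Each event is {userId, productId, action}. Returns the actions seen for
     * each "userId:productId" key, in arrival order.
     */
    static Map<String, List<Integer>> actionsByKey(List<int[]> events) {
        Map<String, List<Integer>> byKey = new LinkedHashMap<>();
        for (int[] event : events) {
            String key = event[0] + ":" + event[1];
            byKey.computeIfAbsent(key, k -> new ArrayList<>()).add(event[2]);
        }
        return byKey;
    }
}
```

Three views by user 1 that are spread across two products produce two separate sequences, so neither sequence alone contains three consecutive views.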
Upload a JAR file and create a JAR deployment in the console of fully managed Flink. For more information, see Create a deployment.
You must download a test JAR file for subsequent operations. The following table describes the parameters that you need to configure when you create a deployment.
Note: In this example, no data is stored in the upstream Kafka source, and the rule table in the database is empty. Therefore, no output is returned after you run the deployment.
Parameter: Description
Deployment Type: Select JAR.
Deployment Mode: Select Stream Mode.
Deployment Name: Enter the name of the JAR deployment that you want to create.
Engine Version: In VVR 3.0.3 and later, Ververica Platform (VVP) allows you to run SQL deployments that use different engine versions at the same time. The version of the Flink engine that uses VVR 3.0.3 is Flink 1.12. If the engine version of your deployment is Flink 1.12 or earlier, you can perform the following operations to update the engine version based on the engine version that your deployment uses:
Flink 1.12: Stop and then restart your deployment. Then, the system automatically updates the engine version of your deployment to vvr-3.0.3-flink-1.12.
Flink 1.11 or Flink 1.10: Manually update the engine version of your deployment to vvr-3.0.3-flink-1.12 or vvr-4.0.8-flink-1.13, and then restart the deployment. Otherwise, a timeout error occurs when you start the deployment.
JAR URL: Upload your JAR file or upload the test JAR file.
Entry Point Class: Set the value to com.alibaba.ververica.cep.demo.CepDemo.
Entry Point Main Arguments: If the upstream and downstream storage information is configured in your own JAR file, you do not need to configure this parameter. However, if you upload the test JAR file, you must configure this parameter. Sample code:
--kafkaBrokers YOUR_KAFKA_BROKERS --inputTopic YOUR_KAFKA_TOPIC --inputTopicGroup YOUR_KAFKA_TOPIC_GROUP --jdbcUrl jdbc:mysql://YOUR_DB_URL:port/DATABASE_NAME?user=YOUR_USERNAME&password=YOUR_PASSWORD --tableName YOUR_TABLE_NAME --jdbcIntervalMs 3000
Parameters:
kafkaBrokers: the addresses of Kafka brokers.
inputTopic: the name of the Kafka topic.
inputTopicGroup: the consumer group of Kafka.
jdbcUrl: the JDBC URL of the database.
Note: The username and password of the account that is used in the JDBC URL must be a standard username and password. The password can contain only letters and digits. You can use an authentication method for your deployment based on your business requirements.
tableName: the name of the rule table in the database.
jdbcIntervalMs: the interval at which the database is polled.
Note: You must change the values of the preceding parameters based on the information about the upstream and downstream storage instances.
A parameter value cannot be greater than 1024 characters in length. We recommend that you do not use complex parameters. A parameter whose value includes line breaks, spaces, or other special characters is considered a complex parameter. A parameter value can contain only letters and digits. If you want to pass complex parameters, use a dependency file.
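The main arguments above follow a simple "--key value" layout. The following plain-Java sketch shows one way such arguments could be read into a map and validated. ArgsSketch is a hypothetical helper. The actual demo may parse its arguments differently, for example with a utility class that Flink provides.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for "--key value" style main arguments. */
final class ArgsSketch {
    /** Parses pairs such as "--inputTopic demo_topic" into a map keyed by parameter name. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--") || i + 1 >= args.length) {
                throw new IllegalArgumentException("Expected --key value near: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    /** Returns a required parameter or fails with a message that names the missing key. */
    static String required(Map<String, String> params, String key) {
        String value = params.get(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing required argument: --" + key);
        }
        return value;
    }
}
```

For example, the jdbcIntervalMs value can be read with Long.parseLong(ArgsSketch.required(params, "jdbcIntervalMs")).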
In the Parameters section of the Configuration tab on the Deployments page, enter the following code in the Other Configuration field:
kubernetes.application-mode.classpath.include-user-jar: 'true'
classloader.resolve-order: parent-first
For more information about how to configure the parameters in the Parameters section, see Parameters.
On the Deployments page in the console of fully managed Flink, find the desired deployment and click Start in the Actions column.
For more information about how to configure deployment startup parameters, see Start a deployment.
Step 4: Add a rule
After the Flink CEP deployment is started, add version 1 of Rule 1. This rule matches three consecutive events whose action value is 0, followed by an event whose action value is not 1. In business terms, a user views a product three times in a row without making a purchase.
Log on to the ApsaraDB RDS for MySQL instance by using a standard account.
For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
Add a rule that is dynamically updated.
Merge the JSON string that defines a rule with the field names such as id, version, and function, and then execute the INSERT INTO statement to insert data into the rule table in the ApsaraDB RDS for MySQL database.
INSERT INTO rds_demo ( `id`, `version`, `pattern`, `function` ) values( '1', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}', 'com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction') ;
To improve the readability of the pattern field in the database, Realtime Compute for Apache Flink defines a set of rule descriptions in the JSON format. For more information, see Definitions of rules in the JSON format in dynamic Flink CEP. The value of the pattern field in the preceding SQL statement is a pattern that is serialized as a JSON string. The string describes the following pattern: Three consecutive events with the action value of 0 occur and the action value of the next event is still not 1.
Note: The EndCondition class in the following code defines that the action value of the next event is still not 1.
The following code shows the same pattern defined by using the Pattern API:
Pattern<Event, Event> pattern = Pattern.<Event>begin("start", AfterMatchSkipStrategy.skipPastLastEvent()) .where(new StartCondition("action == 0")) .timesOrMore(3) .followedBy("end") .where(new EndCondition());
The following sample code provides an example of the JSON string that defines a rule:
{ "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": null, "nodes": [ { "name": "end", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "SINGLE" ], "times": null, "untilCondition": null }, "condition": { "className": "com.alibaba.ververica.cep.demo.condition.EndCondition", "type": "CLASS" }, "type": "ATOMIC" }, { "name": "start", "quantifier": { "consumingStrategy": "SKIP_TILL_NEXT", "properties": [ "LOOPING" ], "times": { "from": 3, "to": 3, "windowTime": null }, "untilCondition": null }, "condition": { "expression": "action == 0", "type": "AVIATOR" }, "type": "ATOMIC" } ], "edges": [ { "source": "start", "target": "end", "type": "SKIP_TILL_NEXT" } ], "window": null, "afterMatchStrategy": { "type": "SKIP_PAST_LAST_EVENT", "patternName": null }, "type": "COMPOSITE", "version": 1 }
Use a Kafka client to send messages to the demo_topic topic.
In this demo, you can also send test messages in the Start to Send and Consume Message panel of the demo_topic topic in the ApsaraMQ for Kafka console.
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022778000
1,Ken,0,1,1662022779000
1,Ken,0,1,1662022780000
The following table describes the fields of messages in the demo_topic topic.
Field: Description
id: The ID of the user.
username: The name of the user.
action: The action of the user. Valid values:
0: the view operation.
1: the purchase operation.
2: the share operation.
product_id: The ID of the product.
event_time: The event time when the action was performed.
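The field order described above can be made concrete with a small parser. The following plain-Java sketch turns one comma-separated message into a typed object. DemoEvent is a hypothetical class written for this illustration and is not the Event class of the demo project.

```java
/** Hypothetical typed view of one message in the demo_topic topic. */
final class DemoEvent {
    final int id;            // ID of the user
    final String username;   // name of the user
    final int action;        // 0: view, 1: purchase, 2: share
    final int productId;     // ID of the product
    final long eventTime;    // event time of the action

    DemoEvent(int id, String username, int action, int productId, long eventTime) {
        this.id = id;
        this.username = username;
        this.action = action;
        this.productId = productId;
        this.eventTime = eventTime;
    }

    /** Parses a line such as "1,Ken,0,1,1662022777000". */
    static DemoEvent parse(String line) {
        String[] fields = line.trim().split(",");
        if (fields.length != 5) {
            throw new IllegalArgumentException("Expected 5 fields but got: " + line);
        }
        return new DemoEvent(
                Integer.parseInt(fields[0].trim()),
                fields[1].trim(),
                Integer.parseInt(fields[2].trim()),
                Integer.parseInt(fields[3].trim()),
                Long.parseLong(fields[4].trim()));
    }
}
```

Rejecting lines that do not have exactly five fields surfaces malformed test messages early instead of letting them produce confusing match results.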
View the most recent rule that is displayed in the JobManager logs and the matching results displayed in the TaskManager logs.
On the Logs tab of the Job Manager tab, use the keyword JDBCPeriodicPatternProcessorDiscoverer to search for the most recent rule.
On the Log List tab of the Running Task Managers tab, find the log file whose name ends with .out and use the keyword A match for Pattern of (id, version): (1, 1) to search for the matching results.
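If you export the .out file, the rule that produced each match can be extracted from the keyword. The following plain-Java sketch does this with a regular expression. MatchLogSketch is a hypothetical helper, and only the keyword part of the log line is taken from this topic. The rest of the line format is not assumed.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that reads the rule id and version from a match log line. */
final class MatchLogSketch {
    private static final Pattern MATCH_LINE =
            Pattern.compile("A match for Pattern of \\(id, version\\): \\(([^,]+), (\\d+)\\)");

    /** Returns {id, version} if the line contains the keyword, or null otherwise. */
    static String[] ruleOf(String logLine) {
        Matcher matcher = MATCH_LINE.matcher(logLine);
        return matcher.find() ? new String[] {matcher.group(1), matcher.group(2)} : null;
    }
}
```

Counting the returned pairs per rule gives a quick summary of how many matches each rule version produced.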
Step 5: Update the matching rules and check whether the updated rules take effect
After events are matched based on Rule 1 and processed, update the pattern of Rule 1 to: Five consecutive events with the action value of 0 or 2 occur and the action value of the next event is still not 1. This is used to deal with an increase in traffic. Add Rule 2 whose pattern is the same as the initial pattern of Rule 1 to help verify features such as support for multiple rules.
Update the matching rules in the DMS console.
Log on to the ApsaraDB RDS for MySQL instance by using a standard account.
For more information, see Use DMS to log on to an ApsaraDB RDS for MySQL instance.
Change action == 0 in the value of the StartCondition parameter to action == 0 || action == 2, and change the two values of the times parameter from 3 to 5. This is version 2 of Rule 1. The following SQL statement provides an example.
INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('1', 2, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":5,"to":5,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0 || action == 2","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
Add a rule whose id value is 2 as Rule 2.
Except for the id value, the SQL statement of Rule 2 is the same as that of version 1 of Rule 1. The StartCondition parameter is still set to action == 0, and the two values of the times parameter are 3 and 3.
INSERT INTO rds_demo(`id`, `version`, `pattern`, `function`) values('2', 1, '{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":null,"nodes":[{"name":"end","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["SINGLE"],"times":null,"untilCondition":null},"condition":{"className":"com.alibaba.ververica.cep.demo.condition.EndCondition","type":"CLASS"},"type":"ATOMIC"},{"name":"start","quantifier":{"consumingStrategy":"SKIP_TILL_NEXT","properties":["LOOPING"],"times":{"from":3,"to":3,"windowTime":null},"untilCondition":null},"condition":{"expression":"action == 0","type":"AVIATOR"},"type":"ATOMIC"}],"edges":[{"source":"start","target":"end","type":"SKIP_TILL_NEXT"}],"window":null,"afterMatchStrategy":{"type":"SKIP_PAST_LAST_EVENT","patternName":null},"type":"COMPOSITE","version":1}','com.alibaba.ververica.cep.demo.dynamic.DemoPatternProcessFunction');
In the ApsaraMQ for Kafka console, send eight simple messages to trigger a match.
The following eight messages provide an example:
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,2,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,0,1,1662022777000
1,Ken,2,1,1662022777000
On the Log List tab of the Running Task Managers tab, find the log file whose name ends with .out, and view the matching results.
If you want to search for the matching results that are generated based on Version 2 of Rule 1, use the keyword A match for Pattern of (id, version): (1, 2).
If you want to search for the matching results that are generated based on Version 1 of Rule 2, use the keyword A match for Pattern of (id, version): (2, 1).
The preceding figures show the following matching results: Based on Version 2 of Rule 1, the Flink CEP deployment matches a sequence of events that contain five consecutive events with action values of 0 or 2 and an event with the action value other than 1. This indicates that Rule 1 that is dynamically updated takes effect. Based on Version 1 of Rule 2, the Flink CEP deployment matches two sequences of events that contain three consecutive events with action values of 0 and an event with the action value other than 1. This indicates that Rule 2 that is dynamically added takes effect.
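The match counts described above can be reproduced with a deliberately simplified model. The following plain-Java sketch counts non-overlapping occurrences of a fixed number of consecutive events that satisfy a start condition, immediately followed by an event whose action is not 1, and resumes scanning after the last matched event. RuleSimulationSketch is hypothetical. The sketch ignores keying, time, and the contiguity semantics of Flink CEP, so it is a reasoning aid and not a substitute for the engine.

```java
import java.util.function.IntPredicate;

/** Hypothetical, simplified model of the demo rules for a single key. */
final class RuleSimulationSketch {
    /**
     * Counts matches of `times` consecutive actions accepted by `start`, immediately
     * followed by an action other than 1. After a match, scanning resumes after the
     * last matched event.
     */
    static int countMatches(int[] actions, IntPredicate start, int times) {
        int matches = 0;
        int i = 0;
        while (i + times < actions.length) {
            boolean runMatches = true;
            for (int j = i; j < i + times; j++) {
                if (!start.test(actions[j])) {
                    runMatches = false;
                    break;
                }
            }
            if (runMatches && actions[i + times] != 1) {
                matches++;
                i += times + 1; // skip past the last matched event
            } else {
                i++;
            }
        }
        return matches;
    }
}
```

For the eight actions 0, 0, 0, 2, 0, 0, 0, 2, this model yields one match for five consecutive events with action 0 or 2 and two matches for three consecutive events with action 0, which agrees with the results described for Version 2 of Rule 1 and Version 1 of Rule 2.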