This topic describes how rules are defined in the JSON format in dynamic Flink complex event processing (CEP).
Intended audience
- Risk control platform developers: Developers who are familiar with dynamic Flink CEP can use this topic to learn the format and decide whether further encapsulation is required for their platform.
- Risk control strategy personnel: Personnel who know specific risk control strategies but have no Java development experience can use this format to write new rules based on CEP concepts and apply the rules in online risk control deployments.
Definitions of CEP rules in the JSON format
- Node
A node indicates a complete pattern. The following table describes the fields in a pattern.
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| name | The name of the pattern. | STRING | Yes | A unique string. Note: The name of each node must be unique. |
| type | The type of the node. | ENUM(STRING) | Yes | COMPOSITE for a node that includes a child pattern. ATOMIC for a node that does not include a child pattern. |
| quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see "Quantifier" in this topic. |
| condition | The condition. | Dictionary | No | For more information, see "Condition" in this topic. |
- Quantifier
A quantifier describes how to match events with the pattern. For example, the quantifier of the pattern "A*" has the LOOPING property, and its consumingStrategy field is set to SKIP_TILL_ANY.
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| consumingStrategy | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, and SKIP_TILL_ANY. For more information about the values and their meanings, see "Contiguity" in this topic. |
| times | The number of times the pattern needs to be matched. | Dictionary | No | Example: "times": { "from": 3, "to": 3, "windowTime": { "unit": "MINUTES", "size": 12 } }. The values of from and to must be of the INTEGER data type. The unit of windowTime can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. Note: windowTime can be set to null, which is written as "windowTime": null. |
| properties | The properties of the quantifier. | Array of enumeration strings | Yes | For more information about the values and their meanings, see "Properties of a quantifier" in this topic. |
| untilCondition | The condition that is used to stop matching the pattern. Note: This field can be configured only for a pattern that uses the LOOPING quantifier. | Dictionary | No | For more information, see "Condition" in this topic. |
- Condition
Conditions are used to identify events that meet specific requirements. For example, to identify users whose browsing duration is longer than 5 minutes, specify a condition that checks whether the browsing duration is longer than 5 minutes.
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | ENUM(STRING) | Yes | Valid values: CLASS (a custom condition), AVIATOR (a condition based on an Aviator expression), and GROOVY (a condition based on a Groovy expression). |
| ... | Other custom fields that can be serialized. | | No | ... |
The following conditions are supported:
- Condition of the Class type
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS. |
| className | The name of the class. | STRING | Yes | The full name of the class, such as com.alibaba.ververica.cep.demo.StartCondition. |
- Condition that includes a custom parameter
When you use a condition of the Class type, you can specify only the className parameter, and parameters cannot be passed dynamically. To make conditions more expressive, dynamic Flink CEP supports CustomArgsCondition, a condition that accepts custom parameters. You configure the required parameters as an array of JSON strings, and CustomArgsCondition instances are constructed dynamically from that array. This way, you can update the parameters of the condition without modifying the Java code.
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | ENUM(STRING) | Yes | Set the value to CLASS. |
| className | The name of the class. | STRING | Yes | The full name of the class, such as com.alibaba.ververica.cep.demo.CustomMiddleCondition. |
| args | The custom parameter. | Array of strings | Yes | The value is an array of strings. |
- Condition based on an Aviator expression
Aviator is an expression evaluation engine that dynamically compiles expressions into bytecode. For more information, see aviatorscript. With a condition based on an Aviator expression, you can modify the threshold of the condition in a running deployment without modifying, recompiling, and rerunning the Java code.
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | STRING | Yes | Set the value to AVIATOR. |
| expression | An expression string. | STRING | Yes | An expression string in a form similar to price > 10. The price variable is a field defined in Java code. You can change the value of the string in the database. For example, if you change the string to price > 20, the new expression is dynamically loaded in the dynamic Flink CEP deployment and used to construct a new Aviator condition that processes subsequent events. |
- Condition based on a Groovy expression
Groovy is a dynamic language for the Java virtual machine (JVM) platform. For more information about the Groovy syntax, see Syntax. Dynamic Flink CEP allows you to use Groovy expressions to define conditions so that the thresholds of conditions can be modified dynamically.
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the condition. | STRING | Yes | Set the value to GROOVY. |
| expression | An expression string. | STRING | Yes | An expression string in a form similar to price > 5.0 && name.contains("mid"). The variables, such as price and name, are fields defined in Java code. You can change the value of the string in the database. For example, if you change the string to price > 20 && name.contains("end"), the new expression is dynamically loaded in the dynamic Flink CEP deployment and used to construct a new Groovy condition that processes subsequent events. |
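The effect of replacing an expression string at run time can be sketched without either engine. The following is a minimal sketch only: the class name ExpressionConditionSketch and its tiny parser for "<field> > <number>" are hypothetical stand-ins for the Aviator or Groovy engine, and events are modeled as plain maps rather than Java objects.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch only: a hypothetical stand-in for an expression-based condition.
 * A real deployment hands the string to the Aviator or Groovy engine;
 * here a tiny parser for "<field> > <number>" plays that role.
 */
final class ExpressionConditionSketch {
    private String field;
    private double threshold;

    ExpressionConditionSketch(String expression) {
        update(expression);
    }

    /** Re-parses the expression, as if a changed rule had been loaded from the database. */
    void update(String expression) {
        String[] parts = expression.split(">");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected '<field> > <number>': " + expression);
        }
        field = parts[0].trim();
        threshold = Double.parseDouble(parts[1].trim());
    }

    /** Returns true if the event's field value exceeds the current threshold. */
    boolean filter(Map<String, Double> event) {
        Double value = event.get(field);
        return value != null && value > threshold;
    }

    public static void main(String[] args) {
        Map<String, Double> event = new HashMap<>();
        event.put("price", 15.0);

        ExpressionConditionSketch condition = new ExpressionConditionSketch("price > 10");
        System.out.println(condition.filter(event)); // true: 15 > 10

        condition.update("price > 20"); // the rule changed; no Java code was recompiled
        System.out.println(condition.filter(event)); // false: 15 is not > 20
    }
}
```

The point of the sketch is that only the string changes between the two calls; the Java class that evaluates it stays the same.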
- Edge
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| source | The name of the source pattern. | STRING | Yes | N/A. |
| target | The name of the destination pattern. | STRING | Yes | N/A. |
| type | The event selection strategy. | ENUM(STRING) | Yes | Valid values: STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, and NOT_NEXT. For more information about the values and their meanings, see "Contiguity" in this topic. |
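Before a rule is published, a platform might want to check that node names are unique and that every edge refers to existing nodes and uses a valid type. The following is a minimal sketch of such a check under stated assumptions: the class GraphCheckSketch and the representation of an edge as a {source, target, type} string array are hypothetical and are not part of dynamic Flink CEP.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: hypothetical pre-publication checks for node names and edges. */
final class GraphCheckSketch {
    static final Set<String> EDGE_TYPES = new HashSet<>(Arrays.asList(
            "STRICT", "SKIP_TILL_NEXT", "SKIP_TILL_ANY", "NOT_FOLLOW", "NOT_NEXT"));

    /**
     * Returns a list of problems; an empty list means that all checks passed.
     * Each edge is given as {source, target, type}.
     */
    static List<String> check(List<String> nodeNames, List<String[]> edges) {
        List<String> problems = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String name : nodeNames) {
            if (!seen.add(name)) {
                problems.add("duplicate node name: " + name);
            }
        }
        for (String[] edge : edges) {
            if (!seen.contains(edge[0])) problems.add("unknown source: " + edge[0]);
            if (!seen.contains(edge[1])) problems.add("unknown target: " + edge[1]);
            if (!EDGE_TYPES.contains(edge[2])) problems.add("invalid edge type: " + edge[2]);
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("start", "middle", "end");
        List<String[]> edges = new ArrayList<>();
        edges.add(new String[] {"start", "middle", "SKIP_TILL_NEXT"});
        edges.add(new String[] {"middle", "finish", "NOT_FOLLOW"}); // "finish" is not a node
        System.out.println(check(nodes, edges)); // [unknown target: finish]
    }
}
```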
- GraphNode extends Node
A GraphNode indicates a complete pattern sequence. Each node of a GraphNode indicates an independent pattern. Each edge of a GraphNode indicates how matching transitions from one pattern to another.
A GraphNode is considered a subclass of Node and can be used as a Node in a larger GraphNode. This is how GroupPattern, which represents pattern nesting, is supported. Compared with Node, GraphNode provides the following additional fields:
- The nodes and edges fields that describe the structure of a graph.
- The window field that describes the time window policy in a graph and the afterMatchSkipStrategy field that describes the skipping strategy used after event matching.
The following table describes the fields of GraphNode.
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| name | The name of the composite pattern. | STRING | Yes | A unique string. Note: The name of each graph must be unique. |
| type | The type of the node. | ENUM(STRING) | Yes | Set the value to COMPOSITE. |
| version | The version of the JSON format that is used by the graph. | INTEGER | Yes | Default value: 1. |
| nodes | The child patterns that are nested in the pattern. | Array of nodes | Yes | The value of this field is an array. The array cannot be empty. |
| edges | The connection relationships between the nested child patterns. | Array of edges | Yes | The value of this field is an array. The array can be empty. |
| window | The time window. If the type of the window is FIRST_AND_LAST, this field indicates the maximum time interval between the start and end of a complete match of the composite pattern. If the type of the window is PREVIOUS_AND_CURRENT, this field indicates the maximum time interval between the matches of two adjacent child patterns. | Dictionary | No | Example: "window": { "type": "FIRST_AND_LAST", "time": { "unit": "DAYS", "size": 1 } }. The unit can be DAYS, HOURS, MINUTES, SECONDS, or MILLISECONDS. The size can be of the LONG or INTEGER data type. |
| afterMatchSkipStrategy | The skipping strategy after all events in the graph are matched. | Dictionary | Yes | For more information, see "AfterMatchSkipStrategy" in this topic. |
| quantifier | A quantifier that describes how to match the pattern. For example, you can specify the quantifier to match a pattern only once. | Dictionary | Yes | For more information, see "Quantifier" in this topic. |
- AfterMatchSkipStrategy
| Field | Description | Data type | Required | Remarks |
| --- | --- | --- | --- | --- |
| type | The type of the strategy. | ENUM(STRING) | Yes | Valid values: NO_SKIP, SKIP_TO_NEXT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, and SKIP_TO_LAST. |
| patternName | The name of the pattern that uses the strategy. | STRING | No | A unique string. |
The valid values of the type field have the following meanings:
- NO_SKIP: returns each successful match in the output. This is the default value.
- SKIP_TO_NEXT: discards all partial matches that start with the same event.
- SKIP_PAST_LAST_EVENT: discards all partial matches that start between the beginning and end of the match.
- SKIP_TO_FIRST: discards all partial matches that start between the beginning of the match and the first occurrence of an event mapped to the pattern specified by patternName.
- SKIP_TO_LAST: discards all partial matches that start between the beginning of the match and the last occurrence of an event mapped to the pattern specified by patternName.
For more information, see After Match Skip Strategy.
- Contiguity
| Valid value | Description |
| --- | --- |
| STRICT | Strict contiguity. Unmatched events cannot appear between matched events. |
| SKIP_TILL_NEXT | Relaxed contiguity. Unmatched events can appear between matched events. The unmatched events are ignored. |
| SKIP_TILL_ANY | Non-deterministic relaxed contiguity. This mode relaxes contiguity further and also allows additional matches that ignore some matching events. |
| NOT_NEXT | The event that immediately follows must not be the specified event. |
| NOT_FOLLOW | The specified event must not appear at any point afterward. |
For more information, see FlinkCEP - Complex event processing for Flink.
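The difference between the first three modes is easiest to see on a small stream. The following is a minimal sketch, not the Flink CEP matcher: the class ContiguitySketch is hypothetical, and it handles only the two-step pattern "a b", treating any event whose name starts with "a" or "b" as a match for the respective step.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch only: a toy matcher for the two-step pattern "a b". */
final class ContiguitySketch {
    enum Contiguity { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY }

    /** Returns every "a b" match found in the stream under the given mode. */
    static List<String> match(List<String> events, Contiguity mode) {
        List<String> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                boolean isB = events.get(j).startsWith("b");
                if (isB) {
                    matches.add(events.get(i) + " " + events.get(j));
                }
                // STRICT: only the immediately following event may complete the match.
                if (mode == Contiguity.STRICT) {
                    break;
                }
                // SKIP_TILL_NEXT: stop at the first b. SKIP_TILL_ANY keeps scanning.
                if (isB && mode == Contiguity.SKIP_TILL_NEXT) {
                    break;
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a", "c", "b1", "b2");
        System.out.println(match(stream, Contiguity.STRICT));         // []
        System.out.println(match(stream, Contiguity.SKIP_TILL_NEXT)); // [a b1]
        System.out.println(match(stream, Contiguity.SKIP_TILL_ANY));  // [a b1, a b2]
    }
}
```

With strict contiguity, the unmatched event c breaks the match. With relaxed contiguity, c is skipped and the first b completes the match. With non-deterministic relaxed contiguity, the additional match that ignores b1 is also produced.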
- Properties of a quantifier
| Valid value | Description |
| --- | --- |
| SINGLE | The pattern occurs only once. |
| LOOPING | The pattern is a looping pattern and may occur multiple times. This quantifier is similar to the asterisk (*) and plus sign (+) in regular expressions. |
| TIMES | The pattern occurs a specified number of times. |
| GREEDY | The greedy matching strategy is used: the pattern is matched as many times as possible. |
| OPTIONAL | The pattern is optional. |
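The constraints that this topic states for a quantifier can be checked mechanically before a rule is stored. The following is a minimal sketch under stated assumptions: the class QuantifierCheckSketch and its method signature are hypothetical, and it checks only the valid values of consumingStrategy and properties and the rule that untilCondition requires the LOOPING property.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Sketch only: hypothetical checks for the fields of a quantifier. */
final class QuantifierCheckSketch {
    static final Set<String> STRATEGIES = new HashSet<>(Arrays.asList(
            "STRICT", "SKIP_TILL_NEXT", "SKIP_TILL_ANY"));
    static final Set<String> PROPERTIES = new HashSet<>(Arrays.asList(
            "SINGLE", "LOOPING", "TIMES", "GREEDY", "OPTIONAL"));

    /** Returns a list of problems; an empty list means that all checks passed. */
    static List<String> check(String consumingStrategy,
                              List<String> properties,
                              boolean hasUntilCondition) {
        List<String> problems = new ArrayList<>();
        if (!STRATEGIES.contains(consumingStrategy)) {
            problems.add("invalid consumingStrategy: " + consumingStrategy);
        }
        for (String property : properties) {
            if (!PROPERTIES.contains(property)) {
                problems.add("invalid property: " + property);
            }
        }
        // untilCondition is allowed only for a pattern that uses the LOOPING quantifier.
        if (hasUntilCondition && !properties.contains("LOOPING")) {
            problems.add("untilCondition requires the LOOPING property");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check("SKIP_TILL_NEXT", Arrays.asList("LOOPING"), true)); // []
        System.out.println(check("SKIP_TILL_NEXT", Arrays.asList("SINGLE"), true));
        // [untilCondition requires the LOOPING property]
    }
}
```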
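Example 1: Use a common pattern
This example matches customers who, within a 10-minute window:
- Obtained coupons for a venue.
- Added items to their shopping carts three or more times.
- Did not complete the payment.
The following Java code defines the pattern: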
Pattern<Event, Event> pattern =
    Pattern.<Event>begin("start")
        .where(new StartCondition())
        .optional()
        .followedBy("middle")
        .where(new MiddleCondition())
        .timesOrMore(3)
        .notFollowedBy("end")
        .where(new EndCondition())
        .within(Time.minutes(10));
The following JSON describes the same pattern:
{
  "name": "end",
  "quantifier": {
    "consumingStrategy": "SKIP_TILL_NEXT",
    "properties": [
      "SINGLE"
    ],
    "times": null,
    "untilCondition": null
  },
  "condition": null,
  "nodes": [
    {
      "name": "end",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.EndCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "middle",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "LOOPING"
        ],
        "times": {
          "from": 3,
          "to": 3,
          "windowTime": null
        },
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.MiddleCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    },
    {
      "name": "start",
      "quantifier": {
        "consumingStrategy": "SKIP_TILL_NEXT",
        "properties": [
          "SINGLE",
          "OPTIONAL"
        ],
        "times": null,
        "untilCondition": null
      },
      "condition": {
        "className": "com.alibaba.ververica.cep.demo.condition.StartCondition",
        "type": "CLASS"
      },
      "type": "ATOMIC"
    }
  ],
  "edges": [
    {
      "source": "middle",
      "target": "end",
      "type": "NOT_FOLLOW"
    },
    {
      "source": "start",
      "target": "middle",
      "type": "SKIP_TILL_NEXT"
    }
  ],
  "window": {
    "type": "FIRST_AND_LAST",
    "time": {
      "unit": "MINUTES",
      "size": 10
    }
  },
  "afterMatchStrategy": {
    "type": "NO_SKIP",
    "patternName": null
  },
  "type": "COMPOSITE",
  "version": 1
}
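The window object in the preceding JSON can be read as a duration plus a rule for which timestamps are compared. The following is a minimal sketch of the two window types described for GraphNode. The class WindowSketch is hypothetical, the list holds one timestamp per matched child pattern in ascending order, and treating an interval that exactly equals the window as accepted is an assumption that may differ from the engine's behavior.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Sketch only: a hypothetical reading of the window field. */
final class WindowSketch {
    /** Converts the JSON "unit" and "size" fields into a Duration. */
    static Duration toDuration(String unit, long size) {
        return Duration.ofMillis(TimeUnit.valueOf(unit).toMillis(size));
    }

    /**
     * Checks timestamps (epoch milliseconds, ascending, non-empty) against a window.
     * Assumption: an interval exactly equal to the window is accepted.
     */
    static boolean withinWindow(List<Long> timestamps, String type, Duration window) {
        long limit = window.toMillis();
        if ("FIRST_AND_LAST".equals(type)) {
            // Compare the start and the end of the complete match.
            return timestamps.get(timestamps.size() - 1) - timestamps.get(0) <= limit;
        }
        if ("PREVIOUS_AND_CURRENT".equals(type)) {
            // Compare each pair of adjacent child-pattern matches.
            for (int i = 1; i < timestamps.size(); i++) {
                if (timestamps.get(i) - timestamps.get(i - 1) > limit) {
                    return false;
                }
            }
            return true;
        }
        throw new IllegalArgumentException("Unknown window type: " + type);
    }

    public static void main(String[] args) {
        Duration tenMinutes = toDuration("MINUTES", 10);
        List<Long> close = Arrays.asList(0L, 240_000L, 540_000L);  // 0, 4, and 9 minutes
        List<Long> spread = Arrays.asList(0L, 360_000L, 720_000L); // 0, 6, and 12 minutes
        System.out.println(withinWindow(close, "FIRST_AND_LAST", tenMinutes));        // true
        System.out.println(withinWindow(spread, "FIRST_AND_LAST", tenMinutes));       // false
        System.out.println(withinWindow(spread, "PREVIOUS_AND_CURRENT", tenMinutes)); // true
    }
}
```

The second and third calls use the same timestamps: the match spans 12 minutes overall, so it fails a 10-minute FIRST_AND_LAST window, but each adjacent gap is 6 minutes, so it passes a 10-minute PREVIOUS_AND_CURRENT window.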
Example 2: Use the condition that includes a custom parameter in a pattern
This example describes how to apply different marketing strategies to customers of different classes during a real-time e-commerce promotional event. For example, you can send marketing text messages to Class A customers, send coupons to Class B customers, and take no marketing action for other customers. You can meet these requirements with an ordinary condition of the Class type. However, if you later want to adjust the strategies, for example, to also send coupons to Class C customers, you must modify the deployment code, recompile it, and rerun the deployment. To simplify this operation, use the condition that includes a custom parameter. After the code defines how the strategies depend on the passed parameter, you only need to change the value of the passed parameter in the database. The passed parameter is the value of the args field of the condition. For example, you can change ["A", "B"] to ["A", "B", "C"] to update the marketing strategies dynamically.
Condition before the update:
"condition": {
  "args": [
    "A", "B"
  ],
  "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
  "type": "CLASS"
}
Condition after the update:
"condition": {
  "args": [
    "A", "B", "C"
  ],
  "className": "org.apache.flink.cep.pattern.conditions.CustomMiddleCondition",
  "type": "CLASS"
}
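On the Java side, such a condition receives the args array when it is constructed and uses it when it filters events. The following is a minimal sketch of that idea only: the class CustomerClassConditionSketch is hypothetical, it does not extend the real CustomArgsCondition class, and it filters on a plain customer class string rather than on an event object.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch only: mirrors the idea of a condition that is built from the JSON "args" array.
 * It does not extend the real CustomArgsCondition class; all names are hypothetical.
 */
final class CustomerClassConditionSketch {
    private final Set<String> targetClasses;

    /** The args array comes from the JSON rule, for example ["A", "B"]. */
    CustomerClassConditionSketch(String[] args) {
        this.targetClasses = new HashSet<>(Arrays.asList(args));
    }

    /** Accepts an event only if the customer's class is one of the configured classes. */
    boolean filter(String customerClass) {
        return targetClasses.contains(customerClass);
    }

    public static void main(String[] args) {
        CustomerClassConditionSketch before =
                new CustomerClassConditionSketch(new String[] {"A", "B"});
        System.out.println(before.filter("C")); // false: Class C is not targeted yet

        // After the rule in the database changes, a new instance is built from the new args.
        CustomerClassConditionSketch after =
                new CustomerClassConditionSketch(new String[] {"A", "B", "C"});
        System.out.println(after.filter("C")); // true
    }
}
```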
For more information about how to use the condition that includes a custom parameter in specific business scenarios, see Demo.