Analysis of Spark Codegen
Background
There are two technical pillars behind the superior performance of Spark SQL: the Optimizer and the Runtime. The former strives to find the optimal execution plan, while the latter strives to execute that plan as quickly as possible. The Runtime's optimizations fall into two levels:
1. Global optimization. Improving global resource utilization, eliminating data skew, and reducing IO, via techniques such as Adaptive Execution and Shuffle Removal.
2. Local optimization. Improving the execution efficiency of individual tasks, which relies mainly on Codegen, applied at both the Expression level and the WholeStage level.
This article introduces the technical principles behind Spark Codegen.
Case study
This section introduces Codegen's approach through two specific cases.
Expression level
Consider evaluating the expression x + (1 + 2), expressed in Scala as follows:
Add(Attribute(x), Add(Literal(1), Literal(2)))
The syntax tree is as follows:
[Figure: syntax tree of x + (1 + 2)]
The general code for recursively evaluating this syntax tree is as follows:
tree.transformUp {
  case Attribute(idx) => Literal(row.getValue(idx))
  case Add(Literal(c1), Literal(c2)) => Literal(c1 + c2)
  case Literal(c) => Literal(c)
}
Executing this code involves a great deal of extra work per node: type matching, virtual function calls, and object creation. These overheads far exceed the cost of the expression evaluation itself.
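To see the cost concretely, here is a self-contained sketch of interpreted evaluation with simplified stand-in types (hypothetical Expr/Literal/Attribute/Add classes, not Catalyst's real ones); every eval() call pattern-matches on the node type and recurses, which is exactly the dispatch and allocation overhead Codegen removes:
// Simplified stand-in expression tree (hypothetical, not Catalyst's classes).
sealed trait Expr
case class Literal(value: Int) extends Expr
case class Attribute(idx: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Interpreted evaluation: a pattern match and recursive call per node.
def eval(e: Expr, row: Array[Int]): Int = e match {
  case Literal(c)     => c
  case Attribute(idx) => row(idx)
  case Add(l, r)      => eval(l, row) + eval(r, row)
}

// x + (1 + 2), with x bound to row(0):
val tree = Add(Attribute(0), Add(Literal(1), Literal(2)))
assert(eval(tree, Array(10)) == 13)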
To eliminate these overheads, Spark Codegen directly generates the Java code that evaluates the expression and compiles it on the fly, in three steps:
1. Code generation. Generate Java code from the syntax tree and encapsulate it in a wrapper class:
... // class wrapper
row.getValue(idx) + (1 + 2)
... // class wrapper
2. Just-in-time compilation. Use the Janino framework to compile the generated code into a class file.
3. Loading and execution. The compiled class is loaded and executed.
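A minimal end-to-end sketch of these three steps (a hypothetical standalone example assuming the Janino library is on the classpath; Spark's real generator is far more elaborate):
import org.codehaus.janino.SimpleCompiler

object ExprCodegenDemo {
  def main(args: Array[String]): Unit = {
    // Step 1: code generation -- the Java source a generator would emit
    // for x + (1 + 2), with the row modeled as an int[] for simplicity.
    val source =
      """public class GeneratedExpr {
        |  public static int eval(int[] row) {
        |    return row[0] + (1 + 2);
        |  }
        |}""".stripMargin

    // Step 2: just-in-time compilation with Janino.
    val compiler = new SimpleCompiler()
    compiler.cook(source)

    // Step 3: load the compiled class and execute it via reflection.
    val clazz = compiler.getClassLoader.loadClass("GeneratedExpr")
    val eval = clazz.getMethod("eval", classOf[Array[Int]])
    println(eval.invoke(null, Array(10))) // prints 13
  }
}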
This optimization yields an order-of-magnitude performance improvement:
[Figure: expression evaluation performance, before vs. after Codegen]
WholeStage level
Consider the following SQL statement:
select count(*) from store_sales
where ss_item_sk=1000;
The generated physical execution plan is as follows:
[Figure: physical execution plan]
The usual way to implement this plan is the Volcano model: each operator implements the Iterator interface, and its next() method first calls next() on its child to obtain input, then applies its own logic. For example:
class Agg extends Iterator[Row] {
  def doAgg(): Unit = {
    while (child.hasNext()) {
      val row = child.next()
      // do aggregation
      ...
    }
    doneAgg = true
  }
  def next(): Row = {
    if (!doneAgg) {
      doAgg()
    }
    aggIter.next()
  }
}
class Filter extends Iterator[Row] {
  def next(): Row = {
    var current = child.next()
    while (current != null && !predicate(current)) {
      current = child.next()
    }
    current
  }
}
As the code shows, the Volcano model incurs a large number of type conversions and virtual function calls per row. Virtual function calls defeat CPU branch prediction, causing severe performance degradation.
To eliminate these overheads, Spark WholeStageCodegen generates type-specialized Java code for the physical plan, then compiles and loads it on the fly just as at the Expression level. For this example, the generated code is conceptually as follows (illustrative pseudocode, not the real generated code):
var count = 0
for (ss_item_sk in store_sales) {
if (ss_item_sk == 1000) {
count += 1
}
}
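As a runnable illustration of why this is fast, the fused stage boils down to one tight loop with the predicate and count update inlined (a hypothetical standalone sketch; the scanned column is modeled as a primitive array):
// One tight loop: no iterators, no virtual calls, no per-row objects.
def fusedCount(ssItemSk: Array[Int]): Long = {
  var count = 0L
  var i = 0
  while (i < ssItemSk.length) {
    if (ssItemSk(i) == 1000) count += 1
    i += 1
  }
  count
}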
The performance comparison before and after optimization is as follows:
[Figure: WholeStage Codegen performance comparison]
Spark Codegen Framework
The Spark Codegen framework has three core components:
1. Core interface/class
2. CodegenContext
3. Produce-Consume Pattern
Let's look at each in detail.
Core interface/class
There are four core interfaces and classes:
1. CodegenSupport (interface)
Operators that implement this interface can generate Java code for their own processing logic. Important methods:
produce() // generates the Java code that produces this node's output Rows
consume() // generates the Java code with which this node consumes the Rows supplied by its upstream node
Implementation classes include but are not limited to: ProjectExec, FilterExec, HashAggregateExec, SortMergeJoinExec.
2. WholeStageCodegenExec (class)
An implementation of CodegenSupport that fuses all adjacent operators within a stage that themselves implement CodegenSupport. The generated code encapsulates the execution logic of all fused operators in a single wrapper class, which is then passed to Janino for just-in-time compilation.
3. InputAdapter (class)
An implementation of CodegenSupport that serves as glue, connecting a WholeStageCodegenExec node with upstream nodes that do not implement CodegenSupport.
4. BufferedRowIterator (class)
The parent class of the Java code generated by WholeStageCodegenExec. Important methods:
public InternalRow next() // returns the next Row
public void append(InternalRow row) // appends a Row
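The contract can be sketched with simplified stand-in types (a hypothetical class, not Spark's real Java implementation, which also exposes init() and shouldStop()): the generated processNext() pushes produced Rows into a buffer via append(), and hasNext()/next() drain it:
import scala.collection.mutable

// Hypothetical simplified stand-in for the BufferedRowIterator contract.
abstract class SketchBufferedRowIterator[Row] {
  private val currentRows = mutable.Queue.empty[Row]

  // Generated code overrides this to run the fused stage and emit rows.
  protected def processNext(): Unit

  // Generated code calls append() for each row it produces...
  protected def append(row: Row): Unit = currentRows.enqueue(row)
  // ...and checks shouldStop() to yield control once rows are buffered.
  protected def shouldStop(): Boolean = currentRows.nonEmpty

  def hasNext: Boolean = {
    if (currentRows.isEmpty) processNext()
    currentRows.nonEmpty
  }
  def next(): Row = currentRows.dequeue()
}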
CodegenContext
The core class for managing generated code. Its main responsibilities:
1. Naming management. Ensures there are no variable-name conflicts within the same scope (see the sketch after this list).
2. Variable management. Maintains class variables, decides how they are declared (as individual fields or compressed into typed arrays), maintains their initialization logic, etc.
3. Method management. Maintains class methods.
4. Inner class management. Maintains inner classes.
5. Common subexpression management. Tracks identical subexpressions to avoid computing them twice.
6. Size management. Keeps methods and classes from growing too large and avoids too many class variables, splitting when necessary: for example, splitting a block of expressions into multiple functions, or splitting function and variable definitions into multiple inner classes.
7. Dependency management. Maintains the external objects this class depends on, such as Broadcast objects, utility objects, and metric objects.
8. General template management. Provides common code templates, such as genComp and nullSafeExec.
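As an illustration of naming management, fresh-name generation boils down to a per-prefix counter; a hypothetical simplified helper (not the real CodegenContext) might look like this:
import scala.collection.mutable

// Hypothetical sketch of fresh-name generation (not Spark's CodegenContext).
class FreshNamer {
  private val counters = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Returns a collision-free name by suffixing a per-prefix counter,
  // e.g. freshName("value") -> "value_0", then "value_1", ...
  def freshName(prefix: String): String = {
    val id = counters(prefix)
    counters(prefix) = id + 1
    s"${prefix}_$id"
  }
}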
Produce-Consume Pattern
Adjacent Operators generate code through the Produce-Consume pattern.
produce() generates the framework code for the overall processing. For example, the framework code generated for aggregation looks like this:
if (!initialized) {
  # create a hash map, then build the aggregation hash map
  # call child.produce()
  initialized = true;
}
while (hashmap.hasNext()) {
  row = hashmap.next();
  # build the aggregation results
  # create variables for results
  # call consume(), which will call parent.doConsume()
  if (shouldStop()) return;
}
consume() generates the logic with which the current node processes a Row supplied from upstream. For example, the code generated for Filter looks like this:
# code to evaluate the predicate expression; the results are isNull1 and value2
if (!isNull1 && value2) {
  # call consume(), which will call parent.doConsume()
}
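To make the pattern concrete, here is a self-contained sketch built on hypothetical, greatly simplified interfaces (not Spark's real CodegenSupport): produce() delegates down to the leaf scan, which emits the driving loop, and each row's handling code is nested back up through the parents' consume() calls:
// Hypothetical simplified codegen nodes (not Spark's real CodegenSupport).
trait CodegenNode {
  var parent: CodegenNode = null
  def produce(): String            // emits the framework (loop) code
  def consume(row: String): String // emits the per-row handling code
}

// Leaf scan: emits the driving loop, calling the parent's consume per row.
class ScanNode(col: String) extends CodegenNode {
  def produce(): String =
    s"for (int $col : input) {\n${parent.consume(col)}\n}"
  def consume(row: String): String = "" // a leaf consumes nothing
}

// Filter: delegates produce() to its child; consume() wraps the parent's
// per-row code in the predicate check.
class FilterNode(child: CodegenNode, pred: String => String) extends CodegenNode {
  child.parent = this
  def produce(): String = child.produce()
  def consume(row: String): String =
    s"  if (${pred(row)}) {\n  ${parent.consume(row)}\n  }"
}

// Count aggregate: produce() emits the accumulator around the child's loop;
// consume() emits the per-row update.
class CountNode(child: CodegenNode) extends CodegenNode {
  child.parent = this
  def produce(): String = s"long count = 0;\n${child.produce()}"
  def consume(row: String): String = "    count += 1;"
}

// Wiring up select count(*) from store_sales where ss_item_sk = 1000:
val scan = new ScanNode("ss_item_sk")
val agg = new CountNode(new FilterNode(scan, row => s"$row == 1000"))
println(agg.produce()) // prints a fused loop like the pseudocode above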