The Fusion engine is a high-performance vectorized SQL execution engine built into E-MapReduce (EMR) Serverless Spark. In TPC-DS benchmark tests, the Fusion engine delivers twice the performance of Apache Spark. The Fusion engine is fully compatible with Apache Spark, so you do not need to modify your existing code. To enable the Fusion engine, you only need to select the Built-in Fusion Engine (formerly Spark Native Engine) version when you create a compute or an SQL task in EMR Serverless Spark.
Precautions
The Fusion engine uses the off-heap memory of Spark. When you create a compute or an SQL task, you must set spark.memory.offHeap.enabled=true in the Spark Configuration parameter to enable off-heap memory. You can also set the spark.memory.offHeap.size parameter to specify the size of the off-heap memory based on your business requirements.
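As a minimal sketch, the two off-heap settings named above can be generated and sanity-checked before you paste them into the Spark Configuration parameter. The helper names `fusion_offheap_conf` and `render`, the example size `4g`, and the `key=value` line layout are assumptions for illustration only; check the separator your console expects, and choose the size based on your workload.

```python
import re


def fusion_offheap_conf(size: str = "4g") -> dict:
    """Return the off-heap settings that the Fusion engine requires.

    `size` is a hypothetical example value, not a recommendation.
    """
    # Simplified check: a positive integer followed by k, m, g, or t.
    # Spark itself accepts more spellings (for example "4gb").
    if not re.fullmatch(r"[1-9]\d*[kmgt]", size.lower()):
        raise ValueError(f"unexpected size string: {size!r}")
    return {
        "spark.memory.offHeap.enabled": "true",
        "spark.memory.offHeap.size": size,
    }


def render(conf: dict) -> str:
    """Render the settings as one key=value pair per line."""
    return "\n".join(f"{key}={value}" for key, value in conf.items())


if __name__ == "__main__":
    print(render(fusion_offheap_conf("4g")))
```

Rejecting a zero or malformed size early is a design choice in this sketch: enabling off-heap memory without a valid positive size is a configuration mistake that is easier to catch before the task is submitted.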
Supported scenarios
The Fusion engine is suitable for Spark SQL and DataFrame tasks. The Fusion engine can be used to improve the performance of most operators, expressions, and data types. However, the Fusion engine cannot accelerate the execution of Resilient Distributed Dataset (RDD) tasks or the execution of tasks in which user-defined functions (UDFs) are used.
Storage formats
The Fusion engine supports the following data storage formats:
Parquet
Paimon
ORC (partial)
Operators
The Fusion engine provides acceleration for most common operators. The following table describes the operators.
Type | Operator |
Source |
|
Sink | DataWritingCommandExec |
Common operation |
|
Aggregation | HashAggregateExec |
Join |
|
Window | WindowExec |
Exchange |
|
Limit |
|
Subquery | SubqueryBroadcastExec |
Others |
|
Expressions
The following table describes the expressions supported by the Fusion engine.
Type | Expression |
Comparison/Logic | !, !=, <, <=, >, >=, <=>, <>, =, ==, ||, and, between, is not null, is null, negative, nullif, and or |
Arithmetic | %, +, -, *, /, isnan, mod, negative, not, positive, abs, acos, acosh, asin, asinh, atan, atan2, atanh, cbrt, ceil, ceiling, cos, cosh, degrees, e, exp, floor, ln, log, log10, log2, pi, pmod, pow, power, radians, rand, random, rint, round, shiftleft, shiftright, sign, signum, sin, sqrt, tan, and tanh |
Bitwise | ^, |, &, ~, bit_and, bit_count, bit_or, bit_xor, and bit_length |
Expression | case, if, and when |
Set | in and find_in_set |
String calculation | ascii, char, chr, char_length, character_length, concat, instr, lcase, lower, length, locate, lpad, ltrim, overlay, replace, reverse, rtrim, split, split_part, substr, substring, trim, ucase, upper, like, regexp, regexp_extract, regexp_extract_all, regexp_like, regexp_replace, and rlike |
Aggregation | aggregate, approx_count_distinct, avg, collect_list, collect_set, corr, count, covar_pop, covar_samp, first, first_value, kurtosis, last, last_value, max, max_by, mean, min, regr_avgx, regr_avgy, regr_count, regr_r2, regr_intercept, regr_slope, regr_sxy, regr_sxx, regr_syy, skewness, std, stddev, stddev_pop, stddev_samp, sum, var_pop, var_samp, and variance |
Window | cume_dist, dense_rank, lag, lead, nth_value, ntile, percent_rank, rank, and row_number |
Time | add_months, current_date, current_timestamp, current_timezone, date, date_add, date_format, date_from_unix_date, date_sub, datediff, day, dayofmonth, dayofweek, dayofyear, from_unixtime, from_utc_timestamp, hour, last_day, make_date, minute, month, next_day, now, quarter, second, timestamp_micros, timestamp_millis, to_date, to_unix_timestamp, unix_seconds, unix_millis, unix_micros, weekday, weekofyear, and year |
JSON | get_json_object and json_array_length |
Array | array, array_contains, array_distinct, array_except, array_intersect, array_join, array_max, array_min, array_position, array_remove, array_repeat, array_sort, arrays_overlap, arrays_zip, element_at, exists, filter, forall, flatten, shuffle, size, and sort_array |
Map | map, get_map_value, map_from_arrays, map_keys, map_values, map_zip_with, named_struct, struct, and str_to_map |
Encoding | crc32, hash, md5, sha1, and sha2 |
Others | current_catalog, current_database, greatest, least, monotonically_increasing_id, nanvl, spark_partition_id, stack, uuid, and rand |
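The expression table above can be used as a quick pre-check for a query. The following sketch scans a SQL string for function calls and reports the names that are not in a small excerpt of the table. The set `DOCUMENTED`, the helper `functions_not_listed`, the function `my_udf`, and the sample query are hypothetical. The excerpt is deliberately incomplete, so extend it with the full lists before you rely on the result. The scan is also naive: any identifier followed by a parenthesis counts as a call, so keywords such as OVER are reported as well.

```python
import re

# Excerpt of the function names listed in the expression table
# (not the full list; extend it before real use).
DOCUMENTED = {
    "abs", "round", "concat", "lower", "upper", "substring",
    "avg", "count", "max", "min", "sum",
    "rank", "row_number", "date_add", "datediff", "year",
    "get_json_object", "array_contains", "md5",
}


def functions_not_listed(sql: str) -> list:
    """Return function-call names in `sql` that are absent from DOCUMENTED."""
    # Naive tokenization: an identifier directly followed by "(".
    called = re.findall(r"\b([A-Za-z_][A-Za-z0-9_]*)\s*\(", sql)
    missing = []
    for name in (c.lower() for c in called):
        if name not in DOCUMENTED and name not in missing:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # my_udf stands in for a user-defined function.
    query = "SELECT upper(name), my_udf(price), sum(qty) FROM orders GROUP BY 1, 2"
    print(functions_not_listed(query))
```

A name reported by this check is only a candidate for review. It may be a UDF, which the Fusion engine cannot accelerate, or simply a function missing from the excerpt.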
Data types
The Fusion engine supports the following data types:
Byte, Short, Int, and Long
Boolean
String and Binary
Decimal
Float and Double
Date and Timestamp
Unsupported scenarios
Operators
Type | Operator |
Aggregation |
|
Exchange | CustomShuffleReaderExec |
Pandas |
|
Others |
|
Data types
Struct
Array
Map
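The unsupported data types listed above can be checked against a table schema before you enable the Fusion engine. The following is a minimal sketch, assuming the schema is available as a mapping from column name to a type string such as `decimal(10,2)` or `array<string>`. The names `UNSUPPORTED`, `base_type`, and `columns_with_unsupported_types`, and the sample schema, are hypothetical. Only the outermost type of each column is inspected.

```python
import re

# Data types that this document lists as unsupported by the Fusion engine.
UNSUPPORTED = {"struct", "array", "map"}


def base_type(type_name: str) -> str:
    """Strip parameters: 'decimal(10,2)' -> 'decimal', 'array<int>' -> 'array'."""
    return re.split(r"[<(]", type_name.strip().lower(), maxsplit=1)[0]


def columns_with_unsupported_types(schema: dict) -> dict:
    """Return the columns whose outermost type is in UNSUPPORTED."""
    return {
        column: type_name
        for column, type_name in schema.items()
        if base_type(type_name) in UNSUPPORTED
    }


if __name__ == "__main__":
    # Hypothetical schema used only for illustration.
    schema = {
        "id": "long",
        "price": "decimal(10,2)",
        "tags": "array<string>",
        "attrs": "map<string,int>",
    }
    print(columns_with_unsupported_types(schema))
```

Columns flagged by this check are worth reviewing first when a task does not show the expected speedup.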