Optimize Hive jobs on E-MapReduce (EMR) by tuning code patterns and adjusting memory, vCPU, and task-count parameters.
Job optimization solutions
| Category | Optimization |
|---|---|
| Code optimization | Data cleansing, multiple DISTINCT replacement, data skew handling |
| Parameter tuning | Memory, vCPU, task count, parallel execution, fetch task, vectorized query execution, small file merging |
Optimize code
Cleanse data
Reduce the volume of data that Hive processes before expensive operations run.
Filter by partition when reading from a partitioned table. This avoids full table scans.
Filter data before a JOIN operation, not after.
Store repeatedly used temporary results in an intermediate table to avoid redundant computation.
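The first two points can be sketched as follows, assuming a hypothetical log table partitioned by ds and a hypothetical dimension table city_dim:

```sql
-- Prune partitions at read time instead of scanning the full table,
-- and filter rows in the subquery so the join processes less data.
SELECT a.user_id, b.city_name
FROM (
  SELECT user_id, city_id
  FROM log
  WHERE ds = '2024-01-01'        -- partition filter: only one partition is read
    AND user_id IS NOT NULL      -- filter before the JOIN, not after
) a
JOIN city_dim b ON a.city_id = b.city_id;
```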
Replace multiple DISTINCT operators
Multiple DISTINCT operators in a single query can cause data expansion. Replace them with two GROUP BY clauses: an inner GROUP BY to deduplicate and an outer GROUP BY to aggregate.
Before optimization
SELECT k,
COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1,
COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2,
COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3,
COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k;
After optimization
The inner query groups by (k, user_id) to deduplicate, reducing the data volume. The outer query groups by k to produce the final counts.
SELECT k,
SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1,
SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2,
SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3,
SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4
FROM
(SELECT k,
user_id,
COUNT(CASE WHEN a > 1 THEN user_id END) user1,
COUNT(CASE WHEN a > 2 THEN user_id END) user2,
COUNT(CASE WHEN a > 3 THEN user_id END) user3,
COUNT(CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k, user_id
) tmp
GROUP BY k;
Handle data skew in GROUP BY operations
Use one of the following methods to handle skewed keys.
Skewed keys in GROUP BY
Enable aggregation in the map stage to reduce data transferred to the reduce stage:
set hive.map.aggr=true;
set hive.groupby.mapaggr.checkinterval=100000; -- Number of entries aggregated in the map stage
Distribute keys randomly and aggregate in multiple passes. Setting hive.groupby.skewindata to true generates two MapReduce jobs:
First job: Map output is randomly distributed to reduce tasks for initial aggregation. Data entries with the same GROUP BY key may go to different reduce tasks, achieving load balancing.
Second job: The intermediate results are distributed to reduce tasks by key, so data entries with the same GROUP BY key go to the same reduce task for final aggregation.
set hive.groupby.skewindata=true;
Skewed keys when joining two large tables
Randomize null or skewed values so they distribute across different reduce tasks. For example, if the log table contains many null values in user_id and the bmw_users table does not:
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;
Null keys are replaced with distinct random strings that match nothing in bmw_users, so they spread across reduce tasks instead of piling onto one.
Skewed keys when joining a small table with a large table
Use MAP JOIN to broadcast the small table to all map tasks, avoiding the skew-prone reduce stage.
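A minimal sketch of the MAP JOIN approach, assuming hypothetical tables big_fact (large) and small_dim (small); the MAPJOIN hint, or automatic join conversion, broadcasts the small table:

```sql
-- Broadcast the small table to every map task; the reduce stage is skipped.
SELECT /*+ MAPJOIN(b) */ a.*, b.category
FROM big_fact a
JOIN small_dim b ON a.item_id = b.item_id;

-- Alternatively, let Hive convert joins automatically when one side is small:
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask.size=10000000; -- size threshold in bytes
```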
Tune memory parameters
Memory settings control how much heap and process memory each map or reduce task receives. Insufficient memory causes out-of-memory errors; excessive memory wastes cluster resources.
Map stage memory
| Parameter | Description | Example |
|---|---|---|
| mapreduce.map.java.opts | Required. JVM heap memory for map tasks. | -Xmx2048m |
| mapreduce.map.memory.mb | Required. Total JVM process memory (heap + non-heap). Formula: heap memory + non-heap memory. Example: 2048 + 256. | 2304 |
Reduce stage memory
| Parameter | Description | Example |
|---|---|---|
| mapreduce.reduce.java.opts | Required. JVM heap memory for reduce tasks. | -Xmx2048m |
| mapreduce.reduce.memory.mb | Required. Total JVM process memory (heap + non-heap). Formula: heap memory + non-heap memory. Example: 2048 + 256. | 2304 |
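The memory settings for both stages can be applied per job with SET statements. The values below match the table examples (2048 MB heap plus 256 MB non-heap); they are illustrations, not recommendations:

```sql
set mapreduce.map.memory.mb=2304;          -- total process memory for map tasks
set mapreduce.map.java.opts=-Xmx2048m;     -- JVM heap within that total
set mapreduce.reduce.memory.mb=2304;       -- total process memory for reduce tasks
set mapreduce.reduce.java.opts=-Xmx2048m;  -- JVM heap within that total
```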
Tune vCPU parameters
| Parameter | Description |
|---|---|
| mapreduce.map.cpu.vcores | Maximum number of vCPUs per map task. |
| mapreduce.reduce.cpu.vcores | Maximum number of vCPUs per reduce task. |
mapreduce.reduce.cpu.vcores does not take effect in fair queuing scenarios. This parameter is primarily used to limit vCPU usage per user or application in a large cluster.
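The vCPU limits can likewise be set per job. The default is typically 1 vCPU per task; the value 2 below is only an illustration:

```sql
set mapreduce.map.cpu.vcores=2;     -- vCPUs per map task
set mapreduce.reduce.cpu.vcores=2;  -- vCPUs per reduce task (no effect in fair queuing scenarios)
```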
Adjust the number of tasks
Map tasks
In Hadoop Distributed File System (HDFS), each file is stored as data blocks. The number of data blocks is one of the factors that determine how many map tasks run. In most cases, each map task reads one data block.
Too many small files: Reduce the map task count to improve resource utilization.
Few large files: Increase the map task count to reduce per-task workload.
The key parameters are mapred.map.tasks, mapred.min.split.size, and dfs.block.size.
How map task count is calculated
Default mapper count. Total data size divided by the HDFS block size:
default_mapper_num = total_size / dfs.block.size
Default split size. Determined by the minimum and maximum split size settings, where mapred.min.split.size is the minimum split size and mapred.max.split.size is the maximum split size for a Hive job:
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
Split count. Total data size divided by the default split size:
split_num = total_size / default_split_size
Final map task count:
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
To increase the number of map tasks, reduce mapred.min.split.size and mapred.max.split.size (which lowers default_split_size and raises split_num), or increase mapred.map.tasks. Note that mapred.map.tasks takes effect only up to split_num, because the formula takes the minimum of the two.
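For example, to raise the map task count by shrinking the split size (the 100 MB value is illustrative, not a recommendation):

```sql
-- A split size below dfs.block.size lowers default_split_size and raises split_num
set mapred.max.split.size=100000000;  -- 100 MB
set mapred.min.split.size=100000000;
set mapred.map.tasks=100;             -- raises the lower bound in the formula above
```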
Hive on Tez and Hive on MapReduce use different computing mechanisms. For the same query on the same data, the two engines produce significantly different map task counts. Tez combines input splits into groups and generates one map task per group, rather than one map task per input split.
Reduce tasks
Two approaches control the reduce task count:
Set bytes per reducer. Use hive.exec.reducers.bytes.per.reducer to let Hive calculate the count automatically:
reducer_num = min(total_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)
Set the count directly. Use mapred.reduce.tasks to specify an explicit number of reduce tasks.
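The two approaches as SET statements; the 256 MB per reducer and the explicit count of 50 are illustrative values:

```sql
-- Approach 1: let Hive derive the count from the input data volume
set hive.exec.reducers.bytes.per.reducer=256000000;  -- 256 MB per reduce task
set hive.exec.reducers.max=1009;                     -- upper bound on the derived count

-- Approach 2: fix the count explicitly (overrides approach 1)
set mapred.reduce.tasks=50;
```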
With the Tez engine, enable auto reducer parallelism to let Tez dynamically adjust the reduce task count based on vertex output sizes: set hive.tez.auto.reducer.parallelism = true;
Starting and initializing reduce tasks consumes time and resources. Each reduce task generates one output file. An excessive number of reduce tasks leads to many small files, which can cascade when those files become input for downstream tasks.
Run stages in parallel
Hive translates a query into one or more stages. When stages are independent of each other, running them in parallel reduces overall job execution time.
| Parameter | Default | Description |
|---|---|---|
| hive.exec.parallel | false | Set to true to run independent stages in parallel. |
| hive.exec.parallel.thread.number | 8 | Maximum number of threads that run in parallel. |
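For example, to run independent stages in parallel with up to 16 threads (16 is an illustrative value):

```sql
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16;
```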
Convert queries to fetch tasks
A fetch task returns query results directly without launching a MapReduce job, reducing execution time.
| Parameter | Default | Description |
|---|---|---|
| hive.fetch.task.conversion | none | Controls which queries convert to fetch tasks. Valid values: none, minimal, more. |
Valid values:
none: No queries convert to fetch tasks. All statements launch MapReduce.
minimal: Only SELECT *, FILTER on partition key columns, and LIMIT statements use fetch tasks.
more: In addition to the minimal scope, supports SELECT on specified columns, FILTER on non-partition key columns, and virtual columns (aliases).
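For example, with conversion set to more, a simple column selection with a filter is served directly from storage without launching a job (t, col1, and ds are hypothetical names):

```sql
set hive.fetch.task.conversion=more;
-- Served by a fetch task: no MapReduce job is launched
SELECT col1 FROM t WHERE ds = '2024-01-01' LIMIT 10;
```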
Enable vectorized query execution
Vectorized query execution processes data in batches of rows rather than one row at a time, improving query performance.
| Parameter | Default | Description |
|---|---|---|
| hive.vectorized.execution.enabled | true | Enable vectorized query execution. |
| hive.vectorized.execution.reduce.enabled | true | Enable vectorized query execution for reduce tasks. |
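To toggle vectorization per session; note that vectorized execution generally applies to tables stored in columnar formats such as ORC:

```sql
set hive.vectorized.execution.enabled=true;
set hive.vectorized.execution.reduce.enabled=true;
```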
Merge small files
A large number of small output files degrades storage performance and data processing efficiency. Merge the output files of map and reduce tasks to reduce the total file count.
| Parameter | Default | Description |
|---|---|---|
| hive.merge.mapfiles | true | Merge output files of map tasks. |
| hive.merge.mapredfiles | false | Merge output files of reduce tasks. |
| hive.merge.size.per.task | 256000000 (bytes) | Target size of each merged file. |
Example:
SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB