E-MapReduce: Hive job optimization

Last Updated: Mar 01, 2026

Optimize Hive jobs on E-MapReduce (EMR) by tuning code patterns and adjusting memory, vCPU, and task-count parameters.

Job optimization solutions

| Category | Optimization |
| --- | --- |
| Code optimization | Data cleansing, multiple DISTINCT replacement, data skew handling |
| Parameter tuning | Memory, vCPU, task count, parallel execution, fetch task, vectorized query execution, small file merging |

Optimize code

Cleanse data

Reduce the volume of data that Hive processes before expensive operations run.

  1. Filter by partition when reading from a partitioned table. This avoids full table scans.

  2. Filter data before a JOIN operation, not after.

  3. Store repeatedly used temporary results in an intermediate table to avoid redundant computation.
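
The three practices above can be combined in one job. The following is a minimal sketch, assuming hypothetical tables orders (partitioned by dt) and users, and a hypothetical intermediate table tmp_daily_orders. None of these names come from this topic.

```sql
-- Hypothetical tables: orders (partitioned by dt) and users.
-- Steps 1 and 3: read a single partition and store the filtered result
-- in an intermediate table so later queries do not recompute it.
CREATE TABLE tmp_daily_orders AS
SELECT order_id, user_id, amount
FROM orders
WHERE dt = '2026-01-01'   -- partition filter avoids a full table scan
  AND amount > 0;         -- row filter applied before any JOIN

-- Step 2: the JOIN reads only the pre-filtered rows.
SELECT o.order_id, u.user_name, o.amount
FROM tmp_daily_orders o
JOIN users u ON o.user_id = u.user_id;
```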

Replace multiple DISTINCT operators

Multiple DISTINCT operators in a single query can cause data expansion. Replace them with two GROUP BY clauses: an inner GROUP BY to deduplicate and an outer GROUP BY to aggregate.

Before optimization

SELECT k,
       COUNT(DISTINCT CASE WHEN a > 1 THEN user_id END) user1,
       COUNT(DISTINCT CASE WHEN a > 2 THEN user_id END) user2,
       COUNT(DISTINCT CASE WHEN a > 3 THEN user_id END) user3,
       COUNT(DISTINCT CASE WHEN a > 4 THEN user_id END) user4
FROM t
GROUP BY k;

After optimization

The inner query groups by (k, user_id) to deduplicate, reducing the data volume. The outer query groups by k to produce the final counts.

SELECT k,
       SUM(CASE WHEN user1 > 0 THEN 1 ELSE 0 END) AS user1,
       SUM(CASE WHEN user2 > 0 THEN 1 ELSE 0 END) AS user2,
       SUM(CASE WHEN user3 > 0 THEN 1 ELSE 0 END) AS user3,
       SUM(CASE WHEN user4 > 0 THEN 1 ELSE 0 END) AS user4
FROM
        (SELECT k,
                user_id,
                COUNT(CASE WHEN a > 1 THEN user_id END) user1,
                COUNT(CASE WHEN a > 2 THEN user_id END) user2,
                COUNT(CASE WHEN a > 3 THEN user_id END) user3,
                COUNT(CASE WHEN a > 4 THEN user_id END) user4
        FROM t
        GROUP BY k, user_id
        ) tmp
GROUP BY k;

Handle data skew

Use one of the following methods to handle skewed keys.

Skewed keys in GROUP BY

  1. Enable aggregation in the map stage to reduce data transferred to the reduce stage:

       set hive.map.aggr=true;
       set hive.groupby.mapaggr.checkinterval=100000;  -- Number of rows after which Hive checks the map-side aggregation

  2. Distribute keys randomly and aggregate in two passes. Setting hive.groupby.skewindata to true generates two MapReduce jobs:

    • First job: Map output is randomly distributed to reduce tasks for initial aggregation. Data entries with the same GROUP BY key may go to different reduce tasks, achieving load balancing.

    • Second job: The intermediate results are distributed to reduce tasks by key, so data entries with the same GROUP BY key go to the same reduce task for final aggregation.

       set hive.groupby.skewindata=true;
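
The two-job behavior described above can also be written by hand, which may help show what the setting does. This is a sketch only, assuming the table t and key k from the earlier DISTINCT example and a hypothetical salt column with 10 buckets. It is a manual alternative, not the plan that Hive generates internally.

```sql
-- Pass 1 (inner queries): add a random salt (0-9) so rows with the same k
-- spread over several reduce tasks, then pre-aggregate per (k, salt).
-- Pass 2 (outer query): combine the partial counts per k.
SELECT k, SUM(partial_cnt) AS cnt
FROM (
    SELECT k, salt, COUNT(*) AS partial_cnt
    FROM (
        SELECT k, CAST(FLOOR(RAND() * 10) AS INT) AS salt
        FROM t
    ) s
    GROUP BY k, salt
) p
GROUP BY k;
```

A larger number of salt buckets spreads a hot key more widely but leaves more partial rows for the second pass to combine.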

Skewed keys when joining two large tables

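Randomize null or skewed values so they distribute across different reduce tasks. For example, if the log table contains many null values in user_id and the bmw_users table does not, replace each null user_id with a random string in the join condition. The null rows then spread across reduce tasks and still match no row in bmw_users: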

-- Null user_id values become random 'dp_hive...' strings before the comparison.
SELECT * FROM log a LEFT OUTER
JOIN bmw_users b ON
CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive', RAND()) ELSE a.user_id END = b.user_id;

Skewed keys when joining a small table with a large table

Use MAP JOIN to broadcast the small table to all map tasks, avoiding the skew-prone reduce stage.
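
A map join can be requested with a hint or left to automatic conversion. The sketch below assumes hypothetical tables big_log (large) and dim_city (small), and the size threshold shown is illustrative rather than a recommendation.

```sql
-- Let Hive convert eligible joins to map joins automatically.
SET hive.auto.convert.join=true;
-- Tables below this size (in bytes) are treated as small; value is illustrative.
SET hive.mapjoin.smalltable.filesize=25000000;

-- Hypothetical tables: big_log (large) and dim_city (small).
-- The MAPJOIN hint names the alias to broadcast. Depending on
-- hive.ignore.mapjoin.hint, Hive may ignore the hint and rely on
-- automatic conversion instead.
SELECT /*+ MAPJOIN(c) */ l.log_id, c.city_name
FROM big_log l
JOIN dim_city c ON l.city_id = c.city_id;
```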

Tune memory parameters

Memory settings control how much heap and process memory each map or reduce task receives. Insufficient memory causes out-of-memory errors; excessive memory wastes cluster resources.

Map stage memory

| Parameter | Description | Example |
| --- | --- | --- |
| mapreduce.map.java.opts | Required. JVM heap memory for map tasks. | -Xmx2048m |
| mapreduce.map.memory.mb | Required. Total JVM process memory (heap + non-heap). Formula: heap memory + non-heap memory. Example: 2048 + 256. | 2304 |

Reduce stage memory

| Parameter | Description | Example |
| --- | --- | --- |
| mapreduce.reduce.java.opts | Required. JVM heap memory for reduce tasks. | -Xmx2048m |
| mapreduce.reduce.memory.mb | Required. Total JVM process memory (heap + non-heap). Formula: heap memory + non-heap memory. Example: 2048 + 256. | 2304 |
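
As a session-level sketch, the example values from the two tables above could be applied as follows. The 256 MB of non-heap headroom is the figure used in the tables, not a tuned recommendation.

```sql
-- Map tasks: 2048 MB heap + 256 MB non-heap = 2304 MB process memory.
SET mapreduce.map.java.opts=-Xmx2048m;
SET mapreduce.map.memory.mb=2304;

-- Reduce tasks: same sizing as the map tasks in this example.
SET mapreduce.reduce.java.opts=-Xmx2048m;
SET mapreduce.reduce.memory.mb=2304;
```

Keep the -Xmx value below the matching memory.mb value so that the heap plus non-heap usage fits inside the task's process memory.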

Tune vCPU parameters

| Parameter | Description |
| --- | --- |
| mapreduce.map.cpu.vcores | Maximum number of vCPUs per map task. |
| mapreduce.reduce.cpu.vcores | Maximum number of vCPUs per reduce task. |

Note

mapreduce.reduce.cpu.vcores does not take effect in fair queuing scenarios. This parameter is primarily used to limit vCPU usage per user or application in a large cluster.

Adjust the number of tasks

Map tasks

In Hadoop Distributed File System (HDFS), each file is stored as data blocks. The number of data blocks is one of the factors that determine how many map tasks run. In most cases, each map task reads one data block.

  • Too many small files: Reduce the map task count to improve resource utilization.

  • Few large files: Increase the map task count to reduce per-task workload.

The key parameters are mapred.map.tasks, mapred.min.split.size, and dfs.block.size.

How map task count is calculated

  1. Default mapper count. Total data size divided by the HDFS block size:

       default_mapper_num = total_size / dfs.block.size

  2. Default split size. Determined by the minimum and maximum split size settings: mapred.min.split.size is the minimum split size and mapred.max.split.size is the maximum split size for a Hive job.

       default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))

  3. Split count. Total data size divided by the default split size:

       split_num = total_size / default_split_size

  4. Final map task count:

       map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))

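To increase the number of map tasks, reduce mapred.min.split.size (which lowers default_split_size and raises split_num) or increase mapred.map.tasks. Per the formulas above, lowering mapred.min.split.size only helps while it is the largest term in default_split_size, and map_task_num never exceeds split_num.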
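
The following worked example applies the four formulas to one set of numbers. The input size (10 GB), block size (128 MB), and mapred.map.tasks value (2) are assumptions chosen for easy arithmetic, not values from a real cluster.

```sql
-- Assumed for illustration: total_size = 10 GB (10240 MB),
-- dfs.block.size = 128 MB, mapred.map.tasks = 2.

-- Case A: the minimum split size is larger than the block size.
SET mapred.max.split.size=268435456;   -- 256 MB
SET mapred.min.split.size=268435456;   -- 256 MB
--   default_mapper_num = 10240 MB / 128 MB = 80
--   default_split_size = max(256 MB, min(256 MB, 128 MB)) = 256 MB
--   split_num          = 10240 MB / 256 MB = 40
--   map_task_num       = min(40, max(2, 80)) = 40

-- Case B: lower the minimum split size to get more map tasks.
SET mapred.min.split.size=67108864;    -- 64 MB
--   default_split_size = max(64 MB, min(256 MB, 128 MB)) = 128 MB
--   split_num          = 10240 MB / 128 MB = 80
--   map_task_num       = min(80, max(2, 80)) = 80
```

In Case B the split size is bounded below by min(mapred.max.split.size, dfs.block.size), so reducing mapred.min.split.size further would not add more map tasks.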

Important

Hive on Tez and Hive on MapReduce use different computing mechanisms. For the same query on the same data, the two engines produce significantly different map task counts. Tez combines input splits into groups and generates one map task per group, rather than one map task per input split.

Reduce tasks

Two approaches control the reduce task count:

  • Set bytes per reducer. Use hive.exec.reducers.bytes.per.reducer to let Hive calculate the count automatically:

      reducer_num = min(total_size / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max)

  • Set the count directly. Use mapred.reduce.tasks to specify an explicit number of reduce tasks.
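
Both approaches can be sketched as session settings. The 10 GB input size and the parameter values below are assumptions for illustration only.

```sql
-- Approach 1: let Hive derive the count from the data size.
-- Assumed for illustration: total_size = 10 GB (10240 MB).
SET hive.exec.reducers.bytes.per.reducer=268435456;  -- 256 MB per reducer
SET hive.exec.reducers.max=1009;                      -- upper bound on the count
--   reducer_num = min(10240 MB / 256 MB, 1009) = min(40, 1009) = 40

-- Approach 2: fix the count explicitly instead of using the formula.
-- SET mapred.reduce.tasks=20;
```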

Note

With the Tez engine, enable auto reducer parallelism to let Tez dynamically adjust the reduce task count based on vertex output sizes: set hive.tez.auto.reducer.parallelism = true;

Starting and initializing reduce tasks consumes time and resources. Each reduce task generates one output file. An excessive number of reduce tasks leads to many small files, which can cascade when those files become input for downstream tasks.

Run stages in parallel

Hive translates a query into one or more stages. When stages are independent of each other, running them in parallel reduces overall job execution time.

| Parameter | Default | Description |
| --- | --- | --- |
| hive.exec.parallel | false | Set to true to run independent stages in parallel. |
| hive.exec.parallel.thread.number | 8 | Maximum number of threads that run in parallel. |
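
A query with independent branches is a typical case that benefits from these settings. The sketch below assumes two hypothetical tables, orders_2025 and orders_2026.

```sql
SET hive.exec.parallel=true;
SET hive.exec.parallel.thread.number=8;

-- Hypothetical tables: orders_2025 and orders_2026.
-- The two aggregations do not depend on each other, so Hive can run
-- their stages at the same time before combining the results.
SELECT u.yr, u.cnt
FROM (
    SELECT '2025' AS yr, COUNT(*) AS cnt FROM orders_2025
    UNION ALL
    SELECT '2026' AS yr, COUNT(*) AS cnt FROM orders_2026
) u;
```

Parallel stages compete for the same cluster resources, so the gain depends on how much spare capacity the queue has.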

Convert queries to fetch tasks

A fetch task returns query results directly without launching a MapReduce job, reducing execution time.

| Parameter | Default | Description |
| --- | --- | --- |
| hive.fetch.task.conversion | none | Controls which queries convert to fetch tasks. Valid values: none, minimal, more. |

Valid values:

  • none: No queries convert to fetch tasks. All statements launch MapReduce.

  • minimal: Only SELECT *, FILTER on partition key columns, and LIMIT use fetch tasks.

  • more: In addition to the minimal scope, supports SELECT on specified columns, FILTER on non-partition key columns, and virtual columns.
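
As a sketch of the more setting, the query below selects specific columns and filters on a non-partition column. The table orders and its columns are hypothetical.

```sql
SET hive.fetch.task.conversion=more;

-- Hypothetical table: orders. With "more", a simple projection with a
-- filter and LIMIT can be served by a fetch task instead of a MapReduce job.
SELECT order_id, amount
FROM orders
WHERE amount > 100
LIMIT 10;
```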

Enable vectorized query execution

Vectorized query execution processes data in batches of rows rather than one row at a time, improving query performance.

| Parameter | Default | Description |
| --- | --- | --- |
| hive.vectorized.execution.enabled | true | Enable vectorized query execution. |
| hive.vectorized.execution.reduce.enabled | true | Enable vectorized query execution for reduce tasks. |
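
One way to check whether a query is actually vectorized is to inspect its plan. The sketch below assumes a hypothetical table orders_orc stored as ORC, and it assumes a Hive release that supports EXPLAIN VECTORIZATION (older releases do not).

```sql
SET hive.vectorized.execution.enabled=true;
SET hive.vectorized.execution.reduce.enabled=true;

-- Hypothetical table: orders_orc, assumed to be stored as ORC.
-- EXPLAIN VECTORIZATION reports whether the plan is vectorized;
-- it is assumed to be available in the Hive release in use.
EXPLAIN VECTORIZATION
SELECT dt, SUM(amount) AS total
FROM orders_orc
GROUP BY dt;
```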

Merge small files

A large number of small output files degrades storage performance and data processing efficiency. Merge the output files of map and reduce tasks to reduce the total file count.

| Parameter | Default | Description |
| --- | --- | --- |
| hive.merge.mapfiles | true | Merge output files of map tasks. |
| hive.merge.mapredfiles | false | Merge output files of reduce tasks. |
| hive.merge.size.per.task | 256000000 (bytes) | Target size of each merged file. |

Example:

SET hive.merge.mapfiles = true;
SET hive.merge.mapredfiles = true;
SET hive.merge.size.per.task = 536870912; -- 512 MB