MaxCompute: Optimize SQL statements

Last Updated: Jul 22, 2024

This topic describes common scenarios where you can optimize SQL statements to achieve better performance.

Optimize the parallelism

Parallelism indicates how many instances can run simultaneously for each task in an execution plan. For example, if a task named M1 runs on 1,000 instances, the parallelism of task M1 is 1,000. Higher parallelism does not always improve performance. If a task uses an excessive number of instances, it may run more slowly for the following reasons:

  • An excessive number of instances leads to longer waits for resources and more time spent in queues.

  • Each instance takes time to initialize. The higher the parallelism, the more total time is spent on initialization and the smaller the share of time spent on actual execution.

You can optimize the parallelism in the following scenarios:

  • Only one instance is called

    In some operations, the system forcefully calls only one instance to run jobs. Examples:

    • In aggregation operations, GROUP BY is not specified, or GROUP BY is set to a constant.

    • PARTITION BY in the OVER clause for a window function is set to a constant.

    • In an SQL statement, DISTRIBUTE BY or CLUSTER BY is set to a constant.

    Solution: Check whether the operations based on a constant are necessary. We recommend that you remove unnecessary ones so that the system is not forced to use only one instance.

  • An excessive number of instances is called

    In the following scenarios, the system forcefully calls an excessive number of instances:

    • The system must read data from a large number of small-sized partitions. For example, if you execute an SQL statement to read data from 10,000 partitions, the system forcefully calls 10,000 instances.

      Solution: Optimize your SQL statements to reduce the number of partitions from which you want to read data. For example, you can prune the partitions that do not need to be read or split a large job into multiple small jobs.

    • The data compression ratio is excessively high. For example, a highly compressed 256 MB file expands to hundreds of GB of data after decompression. In this case, the system needs to call a large number of instances.

      Solution: Run the following commands to increase the maximum number of instances that can be concurrently called for a reduce task. This decreases the amount of data that each instance processes.

      -- Maximum amount of input data for each mapper. Unit: MB.
      set odps.stage.mapper.split.size=<256>;
      -- Number of instances for reduce tasks.
      set odps.stage.reducer.num=<Maximum number of concurrent instances>;

    • By default, each instance reads 256 MB of data and runs for only a short time. If you need to read a large amount of data, the system calls an excessive number of instances, which then queue for resources for a long time.

      Solution: Run the following commands to decrease the maximum number of instances that can be concurrently called for a reduce task. This increases the amount of data that each instance processes.

      -- Maximum amount of input data for each mapper. Unit: MB.
      set odps.stage.mapper.split.size=<256>;
      -- Number of instances for reduce tasks.
      set odps.stage.reducer.num=<Maximum number of concurrent instances>;

  • Configure the number of instances

    • Tasks that involve table reading

      • Method 1: Adjust the concurrency by configuring parameters.

        -- Configure the maximum amount of input data of a mapper. Unit: MB.
        -- Default value: 256. Valid values: [1,Integer.MAX_VALUE].
        set odps.sql.mapper.split.size=<value>;

      • Method 2: Use the split size hint provided by MaxCompute to adjust the concurrency of read operations on a single table.

        -- Set the split size to 1 MB. When data in the src table is read, the job is split into subtasks of 1 MB each.
        select a.key from src a /*+split_size(1)*/ join src2 b on a.key=b.key;

    • Tasks that do not involve table reading

      You can adjust the concurrency by using one of the following methods:

      • Specify the odps.sql.mapper.split.size parameter.

        The concurrency of tasks that do not involve table reading depends on the concurrency of tasks that involve table reading. Adjusting the split size for table-reading tasks therefore also adjusts the concurrency of tasks that do not read tables.

      • Specify the odps.stage.reducer.num parameter.

        Run the following command to forcefully configure the concurrency of reducers. The configuration affects all the related tasks.

        -- Configure the number of instances that are called to run reducer tasks.
        -- Valid values: [1,99999].
        set odps.stage.reducer.num=<value>;

      • Specify the odps.stage.joiner.num parameter.

        Run the following command to forcefully configure the concurrency of joiners. The configuration affects all the related tasks.

        -- Configure the number of instances that are called to run joiner tasks.
        -- Valid values: [1,99999].
        set odps.stage.joiner.num=<value>;
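
As an illustration of the scenarios above, the following sketch contrasts a window function that uses a constant partition key with one that is partitioned by a real column, and then shows a partition-pruned read combined with a session-level split size. The sales table, its columns shop_id and amount, its partition column dt, and the split size of 512 are hypothetical and only illustrate the idea.

```sql
-- Hypothetical table: sales(shop_id, amount), partitioned by dt.

-- Constant partition key: per the scenarios above, one instance handles all rows.
SELECT shop_id, amount,
       ROW_NUMBER() OVER (PARTITION BY 1 ORDER BY amount DESC) AS rn
FROM sales
WHERE dt = '20240701';

-- If a per-shop ranking is what is actually needed, partition by a real column
-- so that the work can be spread across instances.
SELECT shop_id, amount,
       ROW_NUMBER() OVER (PARTITION BY shop_id ORDER BY amount DESC) AS rn
FROM sales
WHERE dt = '20240701';

-- Read only the partitions that are needed, and let each mapper read
-- 512 MB instead of the default 256 MB so that fewer instances are started.
set odps.sql.mapper.split.size=512;
SELECT shop_id, SUM(amount) AS total_amount
FROM sales
WHERE dt >= '20240701' AND dt <= '20240707'
GROUP BY shop_id;
```

The two window queries are not equivalent. The rewrite applies only when a global ordering across all rows is not required.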

Optimize window functions

If window functions are used in SQL statements, a reduce task is assigned to each window function. A large number of window functions therefore consumes a large amount of resources. The system can merge window functions that meet both of the following conditions:

  • The window functions use the same OVER clause, which defines how rows in a table are partitioned and sorted.

  • The window functions are at the same level of nesting in the SQL statement.

Window functions that meet both conditions are merged and executed by a single reduce task. The following SQL statement provides an example:

SELECT
    RANK() OVER (PARTITION BY A ORDER BY B DESC) AS RANK,
    ROW_NUMBER() OVER (PARTITION BY A ORDER BY B DESC) AS row_num
FROM MyTable;
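
For contrast, the following sketch shows a query that does not meet the first condition. The two OVER clauses differ in sort direction, so based on the conditions above the functions would not be merged into one reduce task. The query reuses MyTable, A, and B from the preceding example; the alias rk is illustrative.

```sql
-- The OVER clauses differ (DESC vs. ASC), so the two window functions
-- do not satisfy the "same OVER clause" condition described above.
SELECT
    RANK() OVER (PARTITION BY A ORDER BY B DESC) AS rk,
    ROW_NUMBER() OVER (PARTITION BY A ORDER BY B ASC) AS row_num
FROM MyTable;
```

If the business logic allows both functions to use the same partitioning and ordering, aligning the OVER clauses makes them eligible for merging.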

Optimize subqueries

The following statement contains a subquery:

SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);

If the subquery on table_b returns more than 9,999 values from the col1 column, the following error message appears: records returned from subquery exceeded limit of 9999. In this case, you can rewrite the statement as a join:

SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);

Note
  • If the DISTINCT keyword is not used, the subquery result c may contain duplicate values in the col1 column. In this case, the query returns more rows than the original IN query because matching rows of table_a are repeated.

  • If the DISTINCT keyword is used, only one worker is assigned to perform the subquery. If the subquery involves a large amount of data, the whole query slows down.

  • If you are sure that the values that meet the subquery conditions in the col1 column are unique, you can delete the DISTINCT keyword to improve the query performance.
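
Another option, not described above, is a semi join. MaxCompute SQL supports LEFT SEMI JOIN, which returns each row of the left table at most once when a match exists in the right table, so DISTINCT is not needed to avoid repeated rows. The following is a minimal sketch. The filter col2 > 0 is a hypothetical stand-in for the xxx condition, and its performance relative to the DISTINCT version has not been measured here.

```sql
-- Keep the rows of table_a whose col1 has at least one match in table_b.
-- col2 > 0 is a hypothetical placeholder for the subquery condition.
SELECT a.*
FROM table_a a
LEFT SEMI JOIN (SELECT col1 FROM table_b WHERE col2 > 0) b
ON a.col1 = b.col1;
```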

Optimize joins

When you join two tables, we recommend that you use the WHERE clause based on the following rules:

  • Specify the partition filter conditions of the primary table in the WHERE clause. We recommend that you define a subquery for the primary table to obtain the required data first.

  • Write the WHERE clause of the primary table at the end of the statement.

  • Specify the partition filter conditions of the secondary table in the ON clause or a subquery, instead of in the WHERE clause.

The following code provides an example:

-- Recommended. The secondary table is filtered in a subquery, and the primary table is filtered in the WHERE clause at the end.
SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301)B ON B.id=A.id WHERE A.dt=20150301;
-- Not recommended. The system performs the JOIN operation before it performs partition pruning. This increases the number of data records and causes the query performance to deteriorate.
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301;
-- Recommended. Both tables are filtered in subqueries before the JOIN operation.
SELECT * FROM (SELECT * FROM A WHERE dt=20150301)A JOIN (SELECT * FROM B WHERE dt=20150301)B ON B.id=A.id;
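
The third rule also allows the partition condition of the secondary table to be placed in the ON clause, whereas the preceding statements show only the subquery form. A sketch of the ON-clause form, reusing tables A and B and the dt partition column from those statements, might look as follows:

```sql
-- The partition condition of secondary table B is in the ON clause.
-- The partition condition of primary table A is in the WHERE clause at the end.
SELECT *
FROM A
JOIN B
ON B.id = A.id AND B.dt = 20150301
WHERE A.dt = 20150301;
```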

Optimize aggregate functions

Replace the collect_list function with the wm_concat function to optimize the performance. The following code provides examples:

-- Use the collect_list function to concatenate sorted values.
select concat_ws(',', sort_array(collect_list(key))) from src;
-- Use the wm_concat function for better performance.
select wm_concat(',', key) WITHIN group (order by key) from src;

-- Use the collect_list function to concatenate values without sorting.
select array_join(collect_list(key), ',') from src;
-- Use the wm_concat function for better performance.
select wm_concat(',', key) from src;