MaxCompute: Optimize SQL statements

Last Updated: Dec 04, 2024

This topic describes common scenarios where you can optimize SQL statements to achieve better performance and provides optimization examples.

Optimize the concurrency

Concurrency is a metric that indicates how many instances are simultaneously run for each task in an execution plan. For example, if a task named M1 is executed by calling 1,000 instances, the concurrency of task M1 is 1,000. You can appropriately configure and adjust the task concurrency to improve the task execution efficiency.

This section describes the scenarios where the concurrency can be optimized.

Only one instance is called

For some operations, the system forcefully calls only one instance to execute the task. The following operations are examples:

  • In aggregation operations, GROUP BY is not specified, or GROUP BY is set to a constant.

  • PARTITION BY in the OVER clause for a window function is set to a constant.

  • In an SQL statement, DISTRIBUTE BY or CLUSTER BY is set to a constant.

Solution: Check whether the operations based on a constant are necessary. We recommend that you cancel these operations to prevent the system from forcefully calling only one instance to execute tasks.
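
For example, the following sketch, which assumes a hypothetical table named t, shows the first pattern and a rewrite that avoids it:

-- Forces a single instance: the constant grouping key places all rows in one group.
SELECT COUNT(*) FROM t GROUP BY 'all';
-- Avoids the single-instance plan: the same aggregation without the constant GROUP BY.
SELECT COUNT(*) FROM t;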

An excessively large or small number of instances are called

Important

The execution performance does not always become better as the concurrency increases. If you call an excessively large number of instances for a job, the execution speed may decrease due to the following reasons:

  • An excessively large number of instances leads to longer waits for resources and more queuing.

  • Each instance takes time to initialize. The higher the concurrency, the more total time is spent on initialization and the lower the proportion of valid execution time.

In the following scenarios, the system forcefully calls an excessively large number of instances.

  • The system must read data from a large number of small-sized partitions. For example, if you execute an SQL statement to read data from 10,000 partitions, the system forcefully calls 10,000 instances.

    Solution: Optimize your SQL statements to reduce the number of partitions from which you want to read data. For example, you can prune the partitions that do not need to be read or split a large job into multiple small jobs, as shown in the sketch after this list.

  • By default, the system calls an instance to read 256 MB of data, and each read takes only a short time. If a large amount of data must be read, the system calls an excessively large number of instances. As a result, the instances queue up for resources for a long period of time.

    Solution: Run the following commands to increase the amount of data that each instance processes and to limit the maximum number of instances that can be concurrently called for a reduce task.

    -- Increase the amount of data that each map instance reads. Unit: MB. Default value: 256.
    SET odps.stage.mapper.split.size=<value>;
    -- Limit the maximum number of reduce instances that can be concurrently called.
    SET odps.stage.reducer.num=<maximum number of concurrent instances>;
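
For the first scenario in the preceding list, the following sketch assumes a hypothetical table named sale_detail that is partitioned by sale_date. Restricting the partition column in the WHERE clause prunes the partitions that do not need to be read:

-- Not recommended: all partitions of the table are read, and one instance is called for each partition.
SELECT shop_name FROM sale_detail;
-- Recommended: partition pruning limits the read to the required partitions.
SELECT shop_name FROM sale_detail WHERE sale_date BETWEEN '20240101' AND '20240107';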

Configure the number of instances

  • Tasks that involve table reading

    • Method 1: Adjust the concurrency by configuring parameters.

      -- Configure the maximum amount of input data of a mapper. Unit: MB.
      -- Default value: 256. Valid values: [1,Integer.MAX_VALUE].
      SET odps.sql.mapper.split.size=<value>;
    • Method 2: Use a split size hint provided by MaxCompute to adjust the concurrency of read operations on a single table.

      -- Set the split size to 1 MB. This setting indicates that a task is split into subtasks based on a size of 1 MB when data in the src table is read.
      SELECT a.key FROM src a /*+split_size(1)*/ JOIN src2 b ON a.key=b.key;
    • Method 3: Split table data based on the data amount, the number of rows, or a specified concurrency.

    The odps.sql.mapper.split.size parameter in Method 1 applies to the entire map stage, and the minimum split size is 1 MB. Method 3 allows you to adjust the concurrency at the table level based on your business requirements. This method is especially useful when each row is small but the subsequent computing workloads are heavy, because it reduces the number of rows that each instance processes and increases the task concurrency.

    You can run one of the following commands to adjust the concurrency.

    • Configure the size of a single shard for concurrent processing in tables.

      SET odps.sql.split.size = {"table1": 1024, "table2": 512};
    • Configure the number of rows for concurrent processing in tables.

      SET odps.sql.split.row.count = {"table1": 100, "table2": 500};
    • Configure the concurrency for tables.

      SET odps.sql.split.dop = {"table1": 1, "table2": 5};
    Note

    The odps.sql.split.row.count and odps.sql.split.dop parameters can be used only for internal tables, and cannot be used for transactional tables or clustered tables.

  • Tasks that do not involve table reading

    You can adjust the concurrency by using one of the following methods:

    • Method 1: Specify the odps.stage.reducer.num parameter. Run the following command to forcefully set the concurrency of reducers. This setting affects all related tasks.

      -- Configure the number of instances that are called to execute reducer tasks.
      -- Valid values: [1,99999].
      SET odps.stage.reducer.num=<value>;
    • Method 2: Specify the odps.stage.joiner.num parameter. Run the following command to forcefully set the concurrency of joiners. This setting affects all related tasks.

      -- Configure the number of instances that are called to execute joiner tasks.
      -- Valid values: [1,99999].
      SET odps.stage.joiner.num=<value>;
    • Method 3: Specify the odps.sql.mapper.split.size parameter.

      The concurrency of tasks that do not involve table reading is determined by the concurrency of the upstream tasks that do read tables. Therefore, you can adjust the former by adjusting the latter, as shown in the following sketch.
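
      For example, the following sketch halves the default split size so that more map instances are called, which in turn can raise the concurrency of the downstream stages. The value 128 is only illustrative.

      -- A smaller split size increases the map concurrency, which also raises
      -- the concurrency of the downstream tasks that do not read tables.
      SET odps.sql.mapper.split.size=128;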

Optimize window functions

If window functions are used in SQL statements, a reduce task is assigned to each window function. A large number of window functions consume a large amount of resources. You can optimize the window functions that meet both of the following conditions:

  • The OVER clauses, which define how rows in a table are partitioned and sorted, must be identical.

  • Multiple window functions must be executed at the same level of nesting in an SQL statement.

The window functions that meet the preceding conditions are merged to be executed by one reduce task. The following SQL statement provides an example:

SELECT
    RANK() OVER (PARTITION BY A ORDER BY B DESC) AS rank,
    ROW_NUMBER() OVER (PARTITION BY A ORDER BY B DESC) AS row_num
FROM MyTable;
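
By contrast, the following sketch, which assumes an additional column named C in MyTable, uses two different OVER clauses. These window functions cannot be merged, so each of them is executed by its own reduce task:

SELECT
    RANK() OVER (PARTITION BY A ORDER BY B DESC) AS rank_a,
    ROW_NUMBER() OVER (PARTITION BY C ORDER BY B DESC) AS row_num_c
FROM MyTable;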

Optimize subqueries

The following statement contains a subquery:

SELECT * FROM table_a a WHERE a.col1 IN (SELECT col1 FROM table_b b WHERE xxx);

If the subquery on the table_b table returns more than 9,999 values from the col1 column, the following error message appears: records returned from subquery exceeded limit of 9999. In this case, you can replace the preceding statement with the following statement:

SELECT a.* FROM table_a a JOIN (SELECT DISTINCT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);
Note
  • If the DISTINCT keyword is not used, the subquery result table c may contain duplicate values in the col1 column. In this case, the query on table a returns more results.

  • If the DISTINCT keyword is used, only one worker is assigned to perform the subquery. If the subquery involves a large amount of data, the whole query slows down.

  • If you are sure that the values that meet the subquery conditions in the col1 column are unique, you can delete the DISTINCT keyword to improve the query performance.
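
The following sketch shows the variant that is described in the last note. The DISTINCT keyword is removed, which is safe only if the col1 values that meet the subquery conditions are unique:

SELECT a.* FROM table_a a JOIN (SELECT col1 FROM table_b b WHERE xxx) c ON (a.col1 = c.col1);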

Optimize joins

When you join two tables, we recommend that you use the WHERE clause based on the following rules:

  • Specify the partition limits of the primary table in the WHERE clause. We recommend that you define a subquery for the primary table to obtain the required data first.

  • Write the WHERE clause of the primary table at the end of the statement.

  • Specify the partition limits of the secondary table in the ON clause or a subquery, instead of in the WHERE clause.

The following code provides an example:

-- Recommended: the partition limit of the secondary table is specified in a subquery, and the WHERE clause of the primary table is written at the end of the statement.
SELECT * FROM A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id WHERE A.dt=20150301;
-- Not recommended: the system performs the JOIN operation before it prunes partitions. This increases the amount of data that is processed and degrades the query performance.
SELECT * FROM A JOIN B ON B.id=A.id WHERE B.dt=20150301;
-- Recommended: subqueries obtain the required data of both tables first.
SELECT * FROM (SELECT * FROM A WHERE dt=20150301) A JOIN (SELECT * FROM B WHERE dt=20150301) B ON B.id=A.id;

Optimize aggregate functions

Replace the collect_list function with the wm_concat function to optimize the performance. The following code provides examples:

-- Use the collect_list function.
SELECT concat_ws(',', sort_array(collect_list(key))) FROM src;
-- Use the wm_concat function instead for better performance.
SELECT wm_concat(',', key) WITHIN GROUP (ORDER BY key) FROM src;


-- Use the collect_list function.
SELECT array_join(collect_list(key), ',') FROM src;
-- Use the wm_concat function instead for better performance.
SELECT wm_concat(',', key) FROM src;