
MaxCompute: Data skew tuning

Last Updated: Jul 19, 2024

This topic describes the common data skew issues that occur when you use MaxCompute and provides related solutions.

MapReduce

To understand data skew, you must be familiar with MapReduce. MapReduce is a typical distributed computing framework that uses the split-apply-combine method: it splits a large or difficult problem into smaller, easy-to-handle sub-problems, produces a result for each sub-problem in parallel, and then merges the partial results into a final result. Compared with traditional parallel programming frameworks, MapReduce offers high fault tolerance, ease of use, and high scalability. When you use MapReduce for parallel programming, you only need to focus on the logic of your program. You do not need to handle issues such as data storage, information exchange between nodes, or transmission mechanisms. This way, distributed programming is simplified.
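
As a rough illustration of how this maps onto SQL, the following hypothetical aggregation (placeholder table and column names) corresponds to one MapReduce job: the Map stage scans slices of the input and emits partial results per key, the shuffle routes rows with the same key to the same reducer, and the Reduce stage combines the partial results.

-- Hypothetical example: count orders per shop.
-- Map stage (split + apply): each mapper reads a slice of the input files and emits (shop_id, partial count) pairs.
-- Shuffle: pairs with the same shop_id are routed to the same reducer.
-- Reduce stage (combine): each reducer sums the partial counts for its shop_ids and writes the final result.
SELECT  shop_id
        ,COUNT(*) AS order_cnt
FROM    <order_table>
GROUP BY shop_id;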

The following figure shows the workflow of MapReduce.

Data skew

In most cases, data skew issues occur on reducers. Mappers split data by input file, so the input is typically distributed evenly to workers. However, the shuffle stage distributes data to reducers by key. If the key distribution is uneven, data cannot be evenly distributed to the workers, and a data skew issue occurs: some workers quickly complete their computation, whereas others run for a long period of time. In actual production, most data is skewed, which is in line with the Pareto principle (80/20 rule). For example, 20% of the active users of a forum contribute 80% of the posts, and 80% of the page views of a website come from 20% of its visitors. In the era of explosive data growth, data skew severely affects the execution efficiency of distributed programs. For example, the execution progress of a job may stay at 99% for a long period of time due to data skew.
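
Before you tune a job, it often helps to check how records are distributed across the key that is used for joins or aggregations. The following sketch uses placeholder table, column, and partition names; a key value whose count is orders of magnitude larger than the rest is a likely cause of a long tail.

-- List the hottest key values first. <your_table>, <key_column>, and the partition filter are placeholders.
SELECT  <key_column>
        ,COUNT(*) AS cnt
FROM    <your_table>
WHERE   ds = '${bizdate}'
GROUP BY <key_column>
ORDER BY cnt DESC
LIMIT   20;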

Identify data skew issues

In MaxCompute, you can use Logview to identify data skew issues. The following figure shows the identification process.

  1. On the Fuxi Jobs page, sort Fuxi tasks of a Fuxi job based on the execution latency in descending order. Select the Fuxi task with the maximum execution latency.

  2. Sort Fuxi instances in the Fuxi task based on the execution latency in descending order. Select the Fuxi instance whose execution latency is much longer than the average execution latency, such as the first Fuxi instance, and then view its output log in StdOut.

  3. View the directed acyclic graph (DAG) of the job based on the log information in StdOut.

  4. Locate the SQL code snippet that causes the data skew issue based on the key information in the DAG of the job.

Example

  1. Obtain the Logview logs based on the run logs of the job. For more information, see Entry point.

  2. Go to the Logview UI, and sort Fuxi tasks based on the execution latency in descending order. Select the Fuxi task with the maximum execution latency to quickly identify the data skew issue.

  3. Task R31_26_27 has the maximum execution latency. Click the R31_26_27 task to go to the instance details page, as shown in the following figure. Latency: {min:00:00:06, avg:00:00:13, max:00:26:40} indicates that the minimum execution latency of all instances of the task is 6s, the average execution latency is 13s, and the maximum execution latency is 26 minutes and 40 seconds (1,600s). You can sort task instances by Latency (task execution latency) in descending order. The sorting result shows that four instances have long execution latencies. If the execution latency of a Fuxi instance is more than twice the average execution latency (26s in this example), MaxCompute considers the Fuxi instance to be long-tailed. In this example, the execution latencies of 21 instances are longer than 26s, so these instances are long-tailed. A long-tail instance does not necessarily indicate a data skew issue. You must compare the value of avg with the value of max. If the value of max is much greater than the value of avg, a serious data skew issue occurs on the instance and you must resolve it.

  4. Find the instance and click Output log in the StdOut column to view the output log, as shown in the following figure.

  5. After the data skew issue is identified, right-click R31_26_27 on the Job Details tab, and select Expand All. For more information, see Use LogView V2.0 to view job information. View StreamLineWriter21 before StreamLineRead22 to identify the keys that cause the data skew issue. In this example, the keys are new_uri_path_structure, cookie_x5check_userid, and cookie_userid. This way, you can locate the SQL code snippet that causes the data skew issue.

Data skew troubleshooting and solutions

From practical experience, data skew issues are typically caused by the following reasons:

  • Join

  • GROUP BY

  • COUNT(DISTINCT)

  • ROW_NUMBER(TopN)

  • Dynamic partitioning

Reasons in the order of occurrence frequency are join > GROUP BY > COUNT(DISTINCT) > ROW_NUMBER > Dynamic partitioning.

Join

If a data skew issue is caused by a join operation, multiple cases may be involved, such as a join between large tables and small tables, a join between large tables and medium tables, or long tails due to hot key values during a join operation.

  • A join between large tables and small tables.

    • Data skew example

      In the following example, the t1 table is a large table, and the t2 and t3 tables are small tables.

      SELECT  t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN <other_viewtable> t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
    • Solution

      Use map join hints. For more information about map join hints, see MAPJOIN HINT. Sample statement:

      SELECT  /*+ mapjoin(t2,t3)*/
              t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN (<other_viewtable>) t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
      • Usage notes

        • When you reference a small table or a subquery, you must reference the alias of the table or subquery.

        • Map join operations support small tables in subqueries.

        • For a map join operation, you can use non-equi joins or combine conditions by using OR. You can calculate the Cartesian product by using MAPJOIN ON 1 = 1 without specifying the ON clause. For example, you can execute the select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1; statement. However, this statement may cause the data bloat issue.

        • Separate multiple small tables in a map join operation with commas (,), such as /*+ mapjoin(a,b,c)*/.

        • In the Map stage, a map join operation loads all data of the specified tables into the memory of the program that performs the join. Therefore, the tables specified for the map join operation must be small tables, and the total memory occupied by their data cannot exceed 512 MB. MaxCompute uses compressed storage, so the data amount of the small tables sharply increases after they are loaded into memory. The 512 MB limit applies to the data amount after the small tables are loaded into memory. You can set the odps.sql.mapjoin.memory.max parameter to a larger value to increase the memory size. The maximum value of the parameter is 8192. Unit: MB.

          set odps.sql.mapjoin.memory.max=2048;
      • Limits on map join operations (a brief sketch that illustrates the first two limits follows this list)

        • The left table in a LEFT OUTER JOIN operation must be a large table.

        • The right table in a RIGHT OUTER JOIN operation must be a large table.

        • Map join operations cannot be used in a FULL OUTER JOIN operation.

        • The left or right table in an INNER JOIN operation can be a large table.

        • MaxCompute allows you to specify a maximum of 128 small tables for a map join operation. If you specify more than 128 small tables, a syntax error is returned.
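
        The following minimal sketch (placeholder table names) illustrates the first two limits: in a LEFT OUTER JOIN the small table must be on the right, and in a RIGHT OUTER JOIN the small table must be on the left.

          -- Valid: the large table is the left table of the LEFT OUTER JOIN, so the right table t2 is declared in the map join hint.
          SELECT  /*+ mapjoin(t2) */ t1.id, t2.name
          FROM    <large_table> t1
          LEFT OUTER JOIN <small_table> t2
          ON      t1.id = t2.id;
          -- Valid: the large table is the right table of the RIGHT OUTER JOIN, so the left table t1 is declared in the map join hint.
          SELECT  /*+ mapjoin(t1) */ t1.name, t2.id
          FROM    <small_table> t1
          RIGHT OUTER JOIN <large_table> t2
          ON      t1.id = t2.id;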

  • A join between large tables and medium tables.

    • Data skew example

      In the following example, the t0 table is a large table, and the t1 table is a medium table.

      SELECT  request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      from <viewtable>
         t0
      LEFT join (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          from <servicetable>
          where ds = '${today}'
          AND     hh = '${hour}'
      ) t1 on t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
    • Solution

      Use DISTRIBUTED MAPJOIN. For more information about the syntax, see DISTRIBUTED MAPJOIN. Sample statement:

      SELECT  /*+distmapjoin(t1)*/
              request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      from <viewtable>
         t0
      LEFT join (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          from <servicetable>
          where ds = '${today}'
          AND     hh = '${hour}'
      ) t1 on t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
  • Long tails due to hot key values during a join operation.

    • Data skew example

      In the following example, a large number of hot key values exist in the eleme_uid field. A data skew issue is likely to occur.

      SELECT
      eleme_uid,
      ...
      FROM (
          SELECT
          eleme_uid,
          ...
          FROM <viewtable>
      )t1
      LEFT JOIN(
          SELECT
          eleme_uid,
          ...
          FROM <customertable>
      )  t2
      on t1.eleme_uid = t2.eleme_uid;
    • Solution

      The following table describes the solutions that you can use to resolve the data skew issue.

      | No. | Solution | Description |
      | --- | --- | --- |
      | Solution 1 | Manually split hot key values. | Filter out hot key values from the primary table based on the analysis result. Perform a map join operation on the hot key values, perform a merge join operation on non-hot key values, and then merge the results of the two join operations. |
      | Solution 2 | Configure the skew join parameter. | Set odps.sql.skewjoin to true. |
      | Solution 3 | Add a skew join hint. | Use a skew join hint in the following format: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])])*/. If you use a skew join hint, the operation to obtain skew keys repeats once, which prolongs the query period. If the skew keys have already been obtained, you can configure the skew join parameter instead to save time. |
      | Solution 4 | Perform a modulo equal join by using a multiple table. | Use a multiple table. |

      • Manually split hot key values.

        Filter out hot key values from the primary table based on the analysis result (a sketch of how to find the hot key values follows the sample code). Perform a map join operation on the hot key values, perform a merge join operation on non-hot key values, and then merge the results of the two join operations. Sample code:

        SELECT
        /*+ MAPJOIN (t2) */
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid = <skewed_value>
        )t1
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid = <skewed_value>
        )  t2
        on t1.eleme_uid = t2.eleme_uid
        UNION ALL
        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid != <skewed_value>
        )t3
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid != <skewed_value>
        )  t4
        on t3.eleme_uid = t4.eleme_uid
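
        The <skewed_value> placeholder above comes from a prior analysis of the key distribution. A minimal sketch of that analysis, using the same placeholder table, is shown below; the values it returns are the candidates for <skewed_value>.

        -- Identify the hottest eleme_uid values in the primary table.
        SELECT  eleme_uid
                ,COUNT(*) AS cnt
        FROM    <viewtable>
        GROUP BY eleme_uid
        ORDER BY cnt DESC
        LIMIT   10;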
      • Configure the skew join parameter.

        This is a common solution. MaxCompute allows you to set the odps.sql.skewjoin parameter to true to enable the skew join feature. If you enable only the skew join feature, the execution of the task is not affected. You must also set the odps.sql.skewinfo parameter to make the skew join feature take effect. The odps.sql.skewinfo parameter specifies the join optimization information. Sample commands:

        set odps.sql.skewjoin=true;
        set odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];  -- skewed_src specifies the table with skewed data, skewed_key specifies the skewed join key, and skewed_value specifies a hot key value.

        Examples:

        -- Configure the join optimization information for a single skewed value of a single field.
        set odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
        -- Configure the join optimization information for multiple skewed values of a single field.
        set odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
      • Add a skew join hint.

        In the SELECT statement, add a skew join hint in the following format to perform a map join operation: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])])*/. table_name specifies the name of the skewed table, column_name specifies the name of the skewed column, and value specifies the skewed key value. Examples:

        -- Method 1: Add a hint to specify the alias of the table. 
        select /*+ skewjoin(a) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1;
        -- Method 2: Add a hint to specify the name of the table and the names of possibly skewed columns. For example, the following statement shows that Columns c0 and c1 in Table a are skewed columns. 
        select /*+ skewjoin(a(c0, c1)) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
        -- Method 3: Add a hint to specify the name of table and the names of columns and provide the skewed key values. If skewed key values are of the STRING type, enclose each value with double quotation marks ("). In the following statement, (a.c0=1 and a.c1="2") and (a.c0=3 and a.c1="4") contain skewed key values. 
        select /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
        Note

        If you use a skew join hint to specify skewed values, the processing efficiency is higher than that of Method 1 and Method 2 (no skewed values specified).

        If you use a skew join hint for join operations, take note of the following limits:

        • INNER JOIN: Skew join hints can be specified for the left or right table in the INNER JOIN statement.

        • LEFT JOIN, SEMI JOIN, and ANTI JOIN: Skew join hints can be specified only for the left table.

        • RIGHT JOIN: Skew join hints can be specified only for the right table.

        • FULL JOIN: Skew join hints are not supported.

        An Aggregate is run after a skew join hint is added, which slows down the join operation. Therefore, we recommend that you add a skew join hint only to the JOIN statements that cause data skew.

        The data types of the left-side and right-side join keys must be the same in the JOIN statement to which a skew join hint is added. If the data types are different, the skew join hint becomes ineffective. In the preceding example, the data types of a.c0 and b.c0 must be the same, and the data types of a.c1 and b.c1 must be the same. To ensure data type consistency, you can use the CAST function to convert the data types of join keys in subqueries. Sample statements:

        create table T0(c0 int, c1 int, c2 int, c3 int);
        create table T1(c0 string, c1 int, c2 int);
        -- Method 1:
        select /*+ skewjoin(a) */ * from T0 a join T1 b on cast(a.c0 as string) = cast(b.c0 as string) and a.c1 = b.c1;
        -- Method 2:
        select /*+ skewjoin(b) */ * from (select cast(a.c0 as string) as c00 from T0 a) b join T1 c on b.c00 = c.c0;

        After a skew join hint is added, the optimizer runs an Aggregate to obtain the first N hot key values. By default, the first 20 hot key values are obtained. You can run the set odps.optimizer.skew.join.topk.num = xx; command to specify the number of hot key values that the optimizer can obtain.

        • Skew join hints allow you to add a hint only for the left or right table involved in a JOIN statement.

        • In the JOIN statement to which a skew join hint is added, left key = right key must be included. Skew join hints cannot be added to the CARTESIAN JOIN statement.

        • You cannot add a skew join hint to a JOIN statement to which a map join hint is added.

      • Perform a modulo equal join by using a multiple table.

        The logic of this solution is different from that of the other solutions. This solution uses a multiple table that contains only one column of the INT type, for example, a column whose values range from 1 to N, where N is determined by the extent of the skew. The multiple table is used to bloat the user behavior table by N times, and the join is then performed on two associated keys: the user ID and the number. Data that previously hashed to a single worker based only on the user ID is now spread across N workers, so the skewed distribution becomes roughly 1/N per worker. However, the amount of data is also increased by N times.

        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
        )t1
        LEFT JOIN(
            SELECT
            /*+mapjoin(<multipletable>)*/
            eleme_uid,
            number
            ...
            FROM <customertable>
            JOIN <multipletable>
        )  t2
        on t1.eleme_uid = t2.eleme_uid
        and mod(t1.<value_col>,10)+1 = t2.number;

        Based on the preceding data bloat mechanism, you can make the bloat take effect only for the hot key values in the two tables and leave non-hot key values unchanged. Find the hot key values, process the traffic table and the user behavior table separately, and add an eleme_uid_join column to both tables. If the user ID is a hot key value, CONCAT is used to append a random positive integer within a predefined range, such as 0 to 1000; otherwise, the original user ID is kept. The join between the two tables then uses the eleme_uid_join column. This way, only the hot key values are bloated, data skew is mitigated, and unnecessary bloat of non-hot key values is prevented. However, the SQL code based on the original business logic must be completely rewritten. Therefore, we recommend that you do not use this solution.
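
        The following minimal sketch outlines this idea. It assumes a hypothetical <hotkeytable> that holds the previously identified hot eleme_uid values, a bloat multiple of 1000, and STRING user IDs; adapt the names, types, and multiple to your own data.

        SELECT  ...
        FROM   (
                -- Traffic table: append a random suffix in 1..1000 to hot keys; keep non-hot keys unchanged.
                SELECT  v.*
                        ,CASE WHEN h.eleme_uid IS NOT NULL
                              THEN CONCAT(v.eleme_uid, '_', CAST(FLOOR(RAND() * 1000) + 1 AS STRING))
                              ELSE v.eleme_uid
                         END AS eleme_uid_join
                FROM    <viewtable> v
                LEFT JOIN <hotkeytable> h
                ON      v.eleme_uid = h.eleme_uid
               ) t1
        LEFT JOIN (
                -- User behavior table: replicate each hot key once per suffix so that every salted value finds a match; keep non-hot keys as a single row.
                SELECT  /*+ mapjoin(h, m) */
                        c.*
                        ,CONCAT(c.eleme_uid, '_', CAST(m.number AS STRING)) AS eleme_uid_join
                FROM    <customertable> c
                JOIN    <hotkeytable> h
                ON      c.eleme_uid = h.eleme_uid
                JOIN    <multipletable> m
                ON      1 = 1
                UNION ALL
                SELECT  c.*
                        ,c.eleme_uid AS eleme_uid_join
                FROM    <customertable> c
                LEFT JOIN <hotkeytable> h
                ON      c.eleme_uid = h.eleme_uid
                WHERE   h.eleme_uid IS NULL
               ) t2
        ON      t1.eleme_uid_join = t2.eleme_uid_join;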

GROUP BY

Sample pseudocode in which a GROUP BY clause is specified:

SELECT  shop_id
        ,sum(is_open) AS business_days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND  '${bizdate}'
GROUP BY shop_id;

The following table describes the solutions to data skew issues that are caused by the GROUP BY clause.

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Configure a parameter to enable the anti-skew feature for the GROUP BY clause. | Add set odps.sql.groupby.skewindata=true. |
| Solution 2 | Add a random number. | Split the key that causes a long tail. |
| Solution 3 | Create a rolling table. | This solution helps reduce costs and improve efficiency. |

  • Solution 1: Configure a parameter to enable the anti-skew feature for the GROUP BY clause.

    set odps.sql.groupby.skewindata=true;
  • Solution 2: Add a random number.

    Compared with Solution 1, this solution requires you to rewrite the SQL statement and add a random number to split the key that causes the long tail of the GROUP BY clause.

    For the statement Select Key,Count(*) As Cnt From TableName Group By Key;, combiners are not required. The MapReduce system shuffles output data from mappers to reducers. Then, reducers perform COUNT operations. The related execution plan is M -> R.

    In the following example, the key that causes a long tail is reassigned.

    -- The key that causes a long tail is KEY001.
    SELECT  a.Key
            ,SUM(a.Cnt) AS Cnt
    FROM(SELECT  Key
                ,COUNT(*) AS Cnt
                FROM    <TableName>
                GROUP BY Key
                ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50
                 ELSE 0
                END
            ) a
    GROUP BY a.Key;

    After the key is reassigned, the execution plan becomes M -> R -> R. One more step is added for the execution plan, but the key that causes a long tail is processed by using two steps. This way, the entire execution duration may be reduced. The resource and time consumption are basically the same as that of Solution 1. However, in actual scenarios, a long tail is typically caused by multiple keys. The costs for identifying the keys and rewriting SQL statements are high. Solution 1 is more cost-efficient.

  • Solution 3: Create a rolling table.

    The solution aims to reduce costs and improve efficiency. The core requirement is to retrieve the merchant data for the past year. For online tasks, the data of all partitions from T-1 to T-365 needs to be read, which causes a waste of resources. If you create a rolling table, the amount of data that is read from partitions is reduced but the retrieval of all data for the past year is not affected.

    In the following example, merchant business data for 365 days is initialized the first time the task runs: the GROUP BY clause is used to aggregate the data, the date on which the data was last updated is recorded, and the result is stored in table a (m_xxx_365_df). For subsequent tasks, the T-2 partition of table a is joined with the table table_xxx_di, and then a GROUP BY operation is performed. This way, the number of partitions from which data is read each day is reduced from 365 to 2, the number of times the primary key shop_id is read is significantly reduced, and resource consumption is also reduced.

    -- Create a rolling table.
    CREATE TABLE IF NOT EXISTS m_xxx_365_df
    (
      shop_id STRING COMMENT 'Shop ID',
      last_update_ds STRING COMMENT 'Date on which the data was last updated',
      365d_open_days BIGINT COMMENT 'Number of business days in the past 365 days'
    )
    PARTITIONED BY
    (
        ds STRING COMMENT 'Date partition'
    )
    LIFECYCLE 7;
    -- Initialize the m_xxx_365_df table that stores the data from May 1, 2021 to May 1, 2022.
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
      SELECT shop_id,
             max(ds) as last_update_ds,
             sum(is_open) AS 365d_open_days
      FROM table_xxx_di
      WHERE dt BETWEEN '20210501' AND '20220501'
      GROUP BY shop_id;
    -- Execute the following statement for subsequent tasks:
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
      select aa.shop_id,
             aa.last_update_ds,
             aa.365d_open_days - COALESCE(bb.is_open, 0) as 365d_open_days -- Infinite rolling storage without specifying business days.
      from (
        select shop_id,
               max(last_update_ds) as last_update_ds,
               sum(365d_open_days) AS 365d_open_days
        from (
          SELECT shop_id,
                 ds as last_update_ds,
                 sum(is_open) AS 365d_open_days
          FROM table_xxx_di
          WHERE ds = '${bizdate}'
          GROUP BY shop_id, ds
          union all
          SELECT shop_id,
                 last_update_ds,
                 365d_open_days
          FROM m_xxx_365_df
          WHERE ds = '${bizdate_2}' and last_update_ds >= '${bizdate_365}'
        ) t
        group by shop_id
      ) as aa
      left join (
        SELECT shop_id,
               is_open
        FROM table_xxx_di
        WHERE ds = '${bizdate_366}'
      ) as bb
      on aa.shop_id = bb.shop_id;

COUNT(DISTINCT)

In this example, a table contains the following data:

| ds (partition) | cnt (number of records) |
| --- | --- |
| 20220416 | 73025514 |
| 20220415 | 2292806 |
| 20220417 | 2319160 |

If you use the following statement, a data skew issue is likely to occur.

SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;

The following table describes the solutions that you can use to resolve the data skew issue.

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Optimize the parameter settings. | Add SET odps.sql.groupby.skewindata=true;. |
| Solution 2 | Perform two-stage aggregate operations. | Concatenate the value of the partition field with a random number. |
| Solution 3 | Perform a GROUP BY deduplication (similar to Solution 2). | Perform a GROUP BY operation on (ds, shop_id) to deduplicate the data, and then count the rows. |

  • Solution 1: Optimize the parameter settings.

    Set the odps.sql.groupby.skewindata parameter to true.

    SET odps.sql.groupby.skewindata=true;
  • Solution 2: Perform two-stage aggregate operations.

    If data of the shop_id field is not evenly distributed, you cannot use Solution 1 to resolve the data skew issue. You can concatenate the value of the partition field with a random number.

    -- Method 1: Compute CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds to add a random prefix to the partition value.
    SELECT  SPLIT_PART(rand_ds, '_', 2) AS ds
            ,COUNT(*) AS id_cnt
      FROM (
            SELECT  rand_ds
                    ,shop_id
            FROM   (SELECT  CONCAT(ROUND(RAND(),1)*10, '_', ds) AS rand_ds
                            ,shop_id
                    FROM    demo_data0
                   ) t
            GROUP BY rand_ds,shop_id
            ) tt
    GROUP BY SPLIT_PART(rand_ds, '_', 2);
    -- Method 2: Compute ROUND(RAND(),1)*10 AS randint10 to add a random number.
    SELECT  ds
            ,COUNT(*) AS id_cnt
    FROM    (SELECT  ds
                     ,randint10
                     ,shop_id
               FROM (SELECT  ds
                             ,ROUND(RAND(),1)*10 AS randint10
                             ,shop_id
                     FROM    demo_data0
                    ) t
            GROUP BY ds,randint10,shop_id
            ) tt
    GROUP BY ds;
  • Solution 3: Implement a solution that is similar to Solution 2.

    If the data in the fields that are used for the GROUP BY and DISTINCT operations is evenly distributed, you can perform a GROUP BY operation on the two fields ds and shop_id to deduplicate the data and then count the rows, which is equivalent to COUNT(DISTINCT shop_id) per ds.

    SELECT  ds
            ,COUNT(*) AS cnt
    FROM(SELECT  ds
                ,shop_id
                FROM    demo_data0
                GROUP BY ds ,shop_id
        )
    GROUP BY ds;

ROW_NUMBER(TopN)

The following code provides an example of how to return the top 10 rows of each main_id group.

SELECT  main_id
        ,type
FROM    (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM <data_demo2>
        ) A
WHERE   A.rn <= 10;

If a data skew issue is caused by ROW_NUMBER(TopN), you can use one of the following solutions to resolve the issue.

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Perform two-stage aggregate operations by using SQL statements. | Add a random column or concatenate a random number as an extra key in the PARTITION BY clause. |
| Solution 2 | Perform two-stage aggregate operations by using a user-defined aggregate function (UDAF). | Use a UDAF that maintains a min-heap queue. |

  • Solution 1: Perform two-stage aggregate operations by using SQL statements.

    In the Map stage, add a random column to the PARTITION BY clause so that the data in each group is first evenly distributed across multiple finer-grained partitions. Sample statements:

    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                            FROM (SELECT  main_id
                                          ,type
                                          ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                                     FROM (SELECT  main_id
                                                   ,type
                                                   ,ceil(110 * rand()) % 11 AS src_pt
                                             FROM  data_demo2
                                          )
                                    ) B
                            WHERE   B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
    -- Configure a random number.
    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                        FROM(SELECT  main_id
                                     ,type
                                     ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                               FROM  (SELECT  main_id
                                              ,type
                                              ,ceil(10 * rand()) AS src_pt
                                              FROM    data_demo2
                                      )
                                    ) B
                            WHERE B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
  • Solution 2: Perform two-stage aggregate operations by using a UDAF.

    The SQL statements in Solution 1 are verbose and can be difficult to maintain. In this example, a UDAF that maintains a min-heap of size K is used instead: in the iterate stage, only the top K rows of data are retained, and only K elements are merged in the merge stage. The entire process is implemented as follows:

    • iterate: pushes the first K elements onto the min-heap; each subsequent element is compared with the smallest element in the heap and replaces it if the new element is larger.

    • merge: merges the two heaps and returns the largest K elements.

    • terminate: returns the heap as an array.

    • The array is then split into rows by using SQL statements. (The statements for registering the function follow the sample code.)

    from odps.udf import annotate
    from odps.udf import BaseUDAF
    import heapq

    @annotate('* -> array<string>')
    class GetTopN(BaseUDAF):
        def new_buffer(self):
            return [[], None]
        def iterate(self, buffer, order_column_val, k):
            # heapq.heappush(buffer, order_column_val)
            # buffer = [heapq.nlargest(k, buffer), k]
            if not buffer[1]:
                buffer[1] = k
            if len(buffer[0]) < k:
                heapq.heappush(buffer[0], order_column_val)
            else:
                heapq.heappushpop(buffer[0], order_column_val)
        def merge(self, buffer, pbuffer):
            first_buffer, first_k = buffer
            second_buffer, second_k = pbuffer
            k = first_k or second_k
            merged_heap = first_buffer + second_buffer
            merged_heap.sort(reverse=True)
            merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap
            buffer[0] = merged_heap
            buffer[1] = k
        def terminate(self, buffer):
            return buffer[0]
    
    SET odps.sql.python.version=cp37;
    SELECT main_id,type_val FROM (
      SELECT  main_id ,get_topn(type, 10) AS type_array
      FROM data_demo2
      GROUP BY main_id
    ) t
    LATERAL VIEW EXPLODE(type_array) type_ar AS type_val;
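
    Before get_topn can be called as above, the Python script must be uploaded as a MaxCompute resource and registered as a function. A minimal sketch follows; the resource name get_topn.py and the class path get_topn.GetTopN are assumptions based on the code above.

    -- Upload the Python file as a resource and register the UDAF (run in odpscmd or the DataWorks editor).
    ADD PY get_topn.py -f;
    CREATE FUNCTION get_topn AS 'get_topn.GetTopN' USING 'get_topn.py';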

Dynamic partitioning

Dynamic partitioning is a syntax that allows you to specify only the name of a partition key column, not its value, when you insert data into a partitioned table. The value of the partition key column is provided by the SELECT clause. Before the SQL statement is executed, the partitions that will be generated are not deterministic. Only after the SQL statement finishes can you determine the generated partitions based on the values of the partition key columns. For more information, see Insert or overwrite data into dynamic partitions (DYNAMIC PARTITION). Sample statements:

create table total_revenues (revenue bigint) partitioned by (region string);

insert overwrite table total_revenues partition(region)
select  total_price as revenue
      ,region
from    sale_detail;

When you create tables that contain dynamic partitions in different scenarios, data skew issues are likely to occur. When a data skew issue occurs, you can use one of the following solutions to resolve the issue.

| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Optimize parameter settings. | Configure the odps.sql.reshuffle.dynamicpt parameter based on the number of dynamic partitions. |
| Solution 2 | Perform partition pruning. | This solution allows you to prune the partitions in which a large number of data records are stored and separately insert data. |

  • Solution 1: Optimize parameter settings.

    Dynamic partitioning allows you to place data that meets different conditions into different partitions to prevent multiple INSERT OVERWRITE operations on a table. This simplifies the code especially when a large number of partitions are available. However, dynamic partitioning may also cause an excessive number of small files to be generated.

    • Data skew example

      In this example, the following SQL statement is used.

      insert into table part_test partition(ds) select * from  part_test;

      Assume that K map instances and N destination partitions exist. Each map instance may write one output file to each destination partition:

                                  ds=1
      cfile1                      ds=2
      ...             X           ds=3
      cfilek                      ...
                                  ds=n

      In the most extreme case, K × N small files may be generated; for example, 1,000 map instances writing to 1,000 partitions could produce up to 1,000,000 files, which are difficult to manage. To address this issue, MaxCompute introduces an additional level of reduce tasks for dynamic partitions, so that data for the same partition is written by one or only a few reduce instances. This prevents an excessive number of small files from being generated. By default, odps.sql.reshuffle.dynamicpt is set to true to enable this feature.

      set odps.sql.reshuffle.dynamicpt=true;

      After the feature is enabled, the number of small files is not excessively large, and reduce tasks can be successfully executed. However, data skew issues may occur. Additional level-1 reduce operations consume computing resources. Therefore, you must exercise caution when you perform this operation.

    • Solution

      The setting set odps.sql.reshuffle.dynamicpt=true; prevents an excessive number of small files from being generated by introducing an additional level of reduce tasks. If only a small number of destination partitions exist, few small files are generated anyway, and the additional reduce stage wastes resources and degrades performance. In this case, the setting set odps.sql.reshuffle.dynamicpt=false; can significantly improve performance. Sample statement:

      insert overwrite table ads_tb_cornucopia_pool_d partition (ds, lv, tp)
      select /*+ mapjoin(t2) */
          '20150503' as ds,
          t1.lv as lv,
          t1.type as tp
      from
          (select  ...
          from tbbi.ads_tb_cornucopia_user_d
          where ds = '20150503'
          and lv in ('flat', '3rd')
          and tp = 'T'
          and pref_cat2_id > 0
          ) t1
      join
          (select ...
          from tbbi.ads_tb_cornucopia_auct_d
          where ds = '20150503'
          and tp = 'T'
          and is_all = 'N'
          and cat2_id > 0
          ) t2
      on t1.pref_cat2_id = t2.cat2_id;

      If you use the default parameters in the preceding code, the run time of the entire task is about 1 hour and 30 minutes. The run time of the last Reduce task is about 1 hour and 20 minutes, which accounts for about 90% of the total run time. After an additional Reduce task is introduced, the data distribution of each Reduce instance is particularly uneven. As a result, a long tail occurs.

    In the preceding example, statistics on the generated dynamic partitions show that only about two dynamic partitions are generated per day. Therefore, set odps.sql.reshuffle.dynamicpt=false; can be used, and the task is then completed in about 9 minutes. This setting improves performance, shortens the computing duration, and saves computing resources.

    The setting set odps.sql.reshuffle.dynamicpt=false; is also suitable for tasks whose execution duration is short and whose resource consumption is low, as long as only a small number of dynamic partitions exist. The setting can help save resources and improve performance.

    Solution 1 can be used for nodes if all the following conditions are met, without consideration of task execution durations.

    • Dynamic partitions are used.

    • The number of dynamic partitions is less than or equal to 50.

    • The configuration of set odps.sql.reshuffle.dynamicpt=false; is not used.

    You can determine the priority at which a node should have the odps.sql.reshuffle.dynamicpt parameter configured based on the execution duration of its last Fuxi instance (Last_Fuxi_Inst_Time). The diag_level field specifies the node priority and can be set based on the following rules:

    • Diag_Level=4 ('Severe'): Last_Fuxi_Inst_Time is longer than 30 minutes.

    • Diag_Level=3 ('High'): Last_Fuxi_Inst_Time ranges from 20 minutes to 30 minutes.

    • Diag_Level=2 ('Medium'): Last_Fuxi_Inst_Time ranges from 10 minutes to 20 minutes.

    • Diag_Level=1 ('Low'): Last_Fuxi_Inst_Time is less than 10 minutes.

  • Solution 2: Perform partition pruning.

    If the data skew issue occurs in the Map stage when you insert data into dynamic partitions, you can identify the partitions in which a large number of records are stored, perform partition pruning, and then separately insert data. You can modify the parameter settings of the Map stage based on the actual situation. The following code shows an example.

    set odps.sql.mapper.split.size=128;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT  *
    FROM    dwd_alsc_ent_shop_info_hi;

    The returned result shows that a full-table scan is performed in the entire process. You can use the configuration of SET odps.sql.reshuffle.dynamicpt=false; to resolve the data skew issue. Sample statements:

    SET odps.sql.reshuffle.dynamicpt=false ;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT *
    FROM dwd_alsc_ent_shop_info_hi;

    To go further, identify the partitions in which a large number of records are stored, prune them, and then insert data into them separately. The following steps show an example.

    1. Execute the following statement to query the partitions in which a large number of records are stored:

      SELECT  ds
              ,hh
              ,COUNT(*) AS cnt
      FROM    dwd_alsc_ent_shop_info_hi
      GROUP BY ds
               ,hh
      ORDER BY cnt DESC;

      The following table lists the data of some partitions.

      | ds | hh | cnt |
      | --- | --- | --- |
      | 20200928 | 17 | 1052800 |
      | 20191017 | 17 | 1041234 |
      | 20210928 | 17 | 1034332 |
      | 20190328 | 17 | 1000321 |
      | 20210504 | 1 | 19 |
      | 20191003 | 20 | 18 |
      | 20200522 | 1 | 18 |
      | 20220504 | 1 | 18 |

    2. Run the SET command with the following SQL statements to prune the partitions that you queried and separately insert data into each partition:

      set odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');
      
      set odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');
      
      SELECT  ds
        ,hh,COUNT(*) as cnt
       FROM dwd_alsc_ent_shop_info_hi
       GROUP BY ds,hh ORDER BY cnt desc;