This topic describes the common data skew issues that occur when you use MaxCompute and provides related solutions.
MapReduce
To understand data skew, you must first be familiar with MapReduce, a typical distributed computing framework. MapReduce uses the split-apply-combine method: it divides a large or difficult problem into easy-to-handle sub-problems, produces a result for each sub-problem separately, and then merges those results into a final result. Compared with traditional parallel programming frameworks, MapReduce offers high fault tolerance, ease of use, and high scalability. When you use MapReduce to perform parallel programming, you only need to consider the issues that are related to programming in distributed clusters. You do not need to consider other issues, such as data storage, information exchange between nodes, and transmission mechanisms. This simplifies distributed programming.
The following figure shows the workflow of MapReduce.
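In MaxCompute SQL terms, even a simple aggregation already follows this split-apply-combine pattern. The following sketch assumes a hypothetical `<logtable>` with a `user_id` column:

```sql
-- A minimal sketch: mappers scan splits of the input in parallel (split),
-- partial counts are computed per user_id (apply), and reducers merge the
-- partial counts into the final result (combine).
SELECT  user_id
        ,COUNT(*) AS pv
FROM    <logtable>
GROUP BY user_id;
```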
Data skew
In most cases, data skew issues occur on reducers. Mappers split data by input file, so data is typically split evenly and distributed to workers. However, if the distribution of key values in a table is uneven, data cannot be evenly distributed to the workers, and a data skew issue occurs: some workers quickly complete their computation, whereas others run for a long period of time. In actual production, most data is skewed, which is in line with the Pareto principle (80/20 rule). For example, 20% of the active users of a forum contribute 80% of its posts, or 20% of the visitors of a website generate 80% of its page views. In the era of big data, with data volumes growing explosively, data skew severely affects the execution efficiency of distributed programs. A typical symptom is that the execution progress of a job stays at 99% for a long period of time.
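Before you turn to Logview, a quick way to gauge whether a table is skewed is to inspect the distribution of the join or aggregation key directly. The following sketch assumes a hypothetical table `<fact_table>` with key column `key_col`:

```sql
-- A minimal sketch: if the top keys account for a disproportionate share of rows,
-- the workers that receive those keys become long tails.
SELECT  key_col
        ,COUNT(*) AS cnt
FROM    <fact_table>
GROUP BY key_col
ORDER BY cnt DESC
LIMIT 20;
```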
Identify data skew issues
In MaxCompute, you can use Logview to identify data skew issues. The following figure shows how to identify data skew issues.
1. On the Fuxi Jobs page, sort the Fuxi tasks of a Fuxi job by execution latency in descending order, and select the Fuxi task with the maximum execution latency.
2. Sort the Fuxi instances in the Fuxi task by execution latency in descending order. Select a Fuxi instance whose execution latency is much longer than the average execution latency, such as the first Fuxi instance, lock the instance, and then view the output log in StdOut.
3. View the directed acyclic graph (DAG) of the job based on the log information in StdOut.
4. Locate the SQL code snippet that causes the data skew issue based on the key information in the DAG of the job.
Example
1. Obtain the Logview logs based on the run logs of the job. For more information, see Entry point.
2. Go to the Logview UI and sort the Fuxi tasks based on the execution latency in descending order. Select the Fuxi task with the maximum execution latency to quickly identify the data skew issue. In this example, task `R31_26_27` has the maximum execution latency. Click the `R31_26_27` task to go to the instance details page, as shown in the following figure. `Latency: {min:00:00:06, avg:00:00:13, max:00:26:40}` indicates that the minimum execution latency of all instances of the task is 6s, the average execution latency is 13s, and the maximum execution latency is 26 minutes and 40 seconds (1,600s). You can sort the task instances by Latency (task execution latency) in descending order. The sorting result shows that four instances have long execution latencies. If the execution latency of a Fuxi instance is more than twice the average execution latency (26s in this example), MaxCompute considers the Fuxi instance long-tailed. In this example, the execution latencies of 21 instances are longer than 26s, so these instances are long-tailed. A long-tail instance does not necessarily indicate that a data skew issue occurs. You must compare the value of `avg` with the value of `max`: if the value of `max` is much greater than the value of `avg`, a serious data skew issue occurs on the instance and must be addressed.
3. Find the instance and click the icon in the StdOut column to view the output log, as shown in the following figure.
4. After the data skew issue is identified, right-click `R31_26_27` on the Job Details tab and select Expand All. For more information, see Use Logview V2.0 to view job information. View `StreamLineWriter21` before `StreamLineRead22` to identify the keys that cause the data skew issue. In this example, the keys are `new_uri_path_structure`, `cookie_x5check_userid`, and `cookie_userid`. This way, you can locate the SQL code snippet that causes the data skew issue.
Data skew troubleshooting and solutions
From practical experience, data skew issues are typically caused by the following reasons:
- Join
- GROUP BY
- COUNT(DISTINCT)
- ROW_NUMBER (TopN)
- Dynamic partitioning

In descending order of occurrence frequency: Join > GROUP BY > COUNT(DISTINCT) > ROW_NUMBER > dynamic partitioning.
Join
If a data skew issue is caused by a join operation, multiple cases may be involved, such as a join between large tables and small tables, a join between large tables and medium tables, or long tails due to hot key values during a join operation.
A join between large tables and small tables.
Data skew example
In the following example, the `t1` table is a large table, and the `t2` and `t3` tables are small tables.

```sql
SELECT  t1.ip
        ,t1.is_anon
        ,t1.user_id
        ,t1.user_agent
        ,t1.referer
        ,t2.ssl_ciphers
        ,t3.shop_province_name
        ,t3.shop_city_name
FROM    <viewtable> t1
LEFT OUTER JOIN <other_viewtable> t2
ON      t1.header_eagleeye_traceid = t2.eagleeye_traceid
LEFT OUTER JOIN (
            SELECT  shop_id
                    ,city_name AS shop_city_name
                    ,province_name AS shop_province_name
            FROM    <tenanttable>
            WHERE   ds = MAX_PT('<tenanttable>')
            AND     is_valid = 1
        ) t3
ON      t1.shopid = t3.shop_id;
```
Solution
Use map join hints. For more information about map join hints, see MAPJOIN HINT. Sample statement:
```sql
SELECT  /*+ mapjoin(t2,t3) */
        t1.ip
        ,t1.is_anon
        ,t1.user_id
        ,t1.user_agent
        ,t1.referer
        ,t2.ssl_ciphers
        ,t3.shop_province_name
        ,t3.shop_city_name
FROM    <viewtable> t1
LEFT OUTER JOIN (<other_viewtable>) t2
ON      t1.header_eagleeye_traceid = t2.eagleeye_traceid
LEFT OUTER JOIN (
            SELECT  shop_id
                    ,city_name AS shop_city_name
                    ,province_name AS shop_province_name
            FROM    <tenanttable>
            WHERE   ds = MAX_PT('<tenanttable>')
            AND     is_valid = 1
        ) t3
ON      t1.shopid = t3.shop_id;
```
Usage notes
- When you reference a small table or a subquery, you must reference the alias of the table or subquery.
- Map join operations support small tables in subqueries (see the sketch after these notes).
- In a map join operation, you can use non-equi joins or combine conditions by using `OR`. You can calculate the Cartesian product by using `MAPJOIN ON 1 = 1` without specifying a meaningful `ON` condition. For example, you can execute the statement `select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;`. However, this statement may cause data bloat.
- Separate multiple small tables in a map join hint with commas (,), such as `/*+ mapjoin(a,b,c)*/`.
- In the Map stage, a map join operation loads all data of the specified tables into the memory of the program that performs the operation. Therefore, the tables specified for a map join must be small tables, and the total memory occupied by their data cannot exceed 512 MB. MaxCompute uses compressed storage, so the data volume of the small tables sharply increases after they are loaded into memory; the 512 MB limit applies to the data volume after loading. You can set the `odps.sql.mapjoin.memory.max` parameter to a larger value to increase the memory size. The maximum value of the parameter is 8192. Unit: MB.

```sql
set odps.sql.mapjoin.memory.max=2048;
```
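As a concrete illustration of the first two usage notes, the following sketch (the `<smalltable>` name is hypothetical) wraps a small aggregated result in a subquery and references it in the map join hint by its alias `s`:

```sql
-- A minimal sketch: the hint references the subquery alias s, not the underlying table.
SELECT  /*+ mapjoin(s) */
        a.id
        ,s.cnt
FROM    shop a
JOIN    (
    SELECT  id
            ,COUNT(*) AS cnt
    FROM    <smalltable>
    GROUP BY id
) s
ON      a.id = s.id;
```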
Limits on map join operations
- The left table in a `LEFT OUTER JOIN` operation must be a large table.
- The right table in a `RIGHT OUTER JOIN` operation must be a large table.
- Map join operations cannot be used in a `FULL OUTER JOIN` operation.
- The left or right table in an `INNER JOIN` operation can be a large table.
- MaxCompute allows you to specify a maximum of 128 small tables for a map join operation. If you specify more than 128 small tables, a syntax error is returned.
A join between large tables and medium tables.
Data skew example
In the following example, the `t0` table is a large table, and the `t1` table is a medium table.

```sql
SELECT  request_datetime
        ,host
        ,URI
        ,eagleeye_traceid
FROM    <viewtable> t0
LEFT JOIN (
    SELECT  traceid
            ,eleme_uid
            ,isLogin_is
    FROM    <servicetable>
    WHERE   ds = '${today}'
    AND     hh = '${hour}'
) t1
ON      t0.eagleeye_traceid = t1.traceid
WHERE   ds = '${today}'
AND     hh = '${hour}';
```
Solution
Use DISTRIBUTED MAPJOIN. For more information about the syntax, see DISTRIBUTED MAPJOIN. Sample statement:
```sql
SELECT  /*+ distmapjoin(t1) */
        request_datetime
        ,host
        ,URI
        ,eagleeye_traceid
FROM    <viewtable> t0
LEFT JOIN (
    SELECT  traceid
            ,eleme_uid
            ,isLogin_is
    FROM    <servicetable>
    WHERE   ds = '${today}'
    AND     hh = '${hour}'
) t1
ON      t0.eagleeye_traceid = t1.traceid
WHERE   ds = '${today}'
AND     hh = '${hour}';
```
Long tails due to hot key values during a join operation.
Data skew example
In the following example, a large number of hot key values exist in the `eleme_uid` field, so a data skew issue is likely to occur.

```sql
SELECT  eleme_uid
        ,...
FROM    (
    SELECT  eleme_uid
            ,...
    FROM    <viewtable>
) t1
LEFT JOIN (
    SELECT  eleme_uid
            ,...
    FROM    <customertable>
) t2
ON      t1.eleme_uid = t2.eleme_uid;
```
Solution
The following table describes the solutions that you can use to resolve the data skew issue.
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Manually split hot key values. | Filter out hot key values from the primary table based on the analysis result. Perform a map join operation on the hot key values and a merge join operation on non-hot key values, and then merge the results of the two join operations. |
| Solution 2 | Configure the skew join parameter. | Set `odps.sql.skewjoin` to true. |
| Solution 3 | Add a skew join hint. | Use a skew join hint in the following format: `/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/`. If you use a skew join hint, the operation to obtain skew keys runs one extra time, which prolongs the query period. If you already know the skew keys, configure the skew join parameter instead to save time. |
| Solution 4 | Perform a modulo equal join by using a multiple table. | Use a multiple table to bloat one side of the join and spread hot keys across instances. |
Manually split hot key values.
Filter out hot key values from the primary table based on the analysis result. Perform a map join operation on the hot key values, perform a merge join operation on non-hot key values, and then merge the results of the two join operations. Sample code:
```sql
SELECT  /*+ MAPJOIN(t2) */
        eleme_uid
        ,...
FROM    (
    SELECT  eleme_uid
            ,...
    FROM    <viewtable>
    WHERE   eleme_uid = <skewed_value>
) t1
LEFT JOIN (
    SELECT  eleme_uid
            ,...
    FROM    <customertable>
    WHERE   eleme_uid = <skewed_value>
) t2
ON      t1.eleme_uid = t2.eleme_uid
UNION ALL
SELECT  eleme_uid
        ,...
FROM    (
    SELECT  eleme_uid
            ,...
    FROM    <viewtable>
    WHERE   eleme_uid != <skewed_value>
) t3
LEFT JOIN (
    SELECT  eleme_uid
            ,...
    FROM    <customertable>
    WHERE   eleme_uid != <skewed_value>
) t4
ON      t3.eleme_uid = t4.eleme_uid;
```
Configure the skew join parameter.
This is a common solution. MaxCompute allows you to set the `odps.sql.skewjoin` parameter to true to enable the skew join feature. Enabling only the skew join feature does not affect task execution; you must also set the `odps.sql.skewinfo` parameter for the skew join feature to take effect. The `odps.sql.skewinfo` parameter specifies the join optimization information. Sample commands:

```sql
set odps.sql.skewjoin=true;
set odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")]; -- skewed_src specifies a traffic table, and skewed_value specifies a hot key value.
```

Examples:

```sql
-- Configure the join optimization information for a single skewed value of a single field.
set odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
-- Configure the join optimization information for multiple skewed values of a single field.
set odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
```
Add a skew join hint.
In the `SELECT` statement, add a skew join hint in the following format to perform a map join operation: `/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/`. `table_name` specifies the name of the skewed table, `column_name` specifies the name of the skewed column, and `value` specifies the skewed key value. Examples:

```sql
-- Method 1: Add a hint to specify the alias of the table.
select /*+ skewjoin(a) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1;
-- Method 2: Add a hint to specify the name of the table and the names of possibly skewed columns. The following statement indicates that columns c0 and c1 in table a are skewed columns.
select /*+ skewjoin(a(c0, c1)) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
-- Method 3: Add a hint to specify the table and column names and provide the skewed key values. If a skewed key value is of the STRING type, enclose it in double quotation marks ("). In the following statement, (a.c0=1 and a.c1="2") and (a.c0=3 and a.c1="4") contain skewed key values.
select /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
```

Note: If you use a skew join hint to specify skewed values (Method 3), the processing efficiency is higher than that of Method 1 and Method 2, which do not specify skewed values.
If you use a skew join hint for join operations, take note of the following limits:
- INNER JOIN: Skew join hints can be specified for the left or right table in the INNER JOIN statement.
- LEFT JOIN, SEMI JOIN, and ANTI JOIN: Skew join hints can be specified only for the left table.
- RIGHT JOIN: Skew join hints can be specified only for the right table.
- FULL JOIN: Skew join hints are not supported.
- An Aggregate is run after a skew join hint is added, which slows down the join operation. Therefore, we recommend that you add a skew join hint only to the JOIN statements that cause data skew.
- The data type of the left-side join key must be the same as that of the right-side join key in the JOIN statement to which a skew join hint is added. If the data types are different, the skew join hint does not take effect. In the preceding example, the data types of `a.c0` and `b.c0` must be the same, and the data types of `a.c1` and `b.c1` must be the same. To ensure data type consistency, you can use the CAST function to convert the data types of join keys in subqueries. Sample statements:

  ```sql
  create table T0(c0 int, c1 int, c2 int, c3 int);
  create table T1(c0 string, c1 int, c2 int);
  -- Method 1:
  select /*+ skewjoin(a) */ * from T0 a join T1 b on cast(a.c0 as string) = cast(b.c0 as string) and a.c1 = b.c1;
  -- Method 2:
  select /*+ skewjoin(b) */ * from (select cast(a.c0 as string) as c00 from T0 a) b join T1 c on b.c00 = c.c0;
  ```

- After a skew join hint is added, the optimizer runs an Aggregate to obtain the first N hot key values. By default, the first 20 hot key values are obtained. You can run the `set odps.optimizer.skew.join.topk.num=xx;` command to specify the number of hot key values that the optimizer obtains.
- A skew join hint can be added only for the left or right table involved in a JOIN statement.
- The JOIN statement to which a skew join hint is added must include `left key = right key`. Skew join hints cannot be added to a CARTESIAN JOIN statement.
- You cannot add a skew join hint to a JOIN statement to which a map join hint is added.
Perform a modulo equal join by using a multiple table.
The logic of this solution is different from that of the other solutions. In this solution, a "multiple" table that contains only one column of the INT type is used; for example, the column contains values from 1 to N, where N is determined based on the skew extent. You can use the multiple table to bloat the user behavior table by N times and then join on two keys: the user ID and the `number` column. This changes the user ID-based data distribution that causes the data skew issue into a `1/N` distribution over the two keys. However, the amount of data is increased by N times in this case.

```sql
SELECT  eleme_uid
        ,...
FROM    (
    SELECT  eleme_uid
            ,...
    FROM    <viewtable>
) t1
LEFT JOIN (
    SELECT  /*+ mapjoin(<multipletable>) */
            eleme_uid
            ,number
            ,...
    FROM    <customertable>
    JOIN    <multipletable>
) t2
ON      t1.eleme_uid = t2.eleme_uid
AND     mod(t1.<value_col>, 10) + 1 = t2.number;
```
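The multiple table itself needs to be created only once. The following is a minimal sketch, assuming a bloat factor of 10 and a hypothetical table name `multiple_tbl`:

```sql
-- A minimal sketch: a 10-row numbers table used to bloat one side of the join.
CREATE TABLE IF NOT EXISTS multiple_tbl (number BIGINT);
INSERT INTO TABLE multiple_tbl (number)
VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
```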
Based on the preceding data bloat mechanism, you can make the bloat take effect only for the hot key values in the two tables and leave non-hot key values unchanged. Find the hot key values, then process the traffic table and the user behavior table separately, adding an `eleme_uid_join` column to each. If the user ID is a hot key value, the `CONCAT` method is used to append a random positive integer within a predefined multiple, such as 0 to 1000; otherwise, the original user ID remains unchanged. The two tables are then joined on the `eleme_uid_join` column. This way, only the hot key values are bloated by the multiple, which mitigates the data skew and prevents unnecessary bloat of non-hot key values. However, this completely rewrites the SQL code of the original business logic, so we recommend that you do not use this solution.
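For reference, the following is a minimal sketch of that approach. The `<hotkeytable>` that stores the detected hot `eleme_uid` values, the bloat factor of 1000, and the assumption that `eleme_uid` is of the STRING type are all illustrative:

```sql
-- Hypothetical sketch: bloat only the hot keys on both sides of the join.
SELECT  a.eleme_uid
        ,...
FROM    (
    -- Traffic table: append a random suffix in [1, 1000] only to hot keys.
    SELECT  /*+ mapjoin(h) */
            t1.eleme_uid
            ,IF(h.eleme_uid IS NOT NULL,
                CONCAT(t1.eleme_uid, '_', CAST(CEIL(RAND() * 1000) AS STRING)),
                t1.eleme_uid) AS eleme_uid_join
    FROM    <viewtable> t1
    LEFT JOIN <hotkeytable> h
    ON      t1.eleme_uid = h.eleme_uid
) a
LEFT JOIN (
    -- User behavior table: replicate each hot key once per suffix; keep other keys unchanged.
    SELECT  /*+ mapjoin(h, m) */
            t2.eleme_uid
            ,IF(h.eleme_uid IS NOT NULL,
                CONCAT(t2.eleme_uid, '_', CAST(m.number AS STRING)),
                t2.eleme_uid) AS eleme_uid_join
    FROM    <customertable> t2
    LEFT JOIN <hotkeytable> h
    ON      t2.eleme_uid = h.eleme_uid
    LEFT JOIN <multipletable> m
    ON      h.eleme_uid IS NOT NULL
) b
ON      a.eleme_uid_join = b.eleme_uid_join;
```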
GROUP BY
Sample pseudocode in which a GROUP BY clause is specified:

```sql
SELECT  shop_id
        ,SUM(is_open) AS business_days -- number of business days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;
```
The following table describes the solutions to data skew issues that are caused by the GROUP BY clause.
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Configure a parameter to enable the anti-skew feature for the GROUP BY clause. | Add `set odps.sql.groupby.skewindata=true;` before the statement. |
| Solution 2 | Add a random number. | Split the key that causes a long tail. |
| Solution 3 | Create a rolling table. | This solution helps reduce costs and improve efficiency. |
Solution 1: Configure a parameter to enable the anti-skew feature for the GROUP BY clause.
```sql
set odps.sql.groupby.skewindata=true;
```
Solution 2: Add a random number.
Compared with Solution 1, this solution requires you to rewrite the SQL statement: a random number is added to split the key that causes the long tail in the GROUP BY clause.
For the statement `SELECT Key, COUNT(*) AS Cnt FROM TableName GROUP BY Key;`, combiners are not required: the MapReduce system shuffles the output data from mappers to reducers, and the reducers then perform the COUNT operation. The related execution plan is `M -> R`. In the following example, the key that causes a long tail is reassigned:

```sql
-- The key that causes a long tail is KEY001.
SELECT  a.Key
        ,SUM(a.Cnt) AS Cnt
FROM    (
    SELECT  Key
            ,COUNT(*) AS Cnt
    FROM    <TableName>
    GROUP BY Key
            ,CASE WHEN Key = 'KEY001' THEN Hash(Random()) % 50
                  ELSE 0
             END
) a
GROUP BY a.Key;
```
After the key is reassigned, the execution plan becomes `M -> R -> R`. One more step is added to the execution plan, but the key that causes the long tail is processed in two steps, so the overall execution duration may be reduced. The resource and time consumption are basically the same as those of Solution 1. However, in actual scenarios, a long tail is typically caused by multiple keys, and the costs of identifying those keys and rewriting the SQL statements are high. Therefore, Solution 1 is more cost-efficient.

Solution 3: Create a rolling table.
This solution aims to reduce costs and improve efficiency. The core requirement is to retrieve the merchant data for the past year. For online tasks, the data of all partitions from `T-1` to `T-365` needs to be read, which causes a waste of resources. If you create a rolling table, the amount of data that is read from partitions is reduced, but the retrieval of all data for the past year is not affected.

In the following example, merchant business data for 365 days is initialized for the first time. The GROUP BY clause is used to aggregate the data, the date on which the data was updated is recorded, and the result is stored in the rolling table `m_xxx_365_df`. For subsequent tasks, the `T-2` partition of the rolling table is associated with the `table_xxx_di` table, and then a GROUP BY operation is performed. This way, the number of partitions from which data is read each day is reduced from 365 to 2, the number of times the primary key `shop_id` is read is significantly reduced, and resource consumption is also reduced.

```sql
-- Create a rolling table.
CREATE TABLE IF NOT EXISTS m_xxx_365_df
(
    shop_id         STRING COMMENT 'Shop ID',
    last_update_ds  STRING COMMENT 'Date of the last update',
    365d_open_days  BIGINT COMMENT 'Number of business days in the past 365 days'
)
PARTITIONED BY
(
    ds STRING COMMENT 'Date partition'
)
LIFECYCLE 7;

-- Initialize the m_xxx_365_df table with the data from May 1, 2021 to May 1, 2022.
INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
SELECT  shop_id,
        MAX(dt) AS last_update_ds,
        SUM(is_open) AS 365d_open_days
FROM    table_xxx_di
WHERE   dt BETWEEN '20210501' AND '20220501'
GROUP BY shop_id;

-- Execute the following statement for subsequent tasks:
INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
SELECT  aa.shop_id,
        aa.last_update_ds,
        365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- Infinite rolling storage without specifying business days.
FROM    (
    SELECT  shop_id,
            MAX(last_update_ds) AS last_update_ds,
            SUM(365d_open_days) AS 365d_open_days
    FROM    (
        SELECT  shop_id,
                dt AS last_update_ds,
                SUM(is_open) AS 365d_open_days
        FROM    table_xxx_di
        WHERE   dt = '${bizdate}'
        GROUP BY shop_id, dt
        UNION ALL
        SELECT  shop_id,
                last_update_ds,
                365d_open_days
        FROM    m_xxx_365_df
        WHERE   ds = '${bizdate_2}'
        AND     last_update_ds >= '${bizdate_365}'
    )
    GROUP BY shop_id
) aa
LEFT JOIN (
    SELECT  shop_id,
            is_open
    FROM    table_xxx_di
    WHERE   dt = '${bizdate_366}'
) bb
ON aa.shop_id = bb.shop_id;
```
COUNT(DISTINCT)
In this example, a table contains the following data:
| ds (partition) | cnt (number of records) |
| --- | --- |
| 20220416 | 73025514 |
| 20220415 | 2292806 |
| 20220417 | 2319160 |
If you use the following statement, a data skew issue is likely to occur.
```sql
SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;
```
The following table describes the solutions that you can use to resolve the data skew issue.
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Optimize the parameter settings. | Add `set odps.sql.groupby.skewindata=true;` before the statement. |
| Solution 2 | Perform two-stage aggregate operations. | Concatenate the value of the partition field with a random number. |
| Solution 3 | This solution is similar to Solution 2. | Perform a GROUP BY operation on the two grouping fields `ds` and `shop_id`, and then count the deduplicated rows. |
Solution 1: Optimize the parameter settings.
Set the `odps.sql.groupby.skewindata` parameter to true.

```sql
SET odps.sql.groupby.skewindata=true;
```
Solution 2: Perform two-stage aggregate operations.
If data of the `shop_id` field is not evenly distributed, you cannot use Solution 1 to resolve the data skew issue. Instead, you can concatenate the value of the partition field with a random number.

```sql
-- Method 1: Use CONCAT(ROUND(RAND(),1)*10, '_', ds) AS rand_ds to build a randomized key.
SELECT  SPLIT_PART(rand_ds, '_', 2) AS ds
        ,COUNT(*) AS id_cnt
FROM    (
    SELECT  rand_ds
            ,shop_id
    FROM    demo_data0
    GROUP BY rand_ds, shop_id
)
GROUP BY SPLIT_PART(rand_ds, '_', 2);

-- Method 2: Use ROUND(RAND(),1)*10 AS randint10 to add a random number column.
SELECT  ds
        ,COUNT(*) AS id_cnt
FROM    (
    SELECT  ds
            ,randint10
            ,shop_id
    FROM    demo_data0
    GROUP BY ds, randint10, shop_id
)
GROUP BY ds;
```
Solution 3: Implement a solution that is similar to Solution 2.
If data of the fields that are used for the GROUP BY and DISTINCT operations is evenly distributed, you can first perform a GROUP BY operation on the two grouping fields `ds` and `shop_id` to deduplicate the data, and then count the rows per `ds`, which is equivalent to `COUNT(DISTINCT)`.

```sql
SELECT  ds
        ,COUNT(*) AS cnt
FROM    (
    SELECT  ds
            ,shop_id
    FROM    demo_data0
    GROUP BY ds
            ,shop_id
)
GROUP BY ds;
```
ROW_NUMBER(TopN)
The following code provides an example on how to return the top 10 rows.

```sql
SELECT  main_id
        ,type
FROM    (
    SELECT  main_id
            ,type
            ,ROW_NUMBER() OVER (PARTITION BY main_id ORDER BY type DESC) rn
    FROM    <data_demo2>
) A
WHERE   A.rn <= 10;
```
If a data skew issue is caused by ROW_NUMBER(TopN), you can use one of the following solutions to resolve the issue.
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Perform two-stage aggregate operations by using SQL statements. | Add a random column or concatenate a random number as an extra partition key. |
| Solution 2 | Perform two-stage aggregate operations by using a user-defined aggregate function (UDAF). | Use a min-heap-based UDAF. |
Solution 1: Perform two-stage aggregate operations by using SQL statements.
In the Map stage, add a random column as an extra partition key to evenly distribute data within each group of partitions.

```sql
-- Add a random column.
SELECT  main_id
        ,type
FROM    (
    SELECT  main_id
            ,type
            ,ROW_NUMBER() OVER (PARTITION BY main_id ORDER BY type DESC) rn
    FROM    (
        SELECT  main_id
                ,type
        FROM    (
            SELECT  main_id
                    ,type
                    ,ROW_NUMBER() OVER (PARTITION BY main_id, src_pt ORDER BY type DESC) rn
            FROM    (
                SELECT  main_id
                        ,type
                        ,CEIL(110 * RAND()) % 11 AS src_pt
                FROM    data_demo2
            )
        ) B
        WHERE   B.rn <= 10
    )
) A
WHERE   A.rn <= 10;

-- Configure a random number.
SELECT  main_id
        ,type
FROM    (
    SELECT  main_id
            ,type
            ,ROW_NUMBER() OVER (PARTITION BY main_id ORDER BY type DESC) rn
    FROM    (
        SELECT  main_id
                ,type
        FROM    (
            SELECT  main_id
                    ,type
                    ,ROW_NUMBER() OVER (PARTITION BY main_id, src_pt ORDER BY type DESC) rn
            FROM    (
                SELECT  main_id
                        ,type
                        ,CEIL(10 * RAND()) AS src_pt
                FROM    data_demo2
            )
        ) B
        WHERE   B.rn <= 10
    )
) A
WHERE   A.rn <= 10;
```
Solution 2: Perform two-stage aggregate operations by using a UDAF.
The SQL-based solution involves a large amount of code and may be difficult to maintain. In this example, the TopN computation is optimized by using a min-heap-based UDAF: in the `iterate` stage, only the top N rows of data are retained, and in the `merge` stage, only N elements are merged. The entire process is implemented as follows:

- `iterate`: pushes the first K elements onto the min-heap, and compares each element after the first K with the heap top, exchanging it into the heap when appropriate.
- `merge`: merges the two heaps and returns the first K elements.
- `terminate`: returns the heap as an array.
- SQL statements then split the array into rows.

```python
# Python UDAF that keeps the top K values with a min-heap.
import heapq

from odps.udf import annotate, BaseUDAF


@annotate('* -> array<string>')
class GetTopN(BaseUDAF):
    def new_buffer(self):
        # buffer[0] is the min-heap, buffer[1] is K.
        return [[], None]

    def iterate(self, buffer, order_column_val, k):
        if not buffer[1]:
            buffer[1] = k
        if len(buffer[0]) < k:
            heapq.heappush(buffer[0], order_column_val)
        else:
            # Push the new value and pop the smallest, keeping the heap size at K.
            heapq.heappushpop(buffer[0], order_column_val)

    def merge(self, buffer, pbuffer):
        first_buffer, first_k = buffer
        second_buffer, second_k = pbuffer
        k = first_k or second_k
        merged_heap = first_buffer + second_buffer
        merged_heap.sort(reverse=True)
        merged_heap = merged_heap[0:k] if len(merged_heap) > k else merged_heap
        buffer[0] = merged_heap
        buffer[1] = k

    def terminate(self, buffer):
        return buffer[0]
```

```sql
SET odps.sql.python.version=cp37;
SELECT  main_id
        ,type_val
FROM    (
    SELECT  main_id
            ,get_topn(type, 10) AS type_array
    FROM    data_demo2
    GROUP BY main_id
)
LATERAL VIEW EXPLODE(type_array) type_ar AS type_val;
```
Dynamic partitioning
Dynamic partitioning is a syntax that allows you to specify the name of a partition key column, without its value, when you insert data into a partitioned table. You only need to provide the value of the partition key column in the `SELECT` clause. Before the SQL statement is executed, the partitions to be generated are non-deterministic; after the statement is executed, you can determine the generated partitions based on the values of the partition key columns. For more information, see Insert or overwrite data into dynamic partitions (DYNAMIC PARTITION). Sample statements:
```sql
create table total_revenues (revenue bigint) partitioned by (region string);
insert overwrite table total_revenues partition(region)
select  total_price as revenue
        ,region
from    sale_detail;
```
When you create tables that contain dynamic partitions in different scenarios, data skew issues are likely to occur. When a data skew issue occurs, you can use one of the following solutions to resolve the issue.
| No. | Solution | Description |
| --- | --- | --- |
| Solution 1 | Optimize parameter settings. | Configure the `odps.sql.reshuffle.dynamicpt` parameter based on the number of destination partitions. |
| Solution 2 | Perform partition pruning. | Prune the partitions in which a large number of data records are stored and insert their data separately. |
Solution 1: Optimize parameter settings.
Dynamic partitioning allows you to place data that meets different conditions into different partitions, which prevents multiple INSERT OVERWRITE operations on a table and simplifies the code, especially when many partitions are involved. However, dynamic partitioning may also cause an excessive number of small files to be generated.
Data skew example
In this example, the following SQL statement is used:

```sql
insert into table part_test partition(ds) select * from part_test;
```

Assume that K map instances and N destination partitions exist. Each map instance may write one file (`cfile1` to `cfilek`) to each destination partition (`ds=1` to `ds=n`). In the most extreme case, `K × N` small files may be generated, which are difficult to manage. To address this issue, MaxCompute provides an additional level-1 reduce task for dynamic partitions. This way, one or a small number of reduce instances write the data of the same partition, which prevents an excessive number of small files from being generated and ensures that the final reduce task performs the reduce operations. By default, `odps.sql.reshuffle.dynamicpt` is set to true to enable this feature.

```sql
set odps.sql.reshuffle.dynamicpt=true;
```

After the feature is enabled, the number of small files is not excessively large, and the reduce tasks can be executed successfully. However, data skew issues may occur, and the additional level-1 reduce operations consume computing resources. Therefore, exercise caution when you enable this feature.
Solution
The configuration `set odps.sql.reshuffle.dynamicpt=true;` adds a level-1 reduce task to prevent an excessive number of small files from being generated. If only a small number of destination partitions exist, the number of small files that would be generated is not large, so this configuration may waste resources and degrade performance. In this case, the configuration `set odps.sql.reshuffle.dynamicpt=false;` can significantly improve performance. Sample statement:

```sql
insert overwrite table ads_tb_cornucopia_pool_d partition (ds, lv, tp)
select /*+ mapjoin(t2) */
       '20150503' as ds,
       t1.lv as lv,
       t1.type as tp
from (select ...
      from tbbi.ads_tb_cornucopia_user_d
      where ds = '20150503'
      and lv in ('flat', '3rd')
      and tp = 'T'
      and pref_cat2_id > 0
     ) t1
join (select ...
      from tbbi.ads_tb_cornucopia_auct_d
      where ds = '20150503'
      and tp = 'T'
      and is_all = 'N'
      and cat2_id > 0
     ) t2
on t1.pref_cat2_id = t2.cat2_id;
```

If you use the default parameter settings for the preceding statement, the entire task runs for about 1 hour and 30 minutes, of which the last Reduce task runs for about 1 hour and 20 minutes, which accounts for about 90% of the total run time. After the additional Reduce task is introduced, the data distribution across the Reduce instances is particularly uneven, and a long tail occurs.
In the preceding example, the statistical information about the generated dynamic partitions shows that only about 2 dynamic partitions are generated per day. Therefore, the configuration `set odps.sql.reshuffle.dynamicpt=false;` can be used, and the task then completes in 9 minutes. This configuration improves performance, reduces the computing duration, and saves computing resources.

The configuration `set odps.sql.reshuffle.dynamicpt=false;` is also suitable for tasks whose execution duration is not long and whose resource consumption is not high, provided that only a small number of dynamic partitions exist. The configuration can still help save resources and improve performance.

Regardless of task execution duration, Solution 1 can be used for a node if all the following conditions are met:
- Dynamic partitions are used.
- The number of dynamic partitions is less than or equal to 50 (see the sketch after this list).
- The configuration `set odps.sql.reshuffle.dynamicpt=false;` is not already used.
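As a rough check of the second condition, you can count the distinct values of the partition key in the source data. The following sketch reuses the `sale_detail` example from the beginning of this section:

```sql
-- A minimal sketch: estimate how many dynamic partitions an insert would generate.
SELECT  COUNT(DISTINCT region) AS partition_cnt
FROM    sale_detail;
```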
You can determine the priority of a node for configuring the `odps.sql.reshuffle.dynamicpt` parameter based on the execution duration of the last Fuxi instance. The `diag_level` field specifies the node priority. You can set the field based on the following rules:

- `Diag_Level=4 ('Severe')`: `Last_Fuxi_Inst_Time` is longer than 30 minutes.
- `Diag_Level=3 ('High')`: `Last_Fuxi_Inst_Time` ranges from 20 minutes to 30 minutes.
- `Diag_Level=2 ('Medium')`: `Last_Fuxi_Inst_Time` ranges from 10 minutes to 20 minutes.
- `Diag_Level=1 ('Low')`: `Last_Fuxi_Inst_Time` is less than 10 minutes.
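The following sketch shows how these rules might be expressed in SQL. The `<task_stats>` table and its `last_fuxi_inst_time_min` column (the latency of the last Fuxi instance, in minutes) are hypothetical names for illustration:

```sql
-- Hypothetical sketch: classify nodes by the run time of the last Fuxi instance.
SELECT  node_name
        ,CASE WHEN last_fuxi_inst_time_min > 30 THEN 4 -- Severe
              WHEN last_fuxi_inst_time_min > 20 THEN 3 -- High
              WHEN last_fuxi_inst_time_min > 10 THEN 2 -- Medium
              ELSE 1                                   -- Low
         END AS diag_level
FROM    <task_stats>;
```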
Solution 2: Perform partition pruning.
If the data skew issue occurs in the Map stage when you insert data into dynamic partitions, you can identify the partitions in which a large number of records are stored, prune those partitions, and then insert their data separately. You can also modify the parameter settings of the Map stage based on the actual situation. The following code shows an example:

```sql
set odps.sql.mapper.split.size=128;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds,hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi;
```
The returned result shows that a full-table scan is performed in the entire process. You can use the configuration `SET odps.sql.reshuffle.dynamicpt=false;` to resolve the data skew issue. Sample statements:

```sql
SET odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds,hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi;
```
To identify the partitions in which a large number of records are stored, prune them, and insert their data separately, perform the following steps:
1. Execute the following statement to query the partitions in which a large number of records are stored:

```sql
SELECT  ds
        ,hh
        ,COUNT(*) AS cnt
FROM    dwd_alsc_ent_shop_info_hi
GROUP BY ds, hh
ORDER BY cnt DESC;
```
The following table lists the data of some partitions.

| ds | hh | cnt |
| --- | --- | --- |
| 20200928 | 17 | 1052800 |
| 20191017 | 17 | 1041234 |
| 20210928 | 17 | 1034332 |
| 20190328 | 17 | 1000321 |
| 20210504 | 1 | 19 |
| 20191003 | 20 | 18 |
| 20200522 | 1 | 18 |
| 20220504 | 1 | 18 |
2. Run the SET command with the following SQL statements to prune the partitions that you queried and insert data into them separately:

```sql
set odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds,hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi
WHERE CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');

set odps.sql.reshuffle.dynamicpt=false;
INSERT OVERWRITE TABLE data_demo3 PARTITION(ds,hh)
SELECT * FROM dwd_alsc_ent_shop_info_hi
WHERE CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');

-- Query the record counts per partition again.
SELECT  ds
        ,hh
        ,COUNT(*) AS cnt
FROM    dwd_alsc_ent_shop_info_hi
GROUP BY ds, hh
ORDER BY cnt DESC;
```