All Products
Search
Document Center

MaxCompute:DISTRIBUTED MAPJOIN

Last Updated:May 20, 2024

DISTRIBUTED MAPJOIN is an optimized version of MAPJOIN. You can use DISTRIBUTED MAPJOIN when you join a small table with a large table. You can use DISTRIBUTED MAPJOIN or MAPJOIN to reduce shuffling and sorting on the large table.

Usage notes

  • The sizes of the tables that you want to join are different. The size of the large table must be greater than 10 TB, and the size of the small table must be within the range of [1 GB, 100 GB].

  • Data in the small table must be evenly distributed. If a small table contains long tails, excessive data is generated in a single shard of the table. As a result, an out of memory (OOM) error or the remote procedure call (RPC) timeout issue may occur.

  • If an SQL task runs for more than 20 minutes, we recommend that you use DISTRIBUTED MAPJOIN for optimization.

  • Excessive resources are occupied when a task is running. Therefore, we recommend that you do not run a task in a small quota group.

    Note

    You can change the quota group on the Quotas page. For more information, see Manage quotas in the new MaxCompute console.

Use DISTRIBUTED MAPJOIN

To use DISTRIBUTED MAPJOIN in a SELECT statement, you must add the hint /*+distmapjoin(<table_name>(shard_count=<n>,replica_count=<m>))*/ to the statement. Both the shard_count and replica_count parameters are used to determine the parallelism of tasks. You can use the following formula to calculate the parallelism: Parallelism = shard_count × replica_count.

  • Parameters

    • table_name: the name of the small table that you want to join.

    • shard_count=<n>: the number of data shards of the small table that you want to join. The data shards of the small table are distributed on each compute node for data processing. n specifies the number of shards. In most cases, this parameter is set to an odd number.

      Note
      • We recommend that you manually specify the shard_count parameter. You can estimate the value of the shard_count parameter based on the size of the small table. The estimated data amount that is processed by a single shard node is within the range of [200 MB, 500 MB].

      • If you set the shard_count parameter to an excessively large value, the data processing performance and stability are adversely affected. If you set the shard_count parameter to an excessively small value, an error may occur due to the excessive use of memory resources.

    • replica_count=<m>: the number of replicas of the small table. m specifies the number of replicas. Default value: 1.

      Note

      To reduce excessive access requests and prevent the failure of an entire task caused by the failure of a single node, you can create multiple replicas of data in the same shard. If a node frequently restarts due to the high parallelism of tasks or unstable environment performance, you can increase the value of the replica_count parameter. We recommend that you set this parameter to 2 or 3.

  • Syntax

    -- Recommended. Specify the shard_count parameter and retain the default value 1 for the replica_count parameter.
    /*+distmapjoin(a(shard_count=5))*/
    
    -- Recommended. Specify the shard_count and replica_count parameters.
    /*+distmapjoin(a(shard_count=5,replica_count=2))*/
    
    -- Use DISTRIBUTED MAPJOIN to join multiple small tables.
    /*+distmapjoin(a(shard_count=5,replica_count=2),b(shard_count=5,replica_count=2)) */
    
    -- Use DISTRIBUTED MAPJOIN and MAPJOIN together.
    /*+distmapjoin(a(shard_count=5,replica_count=2)),mapjoin(b)*/

Examples

This example describes how to use DISTRIBUTED MAPJOIN when you insert data into a partitioned table named tmall_dump_lasttable.

  • JOIN syntax

    insert OVERWRITE table tmall_dump_lasttable partition(ds='20211130')
    select t1.*
    from
    (
        select nid, doc,type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;
  • DISTRIBUTED MAPJOIN syntax

    insert OVERWRITE table tmall_dump_lasttable partition (ds='20211130')
    select /*+ distmapjoin(t2(shard_count=35)) */ t1.*
    from
    (
        select nid, doc, type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;