全部产品
Search
文档中心

云原生大数据计算服务 MaxCompute:DISTRIBUTED MAPJOIN

更新时间:May 20, 2024

Distributed MapJoin是MapJoin的升级版,适用于大表Join中表的场景,二者的核心目的都是为了减少大表侧的Shuffle和排序。

注意事项

  • Join两侧的表数据量要求不同,大表侧数据在10 TB以上,中表侧数据在[1 GB, 100 GB]范围内。

  • 小表侧的数据需要均匀分布,没有明显的长尾,否则单个分片会产生过多的数据,导致OOM(Out Of Memory)及RPC(Remote Procedure Call)超时问题。

  • SQL任务运行时间在20分钟以上,建议使用Distributed MapJoin进行优化。

  • 由于在执行任务时,需要占用较多的资源,请避免在较小的Quota组运行。

    说明

    您可以在配额(Quota)管理页面,修改配额组,详情请参见Quota管理(新版)

使用方法

您需要在select语句中使用Hint提示/*+distmapjoin(<table_name>(shard_count=<n>,replica_count=<m>))*/才会执行distmapjoinshard_countreplica_count共同决定任务运行的并发度,即并发度=shard_count * replica_count

  • 参数说明

    • table_name:目标表名。

    • shard_count=<n>:设置小表数据的分片数,小表数据分片会分布至各个计算节点处理。n即为分片数,一般按奇数设置。

      说明
      • shard_count值建议手动指定,shard_count值可以根据小表数据量来大致估算,预估一个分片节点处理的数据量范围是[200 MB, 500 MB]。

      • shard_count设置过大,性能和稳定性会受影响;shard_count设置过小,会因内存使用过多而报错。

    • replica_count=<m>:设置小表数据的副本数。m即为副本数,默认为1。

      说明

      为了减少访问压力以及避免单个节点失效导致整个任务失败,同一个分片的数据,可以有多个副本。当并发过多,或者环境不稳定导致运行节点频繁重启,可以适当提高replica_count,一般建议为2或3。

  • 语法示例

    -- 推荐,指定shard_count(replica_count默认为1)
    /*+distmapjoin(a(shard_count=5))*/
    
    -- 推荐,指定shard_count和replica_count
    /*+distmapjoin(a(shard_count=5,replica_count=2))*/
    
    -- distmapjoin多个小表
    /*+distmapjoin(a(shard_count=5,replica_count=2),b(shard_count=5,replica_count=2)) */
    
    -- distmapjoin和mapjoin混用
    /*+distmapjoin(a(shard_count=5,replica_count=2)),mapjoin(b)*/

使用示例

为了便于理解,本文以向分区表tmall_dump_lasttable插入数据为例,为您演示Distributed MapJoin的用法。

  • 常规写法。

    insert OVERWRITE table tmall_dump_lasttable partition(ds='20211130')
    select t1.*
    from
    (
        select nid, doc,type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;
  • 优化后写法。

    insert OVERWRITE table tmall_dump_lasttable partition (ds='20211130')
    select /*+ distmapjoin(t2(shard_count=35)) */ t1.*
    from
    (
        select nid, doc, type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;