全部產品
Search
文件中心

MaxCompute:DISTRIBUTED MAPJOIN

更新時間:Jun 19, 2024

Distributed MapJoin是MapJoin的升級版,適用於大表Join中表的情境,二者的核心目的都是為了減少大表側的Shuffle和排序。

注意事項

  • Join兩側的表資料量要求不同,大表側資料在10 TB以上,中表側資料在[1 GB, 100 GB]範圍內。

  • 小表側的資料需要均勻分布,沒有明顯的長尾,否則單個分區會產生過多的資料,導致OOM(Out Of Memory)及RPC(Remote Procedure Call)逾時問題。

  • SQL任務已耗用時間在20分鐘以上,建議使用Distributed MapJoin進行最佳化。

  • 由於在執行任務時,需要佔用較多的資源,請避免在較小的Quota組運行。

    說明

    您可以在配額(Quota)管理頁面,修改配額組,詳情請參見Quota管理(新版)

使用方法

您需要在select語句中使用Hint提示/*+distmapjoin(<table_name>(shard_count=<n>,replica_count=<m>))*/才會執行distmapjoinshard_countreplica_count共同決定任務啟動並執行並發度,即並發度=shard_count * replica_count

  • 參數說明

    • table_name:目標表名。

    • shard_count=<n>:設定小表資料的分區數,小表資料分區會分布至各個計算節點處理。n即為分區數,一般按奇數設定。

      說明
      • shard_count值建議手動指定,shard_count值可以根據小表資料量來大致估算,預估一個分區節點處理的資料量範圍是[200 MB, 500 MB]。

      • shard_count設定過大,效能和穩定性會受影響;shard_count設定過小,會因記憶體使用量過多而報錯。

    • replica_count=<m>:設定小表資料的副本數。m即為副本數,預設為1。

      說明

      為了減少訪問壓力以及避免單個節點失效導致整個任務失敗,同一個分區的資料,可以有多個副本。當並發過多,或者環境不穩定導致運行節點頻繁重啟,可以適當提高replica_count,一般建議為2或3。

  • 文法樣本

    -- 推薦,指定shard_count(replica_count預設為1)
    /*+distmapjoin(a(shard_count=5))*/
    
    -- 推薦,指定shard_count和replica_count
    /*+distmapjoin(a(shard_count=5,replica_count=2))*/
    
    -- distmapjoin多個小表
    /*+distmapjoin(a(shard_count=5,replica_count=2),b(shard_count=5,replica_count=2)) */
    
    -- distmapjoin和mapjoin混用
    /*+distmapjoin(a(shard_count=5,replica_count=2)),mapjoin(b)*/

使用樣本

為了便於理解,本文以向分區表tmall_dump_lasttable插入資料為例,為您示範Distributed MapJoin的用法。

  • 常規寫法。

    insert OVERWRITE table tmall_dump_lasttable partition(ds='20211130')
    select t1.*
    from
    (
        select nid, doc,type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;
  • 最佳化後寫法。

    insert OVERWRITE table tmall_dump_lasttable partition (ds='20211130')
    select /*+ distmapjoin(t2(shard_count=35)) */ t1.*
    from
    (
        select nid, doc, type
        from search_ods.dump_lasttable where ds='20211203'
    )t1
    join
    (
        select distinct item_id
        from tbcdm.dim_tb_itm
        where ds='20211130'
        and bc_type='B'
        and is_online='Y'
    )t2
    on t1.nid=t2.item_id;