本ページでは、JOIN ステートメントが実行される際のデータスキューとソリューションについて説明します。
背景
MaxCompute SQL の JOIN 文が実行されると、同じ JOIN キーを持つデータが同じインスタンスに送信され、同じインスタンスで処理されます。 キーに大量のデータが含まれる場合、処理を実行するインスタンスは他のインスタンスよりもデータの処理に時間がかかります。 実行ログでは、この JOIN タスクのいくつかのインスタンスは実行状態のままですが、他のインスタンスは完了状態になります。 この状態はロングテールと呼ばれます。
データスキューに起因するロングテールは非常に一般的であり、タスクの実行期間を大幅に延長します。 「独身の日」などのプロモーションイベント中には、深刻なロングテールが発生する場合があります。 たとえば、大規模販売者のページビューは、小規模販売者のページビューよりもはるかに多くなります。 ページビューのログデータが販売者ディメンションテーブルに関連付けられている場合、データは販売者 ID によって割り当てられます。 これにより、一部のインスタンスが他のインスタンスよりもはるかに多くのデータを処理します。 このような場合はロングテールが発生するため、タスクを完了できません。
- 1 つの大きなテーブルと 1 つの小さなテーブルがある場合、MAP JOIN 文を実行して小さなテーブルをキャッシュできます。 MAP JOIN 文の詳細については、「SELECT 構文の説明」をご参照ください。
- 2 つの大きなテーブルがある場合は、最初にデータ重複排除を実行します。
- 2 つの大きなキーのデカルト積の原因を特定し、業務の観点からこれらのキーを最適化してください。
- 小さなサイズと大きなサイズのテーブルに対して LEFT JOIN 文を直接実行すると、長い時間がかかります。 この場合、小さいテーブルと大きいテーブルに対して MAP JOIN 文を実行して、2 つのテーブルの共通部分を含む中間テーブルを生成することを推奨します。 この中間テーブルは、大きなテーブルのサイズより大きくなりません。 キーの歪度は、展開されたテーブルのサイズに比例します。 次に、小さいテーブルと中間テーブルに対して LEFT JOIN 文を実行します。 この結果は、小さいテーブルと大きいテーブルに対して LEFT JOIN 文を実行した結果よりも小さくなります。
JOIN 文の実行時にデータスキューが発生しているかどうかを確認する方法
- SQL 文の実行時に生成された Logview ログファイルを開き、各 Fuxi タスクの実行の詳細を確認します。 Long-Tails(115) は、115 のロングテールがあることを示しています。
- Fuxi インスタンスの後のアイコンをクリックして、stdout のインスタンスで読み取ったデータを確認します。
例えば、
Read from 0 num:52743413 size:1389941257
は、JOIN 文の実行時に 1,389,941,257 行のデータが読み取られていることを示しています。 [ロングテール] にリストされているインスタンスが他のインスタンスよりもはるかに多くのデータを読み取る場合、大量のデータが原因でロングテールが起こっていることを示しています。
一般的な原因とソリューション
- MAP JOIN ソリューション:JOIN 文の実行に小さなテーブルが含まれる際にデータスキューが発生した場合、MAP JOIN 文を実行してロングテールを防ぐことができます。
MAP JOIN ソリューションは JOIN 文が Map 側で実行されるよう機能します。 これにより、不均衡なキー分散によるデータスキューが防止されます。 MAP JOIN 文には、次の制限があります。
- MAP JOIN 文は、セカンダリテーブルが小さい場合にのみ適用できます。 セカンダリテーブルは、LEFT OUTER JOIN 文が実行されると右側のテーブルを参照し、RIGHT OUTER JOIN 文が実行されると左側のテーブルを参照します。
- MAP JOIN 文を実行する際は、小さなテーブルのサイズも制限されます。 デフォルトでは、小さなテーブルがメモリに読み込まれた後の最大サイズは 512 MB です。
次の文を実行して、最大サイズを 2,048 MB に拡張できます。
set odps.sql.mapjoin.memory.max=2048
MAP JOIN 文は簡単に使用できます。SELECT
文の後に、/*+mapjoin(b)*/
を加えます。ここで、b は小さなテーブルまたはサブクエリのエイリアスを示しています。 例:select /*+mapjoin(b)*/ a.c2 ,b.c3 from (select c1 ,c2 from t1 ) a left outer join (select c1 ,c3 from t2 ) b on a.c1 = b.c1;
- 空値が原因となる JOIN のロングテール
蓄積された空値がロングテールを引き起こし、小さなテーブルが含まれていないため MAP JOIN 文を使用できない場合、これらの空値はランダムな値として処理されます。 空値は関連付けることができないため、1 つのインスタンスに送信されます。 ランダムな値を関連付けることができるため、値の蓄積を防ぎます。
select ... from (select * from tbcdm.dim_tb_itm where ds='${bizdate}' )son1 left outer join (select * from tbods.s_standard_brand where ds='${bizdate}' and status=3 )son2 on coalesce(son1.org_brand_id,rand()*9999)=son2.value_id;
ON
句にcoalesce(son1.org_brand_id,rand()*9999)
が含まれている場合、空のorg_brand_id
がランダムな値に置き換えられることを示しています。 空値が蓄積されることによるロングテールが防止されます。 - ホットキー値が原因となる JOIN のロングテール
ホットキーの値によってロングテールが発生している場合で、小さなテーブルが存在しないために MAP JOIN 文を使用できない場合は、ホットキーを抽出します。 プライマリテーブルのホットキーデータは、他のデータから分離され、独立して処理された後、他のデータと結合されます。 次の例では、Taobao Web サイトのページビューログテーブルが商品ディメンションテーブルに関連付けられています。
- ホットキーのデータの抽出:ページビューが 50,000 を超える商品の ID を一時テーブルに抽出します。
insert overwrite table topk_item select item_id from (select item_id ,count(1) as cnt from dwd_tb_log_pv_di where ds = '${bizdate}' and url_type = 'ipv' and item_id is not null group by item_id ) a where cnt >= 50000;
- 非ホットキーのデータ抽出
OUTER JOIN 文を実行して、プライマリテーブル sdwd_tb_log_pv_di をホットキーテーブル topk_item に関連付けます。さらに、条件
b1.item_id is null
を適用することで、関連付けのできない非ホット商品のログデータを抽出します。 MAP JOIN 文を実行します。 次に、非ホットキーテーブルを商品ディメンションテーブルに関連付けます。 ホットキーデータが削除され、ロングテールは発生しません。select ... from (select * from dim_tb_itm where ds = '${bizdate}' ) a right outer join (select /*+mapjoin(b1)*/ b2.* from (select item_id from topk_item where ds = '${bizdate}' ) b1 right outer join (select * from dwd_tb_log_pv_di where ds = '${bizdate}' and url_type = 'ipv' ) b2 on b1.item_id = coalesce(b2.item_id,concat("tbcdm",rand()) where b1.item_id is null ) l on a.item_id = coalesce(l.item_id,concat("tbcdm",rand());
- ホットキーのデータ抽出
INNER JOIN 文を実行して、プライマリテーブル sdwd_tb_log_pv_di をホットキーテーブル topk_item に関連付けます。 MAP JOIN 文を実行して、ホット商品のログデータを取得します。 INNER JOIN 文を実行して、商品ディメンションテーブル dim_tb_itm をホットキーテーブル topk_item に関連付け、ホット商品ディメンションテーブルのデータを取得します。 OUTER JOIN 文を実行して、2 つのデータ部分を関連付けます。
select /*+mapjoin(a)*/ ... from (select /*+mapjoin(b1)*/ b2.* from (select item_id from topk_item where ds = '${bizdate}' )b1 join (select * from dwd_tb_log_pv_di where ds = '${bizdate}' and url_type = 'ipv' and item_id is not null ) b2 on (b1.item_id = b2.item_id) ) l left outer join (select /*+mapjoin(a1)*/ a2.* from (select item_id from topk_item where ds = '${bizdate}' ) a1 join (select * from dim_tb_itm where ds = '${bizdate}' ) a2 on (a1.item_id = a2.item_id) ) a on a.item_id = l.item_id;
UNION ALL
文を実行し、ログ情報全体を生成するためのステップ b および c で取得したデータを、関連商品情報に結合します。
- ホットキーのデータの抽出:ページビューが 50,000 を超える商品の ID を一時テーブルに抽出します。
- odps.sql.skewjoin パラメーターを設定してロングテールを解決する
この方法は、シンプルな解決策です。 ただし、偏ったキー値が変わった場合、コードを変更して再度実行する必要があります。 また、値の変化は予測できません。 偏ったキー値が多数存在する場合、設定作業に手間がかかります。 そのため、必要に応じてコードまたはパラメーターの設定を分割できます。 odps.sql.skewjoin パラメーターを設定するには、次の手順を実行します。
- odps.sql.skewjoin フラグを有効にします。
set odps.sql.skewjoin=true
- 偏ったキーとその値を設定します。
skewed_key は偏った列を示し、skewed_value はその値を示しています。set odps.sql.skewinfo=skewed_src:(skewed_key) [("skewed_value")]
- odps.sql.skewjoin フラグを有効にします。