MaxCompute では、SQL ランタイム実行モードを使用するように MapReduce ジョブを構成できます。SQL ランタイムに基づいて、SQL エンジンのさまざまな新機能を MapReduce ジョブに適用できます。このトピックでは、SQL ランタイム実行モードを使用するように MapReduce ジョブを構成する方法について説明します。
背景情報
MaxCompute は MapReduce API を提供します。MapReduce によって提供される Java API を使用して、MaxCompute のデータを処理する MapReduce プログラムを作成できます。
MaxCompute の新しいバージョンでは、SQL ランタイム実行モードを使用するように MapReduce ジョブを構成できます。SQL ランタイムに基づいて、MaxCompute SQL エンジンコンパイラ、コストベースオプティマイザ (CBO)、およびベクトル化実行エンジンを使用して MapReduce ジョブを処理できます。SQL エンジンのさまざまな新機能を MapReduce ジョブに適用することもできます。これは、MapReduce ジョブの機能、パフォーマンス、および安定性の向上に役立ちます。
MapReduce ジョブが SQL ランタイム実行モードを使用するように構成されると、ジョブは MaxCompute の SQL エンジンの新機能を使用できます。元の MapReduce ジョブと比較して、SQL ランタイム実行モードの MapReduce ジョブは次の機能をサポートしています。
ビューを入力として使用します。
外部テーブルを入力として使用します。
分散ファイルシステムでの読み取りおよび書き込み操作をサポートします。
ハッシュクラスタテーブルまたは範囲クラスタテーブルでの読み取りおよび書き込み操作をサポートします。
SQL ランタイム実行モードの MapReduce ジョブは、次の機能もサポートしています。
SQL の CBO オプティマイザとベクトル化実行エンジンを使用してパフォーマンスを継続的に向上させます。
新しいストレージフォーマット圧縮メカニズムをサポートします。
ハッシュクラスタテーブルなどの超大規模入力テーブルで JOIN 操作が実行されるシナリオでのパフォーマンスを向上させるために、並列処理の動的調整をサポートします。
多数のジョブとストレステストに基づいて検証された SQL エンジンの安定性の利点を得ます。 フェールオーバーや永続ボリューム要求 (PVC) などのメカニズムが確保されます。
MaxCompute Studio および LogView によって提供される詳細な実行情報、実行計画、コンパイル情報、およびジョブ構成に基づいて、各ステージのタスクの入出力と全体的な手順に関する情報を取得できます。この情報は、効率的な方法で問題を特定し、タスクを最適化し、開発と運用効率を向上させるのに役立ちます。
注意事項
MapReduce ジョブを SQL ランタイム実行モードで実行できるようにするには、ジョブの実行モードのみを構成する必要があります。元のインターフェイスまたはジョブログジックを変更する必要はありません。
MapReduce API を使用してコンパイルされた MapReduce ジョブのみが、SQL ランタイム実行モードで実行できます。MapReduce API の詳細については、「概要」をご参照ください。
SQL ランタイム実行モードで MapReduce ジョブを実行する場合、MapReduce ジョブの料金は MapReduce 課金ルールに基づいて課金されます。詳細については、「MapReduce ジョブの従量課金」をご参照ください。
使用方法
MapReduce タスクの実行モードを構成します。
odps.mr.run.modeパラメーターを構成して、MapReduce タスクの実行モードを指定できます。有効な値:lot: MapReduce エンジンを使用してタスクを実行します。これはデフォルト値です。sql: SQL エンジンを使用してタスクを実行します。実行に失敗した場合、エラーが返されます。hybrid: SQL エンジンを優先的に使用して MapReduce タスクを実行します。実行に失敗した場合、MapReduce エンジンを使用して MapReduce タスクを実行します。
プロジェクトレベルまたはセッションレベルで実行モードを構成できます。
プロジェクトレベルで実行モードを構成する
プロジェクト内のすべての MapReduce ジョブの実行モードを構成するには、プロジェクトの管理者が次のコマンドを実行する必要があります。
setproject odps.mr.run.mode=<lot/sql/hybrid>;セッションレベルで実行モードを構成する
現在の MapReduce ジョブの実行モードを構成するには、次のいずれかの方法を使用します。
JAR 文の前に
set odps.mr.run.mode=<lot/sql/hybrid>文を追加します。ジョブコードで
jobパラメーターを構成します。サンプルコード:JobConf job = new JobConf(); // 実行モードを hybrid に設定します。 job.set("odps.mr.run.mode","hybrid")
説明StreamJob または SecondarySort が使用されるシナリオでは、次の構成のいずれかを追加する必要があります。
StreamJob:
set odps.mr.sql.stream.enable=true;SecondarySort:
set odps.mr.sql.group.enable=true;
ジョブの詳細を表示します。
LogView または MaxCompute Studio を使用して、クライアントで SQL ランタイム実行モードの MapReduce ジョブによって生成された SQL 式を表示し、ジョブの実行の詳細を表示できます。 LogView の使用方法の詳細については、「LogView V2.0 を使用してジョブ情報を表示する」をご参照ください。
LogView XML
[SourceXML] タブで LogView を開き、クライアントから送信された XML 情報を表示します。 MapReduce ジョブと同じ意味を持つ SQL 式が表示されます。例:
create temporary function mr2sql_mapper_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotMapperUDTF' using ; create temporary function mr2sql_reducer_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotReducerUDTF' using ; @sub_query_mapper := SELECT k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code FROM( SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) FROM ae_antispam.product_sku_tt_inc WHERE ds = "20180615" AND hh = "21" UNION ALL SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) FROM ae_antispam.product_sku ) open_mr_alias1 DISTRIBUTE BY k_id SORT BY k_id ASC; @sub_query_reducer := SELECT mr2sql_reducer_152955927079392291755(k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) as (id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code) FROM @sub_query_mapper; FROM @sub_query_reducer INSERT OVERWRITE TABLE ae_antispam.product_sku SELECT id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ;LogView サマリー
LogView ページの [サマリー] タブで、SQL ランタイムの
実行エンジンが MapReduce ジョブの実行に使用されていることを確認できます。例:説明MapReduce ジョブが SQL ランタイム実行モードで実行されていない場合、実行エンジンの情報は表示されません。 SQL ランタイム実行モードで実行されていない MaxCompute の 拡張 MapReduce (MR2) ジョブの場合、実行エンジンは
cganjiangです。Job run mode: fuxi job Job run engine: execution engineLogview JSONSummary
MapReduce の [Json Summary] タブには、Map と Reduce に関する入出力情報のみが含まれています。[Json Summary] タブには、すべての実行パラメーター、論理プラン、物理プラン、および実行の詳細を含む、SQL 実行の各フェーズの詳細が含まれています。例:
"midlots" : [ "LogicalTableSink(table=[[odps_flighting.flt_20180621104445_step1_ad_quality_tech_qp_algo_antifake_wordbag_filter_bag_change_result_lv2_20, auctionid,word,match_word(3) {0, 1, 2}]]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$2], word=[$3], match_word=[$4]) OdpsLogicalTableFunctionScan(invocation=[[MR2SQL_MAPPER_152955294118813063732($0, $1)]()], rowType=[RecordType(VARCHAR(2147483647) item_id, VARCHAR(2147483647) text, VARCHAR(2147483647) __tf_0_0, VARCHAR(2147483647) __tf_0_1, VARCHAR(2147483647) __tf_0_2)]) OdpsLogicalTableScan(table=[[ad_quality_tech.qp_algo_antifake_wordbag_filter_bag_change_lv2_20, item_id,text(2) {0, 1}]]) ]