すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:MaxCompute Sparkタスクの開発

最終更新日:Mar 25, 2025

MaxComputeタスクでSparkをローカルモードまたはクラスターモードで実行できます。 また、DataWorksのクラスターモードでMaxComputeタスクでオフラインSparkを実行し、タスクを他のタイプのノードと統合してスケジューリングすることもできます。 このトピックでは、DataWorksでSpark on MaxComputeタスクを設定およびスケジュールする方法について説明します。

前提条件

ODPS Sparkノードが作成されます。 詳細については、「ODPSノードの作成と管理」をご参照ください。

制限事項

Spark 3.Xバージョンを使用するODPS Sparkノードをコミットしたときにエラーが報告された場合、 チケットを起票

し、テクニカルサポートに連絡して、ノードの実行に使用されるスケジューリング専用リソースグループのバージョンを更新します。

背景情報

Spark on MaxComputeは、MaxComputeが提供するコンピューティングサービスで、オープンソースのSparkと互換性があります。 Spark on MaxComputeは、統合コンピューティングリソースとデータセット権限システムに基づくSparkコンピューティングフレームワークを提供します。 MaxComputeのSparkでは、好みの開発方法を使用してSparkタスクを送信および実行できます。 MaxComputeのSparkは、さまざまなデータ処理および分析要件を満たすことができます。 DataWorksでは、ODPS Sparkノードを使用して、MaxComputeタスクでSparkをスケジュールおよび実行し、MaxComputeタスクでSparkを他のタイプのタスクと統合できます。

MaxComputeのSparkを使用すると、Java、Scala、またはPythonを使用してタスクを開発し、ローカルモードまたはクラスターモードでタスクを実行できます。 Spark on MaxComputeでは、DataWorksのクラスターモードでMaxComputeタスクでSparkをオフラインで実行することもできます。 MaxComputeタスクでのSparkの実行モードの詳細については、「実行モード」をご参照ください。

準備

ODPS Sparkノードを使用すると、Java、Scala、またはPythonを使用して、MaxComputeタスクでオフラインSparkを開発および実行できます。 オフラインSpark on MaxComputeタスクの開発に必要な操作とパラメーターは、使用するプログラミング言語によって異なります。 ビジネス要件に基づいてプログラミング言語を選択できます。

Java/Scala

ODPS SparkノードでJavaまたはScalaコードを実行する前に、オンプレミスのマシンでSpark on MaxComputeタスクのコードの開発を完了し、そのコードをMaxComputeリソースとしてDataWorksにアップロードする必要があります。 次の手順を実行する必要があります。

  1. 開発環境を準備します。

    使用するオペレーティングシステムに基づいて、Spark on MaxComputeタスクを実行する開発環境を準備する必要があります。 詳細については、「Linux開発環境の設定」または「Windows開発環境の設定」をご参照ください。

  2. JavaまたはScalaコードを開発します。

    ODPS SparkノードでJavaまたはScalaコードを実行する前に、オンプレミスのマシンまたは準備された開発環境で、Spark on MaxComputeタスクのコードの開発を完了する必要があります。 MaxComputeでSparkが提供するサンプルプロジェクトテンプレートを使用することを推奨します。

  3. 開発したコードをパッケージ化し、DataWorksにアップロードします。

    コードが開発されたら、コードをパッケージ化し、パッケージをMaxComputeリソースとしてDataWorksにアップロードする必要があります。 詳細については、「MaxComputeリソースの作成と使用」をご参照ください。

プログラミング言語: Python (デフォルトのPython環境を使用)

DataWorksでは、DataWorksでPythonリソースにオンラインでコードを記述し、ODPS Sparkノードを使用してコードをコミットして実行することで、PySparkタスクを開発できます。 DataWorksでPythonリソースを作成する方法と、PySparkを使用してMaxComputeアプリケーションでSparkを開発するための例については、「MaxComputeリソースの作成と使用」および「PySparkを使用してMaxComputeアプリケーションでSparkを開発する」をご参照ください。

説明

DataWorksが提供するデフォルトのPython環境を使用して、コードを開発できます。 デフォルトのPython環境でサポートされているサードパーティのパッケージがPySparkタスクの要件を満たせない場合は、プログラミング言語: Python (カスタムPython環境を使用) を参照して、カスタムPython環境を準備できます。 PyODPS 2ノードまたはPyODPS 3ノードを使用することもできます。これらは、開発のためにより多くのPythonリソースをサポートします。

プログラミング言語: Python (カスタムPython環境を使用する)

デフォルトのPython環境がビジネス要件を満たせない場合は、次の手順を実行してカスタムPython環境を準備し、Spark on MaxComputeタスクを実行できます。

  1. オンプレミスのマシンでPython環境を準備します。

    ビジネス要件に基づいてPython環境を構成するには、PySpark Pythonバージョンとサポートされている依存関係を参照してください。

  2. Python環境のコードをパッケージ化し、パッケージをDataWorksにアップロードします。

    Python環境のコードをZIP形式でパッケージ化し、そのパッケージをMaxComputeリソースとしてDataWorksにアップロードする必要があります。 これにより、環境でSpark on MaxComputeタスクを実行できます。 詳細については、「MaxComputeリソースの作成と使用」をご参照ください。

说明のパラメータ

DataWorksのクラスターモードで、MaxComputeタスクでオフラインSparkを実行できます。 このモードでは、カスタムアプリケーションのエントリポイントとしてMainメソッドを指定する必要があります。 MainSuccessまたはFail状態になると、Sparkタスクは終了します。 spark-defaults.confファイルの設定項目をODPS Sparkノードの設定に追加する必要があります。 たとえば、executorsの数、メモリサイズ、spark.hadoop.odps.ru ntime.end.pointなどの設定項目を追加する必要があります。

説明

spark-defaults.confファイルをアップロードする必要はありません。 代わりに、spark-defaults.confファイルの設定項目をODPS Sparkノードの設定に1つずつ追加する必要があります。

Spark任务配置

パラメーター

説明

spark-submitコマンド

Sparkバージョン

Sparkのバージョン。 有効な値: Spark1.xSpark2.x、およびSpark3.x

説明

Spark 3.Xバージョンを使用するODPS Sparkノードをコミットしたときにエラーが報告された場合、 チケットを起票

し、テクニカルサポートに連絡して、ノードの実行に使用されるスケジューリング専用リソースグループのバージョンを更新します。

なし

Language

プログラミング言語。 有効な値: Java/ScalaおよびPython。 ビジネス要件に基づいてプログラミング言語を選択できます。

なし

主なJARリソース

メインのJARまたはPythonリソースファイル。

必要なリソースファイルをDataWorksにアップロードし、事前にリソースファイルをコミットする必要があります。 詳細については、「MaxComputeリソースの作成と使用」をご参照ください。

アプリjarまたはPythonファイル

構成アイテム

Spark on MaxComputeタスクを送信するために必要な設定項目。

  • spark.hadoop.odps.access.idspark.hadoop.odps.access.key、またはspark.hadoop.odps.end.pointを設定する必要はありません。 デフォルトでは、これらの設定項目の値はMaxComputeプロジェクトの値と同じです。 必要に応じて、これらの項目を明示的に設定してデフォルト値を上書きすることもできます。

  • spark-defaults.confファイルをアップロードする必要はありません。 代わりに、spark-defaults.confファイルの設定項目をODPS Sparkノードの設定に1つずつ追加する必要があります。 たとえば、エグゼキューターの数、メモリサイズ、spark.hadoop.odps.ru ntime.end.pointなどの設定項目を追加する必要があります。

-- conf PROP=VALUE

メインクラス

メインクラスの名前。 このパラメーターは、LanguageパラメーターをJava/Scalaに設定した場合にのみ必要です。

-- class CLASS_NAME

Parameters

ビジネス要件に基づいてパラメーターを追加できます。 複数ある場合は、パラメーターをスペースで区切ります。 DataWorksでは、${Variable name} 形式でスケジューリングパラメーターを追加できます。 パラメーターを追加したら、右側のナビゲーションウィンドウで [プロパティ] タブをクリックし、[スケジューリングパラメーター] セクションで関連する変数に値を割り当てる必要があります。

説明

サポートされているスケジューリングパラメーターの形式については、「サポートされているスケジューリングパラメーターの形式」をご参照ください。

[アプリ引数]

その他のリソース

次のタイプのリソースもサポートされています。 ビジネス要件に基づいて、次のタイプのリソースを選択できます。

  • Jarリソース: このタイプのリソースは、LanguageパラメーターをJava/Scalaに設定した場合にのみ選択できます。

  • Pythonリソース: このタイプのリソースは、LanguageパラメーターをPythonに設定した場合にのみ選択できます。

  • ファイルリソース

  • アーカイブリソース: 圧縮されたリソースのみが表示されます。

必要なリソースファイルをDataWorksにアップロードし、事前にリソースファイルをコミットする必要があります。 詳細については、「MaxComputeリソースの作成と使用」をご参照ください。

さまざまなタイプのリソースのコマンド:

  • -- 瓶ジャー

  • -- py-files PY_ファイル

  • -- filesファイル

  • -アーカイブアーカイブ

単純なコード編集の例

このセクションでは、ODPS Sparkノードを使用してSpark on MaxComputeタスクを開発する方法の簡単な例を示します。 この例では、Spark on MaxComputeタスクを開発して、文字列を数字に変換できるかどうかを判断します。

  1. リソースを作成します。

    1. DataWorksコンソールのDataStudioページで、spark_is_number.pyという名前のPythonリソースを作成します。 詳細については、「MaxComputeリソースの作成と使用」をご参照ください。 サンプルコード:

      # -*- coding: utf-8 -*-
      import sys
      from pyspark.sql import SparkSession
      
      try:
          # for python 2
          reload(sys)
          sys.setdefaultencoding('utf8')
      except:
          # python 3 not needed
          pass
      
      if __name__ == '__main__':
          spark = SparkSession.builder\
              .appName("spark sql")\
              .config("spark.sql.broadcastTimeout", 20 * 60)\
              .config("spark.sql.crossJoin.enabled", True)\
              .config("odps.exec.dynamic.partition.mode", "nonstrict")\
              .config("spark.sql.catalogImplementation", "odps")\
              .getOrCreate()
      
      def is_number(s):
          try:
              float(s)
              return True
          except ValueError:
              pass
      
          try:
              import unicodedata
              unicodedata.numeric(s)
              return True
          except (TypeError, ValueError):
              pass
      
          return False
      
      print(is_number('foo'))
      print(is_number('1'))
      print(is_number('1.3'))
      print(is_number('-1.37'))
      print(is_number('1e3'))
    2. リソースを保存してコミットします。

  2. 作成されたODPS Sparkノードで、このトピックの [パラメーターの説明] セクションを参照して、Spark on MaxComputeタスクのパラメーターとスケジューリングプロパティを設定し、ノードを保存してコミットします。

    パラメーター

    説明

    Sparkバージョン

    Spark2.xを選択します。

    言語

    [Python] を選択します。

    主要なPythonリソース

    作成したPythonリソースspark_is_number.py

  3. 開発環境のオペレーションセンターに移動し、ODPS Sparkノードのデータをバックフィルします。 詳細については、「データの埋め戻しとデータバックフィルインスタンスの表示 (新バージョン) 」をご参照ください。

    説明

    DataWorksは、DataStudioでODPS Sparkノードを実行するためのエントリポイントを提供しません。 開発環境のオペレーションセンターでODPS Sparkノードを実行する必要があります。

  4. 結果を表示します。

    データバックフィルインスタンスが正常に実行された後、生成された実行ログで [追跡URL] をクリックして結果を表示します。 次の情報が返されます。

    False
    True
    True
    True
    True

高度なコード編集の例

他のシナリオでのMaxComputeタスクでのSparkの開発の詳細については、以下のトピックを参照してください。

次のステップ

Spark on MaxComputeタスクの開発が完了したら、次の操作を実行できます。

  • スケジューリングプロパティの設定: ノードの定期的なスケジューリングのプロパティを設定できます。 システムが定期的にタスクをスケジュールして実行する場合は、再実行設定やスケジューリングの依存関係など、ノードの項目を構成する必要があります。 詳細については、「スケジュール」をご参照ください。

  • ノードのデバッグ: ノードのコードをデバッグおよびテストして、コードロジックが期待どおりかどうかを確認できます。 詳細については、「デバッグ手順」をご参照ください。

  • ノードのデプロイ: すべての開発操作が完了したら、ノードをデプロイできます。 ノードがデプロイされた後、システムはノードのスケジューリングプロパティに基づいてノードを定期的にスケジュールします。 詳細については、「ノードのデプロイ」をご参照ください。

  • Sparkタスクの診断を有効にする: MaxComputeはLogviewツールとSpark Web UIを提供します。 Sparkタスクのログを表示して、タスクが送信され、期待どおりに実行されているかどうかを確認できます。