すべてのプロダクト
Search
ドキュメントセンター

DataWorks:ODPS Spark ジョブの開発

最終更新日:Feb 06, 2026

MaxCompute 上の Spark ジョブは、ローカルモード または クラスターモード のいずれかで実行できます。DataWorks 内では、オフラインジョブクラスターモード で実行することも可能であり、他の ノード タイプと統合できます。本トピックでは、これらの ジョブ の構成およびスケジュール設定方法について説明します。

概要

MaxCompute 上の Spark は、オープンソースの Spark と互換性のある MaxCompute が提供するコンピューティングサービスです。統一されたコンピューティングリソースおよびデータ権限システムの上に Spark コンピューティングフレームワークを提供し、データ処理および分析に際して馴染みのある開発手法を利用できます。DataWorks では、ODPS Spark ノード を使用することで、これらの ジョブ をスケジュール設定し、他のノードと統合できます。

MaxCompute 上の Spark ジョブ は、Java、Scala、Python のいずれかで開発できます。DataWorks で オフラインジョブ として実行する場合、クラスターモード で実行されます。MaxCompute 上の Spark の実行モードの詳細については、「実行モード」をご参照ください。

制限事項

Spark 3.x を使用する ODPS Spark ノード の送信が失敗した場合、サーバーレスリソースグループ を購入して使用する必要があります。詳細については、「サーバーレスリソースグループの使用」をご参照ください。

前提条件

ODPS Spark ノードは、Java/Scala または Python を使用して MaxCompute オフラインジョブで Spark を実行することをサポートしています。開発プロセスおよび構成オプションは、言語によって異なります。

Java/Scala

ODPS Spark ノード で Java または Scala コードを実行するには、まず ジョブ を開発し、パッケージ化したコードを DataWorks に MaxCompute リソース としてアップロードする必要があります。

  1. 開発環境を準備します。

    お使いのオペレーティングシステムに応じて、開発環境を準備してください。詳細については、「Linux 開発環境のセットアップ」および「Windows 開発環境のセットアップ」をご参照ください。

  2. Java/Scala コードを開発します。

    ローカル環境で MaxCompute 上の Spark コードを開発します。MaxCompute 上の Spark が提供するサンプルプロジェクトテンプレートの使用を推奨します。

  3. コードをパッケージ化し、DataWorks にアップロードします。

    コードの開発が完了したら、それをパッケージ化し、DataWorks に MaxCompute リソース としてアップロードします。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

Python(デフォルト環境を使用)

DataWorks では、PySpark コードを Python リソース 内に直接記述し、ODPS Spark ノード を使用して送信および実行できます。DataWorks における Python リソース の作成方法については、「MaxCompute リソースの作成と使用」をご参照ください。PySpark 開発の例については、「PySpark を使用した MaxCompute 上の Spark アプリケーションの開発」をご参照ください。

説明

この方法では、DataWorks が提供するデフォルトの Python 環境を使用します。この環境は、サードパーティパッケージのサポートが限定されています。お使いの ジョブ で他の依存関係が必要な場合は、「Python(カスタム環境を使用)」セクションに記載されているとおり、カスタム Python 環境を準備できます。あるいは、PyODPS 2 ノード または PyODPS 3 ノード を使用することもできます。これらのノードは、Python リソース に対するより優れたサポートを提供します。

Python(カスタム環境を使用)

デフォルトの Python 環境が要件を満たさない場合、MaxCompute 上の Spark ジョブ を実行するためのカスタム環境を準備します。

  1. ローカルの Python 環境を準備します。

    適切な Python 環境の設定手順については、「PySpark Python のバージョンと依存関係のサポート」をご参照ください。

  2. 環境をパッケージ化し、DataWorks にアップロードします。

    Python 環境を ZIP パッケージに圧縮し、DataWorks に MaxCompute リソース としてアップロードします。このパッケージは、MaxCompute 上の Spark ジョブ の実行環境を提供します。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

パラメーター

DataWorks は、Cluster Mode で MaxCompute 上の Spark Offline Jobs を実行します。このモードでは、カスタムプログラムのエントリポイントとして、main を指定する必要があります。main メソッドが完了すると Spark ジョブは終了し、Success または Fail ステータスを返します。さらに、spark-defaults.conf の構成を、ODPS Spark ノード構成アイテムセクションに 1 つずつ追加する必要があります。例として、エグゼキュータ インスタンスの数、メモリspark.hadoop.odps.runtime.end.point 構成などが挙げられます。

説明

spark-defaults.conf ファイルをアップロードしないでください。代わりに、各設定を ODPS Spark ノード の個別の 設定項目 として追加してください。

Spark任务配置

パラメーター

説明

spark-submit コマンド

Spark バージョン

利用可能なバージョンには、Spark 1.xSpark 2.x、および Spark 3.x があります。

説明

Spark 3.x を使用する ODPS Spark ノード の送信が失敗した場合、サーバーレスリソースグループ を購入して使用する必要があります。詳細については、「サーバーレスリソースグループの使用」をご参照ください。

言語

MaxCompute 上の Spark ジョブ の開発言語に応じて、Java/Scala または Python を選択します。

メインリソースの選択

ジョブ のメイン JAR リソース ファイルまたはメイン Python リソース を指定します。

まず、リソース ファイルを DataWorks にアップロードおよびコミットする必要があります。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

アプリケーション JAR または Python ファイル

設定項目

ジョブ の送信に必要な設定項目を指定します。

  • spark.hadoop.odps.access.id spark.hadoop.odps.access.key、および spark.hadoop.odps.end.point の構成は不要です。これらの値は MaxCompute プロジェクトのデフォルト値に設定されています。必要に応じて明示的に構成することで、デフォルト値をオーバーライドできます。

  • spark-defaults.conf ファイルをアップロードしないでください。代わりに、各設定を ODPS Spark ノード の個別の 設定項目 として追加してください。例:エグゼキュータインスタンス数、メモリ、および spark.hadoop.odps.runtime.end.point の構成です。

--conf PROP=VALUE

メインクラス

メインクラス の名前を指定します。この パラメーター は、Java/Scala ジョブ に対して必須です。

--class CLASS_NAME

引数

必要に応じて、スペースで区切って引数を追加できます。DataWorks ではスケジュール設定パラメーターをサポートしています。引数 の形式は ${変数名} です。構成後に、右側ペインの スケジュール設定の構成 > > パラメーター セクションで、変数に値を割り当てます。

説明

スケジュール設定パラメーターへの値の割り当てにサポートされる形式については、「スケジュール設定パラメーターのサポート形式」をご参照ください。

[アプリケーション引数]

その他のリソースの選択

必要に応じて、以下の リソース を選択できます。

  • JAR リソース:開発言語が Java/Scala の場合のみサポートされます。

  • Python リソース:開発言語が Python の場合のみサポートされます。

  • ファイル リソース

  • アーカイブ リソースZIP 形式 で圧縮された リソース のみが表示されます。

まず、リソース ファイルを DataWorks にアップロードおよびコミットする必要があります。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

異なる リソース は、以下のオプションに対応します:

  • --jars JARS

  • --py-files PY_FILES

  • --files FILES

  • --archives ARCHIVES

簡単な例

本セクションでは、ODPS Spark ノード を使用した簡単な例(文字列が数値かどうかを確認する)を紹介します。

  1. リソースを作成します。

    1. データ開発 ページで、新しい Python リソース を作成し、名前を spark_is_number.py とします。詳細については、「MaxCompute リソースの作成と使用」をご参照ください。

      # -*- coding: utf-8 -*-
      import sys
      from pyspark.sql import SparkSession
      
      try:
          # for python 2
          reload(sys)
          sys.setdefaultencoding('utf8')
      except:
          # python 3 not needed
          pass
      
      if __name__ == '__main__':
          spark = SparkSession.builder\
              .appName("spark sql")\
              .config("spark.sql.broadcastTimeout", 20 * 60)\
              .config("spark.sql.crossJoin.enabled", True)\
              .config("odps.exec.dynamic.partition.mode", "nonstrict")\
              .config("spark.sql.catalogImplementation", "odps")\
              .getOrCreate()
      
      def is_number(s):
          try:
              float(s)
              return True
          except ValueError:
              pass
      
          try:
              import unicodedata
              unicodedata.numeric(s)
              return True
          except (TypeError, ValueError):
              pass
      
          return False
      
      print(is_number('foo'))
      print(is_number('1'))
      print(is_number('1.3'))
      print(is_number('-1.37'))
      print(is_number('1e3'))
    2. リソース を保存およびコミットします。

  2. 作成した ODPS Spark ノード で、パラメーター に記載されている通り、ノードの パラメーター およびスケジュール設定 パラメーター を構成し、ノード を保存およびコミットします。

    パラメーター

    説明

    Spark バージョン

    Spark 2.x

    言語

    Python

    メイン Python リソースの選択

    ドロップダウンリストから、作成した Python リソースspark_is_number.py)を選択します。

  3. 開発環境の オペレーションセンター に移動し、データバックフィル ジョブ を実行します。詳細な手順については、「データバックフィルインスタンスの運用管理」をご参照ください。

    説明

    ODPS Spark ノードデータ開発 で直接実行できないため、開発環境の オペレーションセンター から ジョブ を実行する必要があります。

  4. 結果を確認します。

    データバックフィル インスタンス の実行が成功した後、実行ログ 内の トラッキング URL にアクセスして結果を確認します:

    False
    True
    True
    True
    True

高度な例

さまざまなユースケース向けに MaxCompute 上の Spark ジョブ を開発するためのその他の例については、以下のトピックをご参照ください:

次のステップ

ジョブ の開発が完了したら、以下の操作を実行できます。

  • スケジュール設定:ノードのスケジュール設定プロパティを構成します。タスクを定期的に実行する必要がある場合は、再実行設定やスケジュール依存関係などのプロパティを構成する必要があります。詳細については、「タスクのスケジュール設定プロパティの概要」をご参照ください。

  • タスクのデバッグ:現在のノードのコードをテストおよび実行し、コードのロジックが正しいことを検証します。詳細については、「タスクのデバッグプロセス」をご参照ください。

  • タスクのデプロイメント:すべての開発関連の操作が完了したら、すべてのノードをデプロイする必要があります。デプロイ後、ノードはスケジュール設定に従って定期的に実行されます。詳細については、「タスクのデプロイメント」をご参照ください。

  • Spark ジョブの診断:MaxCompute では、Logview ツールおよび Spark Web UI を使用して Spark ジョブ を診断できます。実行ログ を確認することで、正しく送信および実行されていることを検証できます。