すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:PySparkを使用したSparkアプリケーションの開発

最終更新日:Nov 25, 2024

このトピックでは、AnalyticDB for MySQL Spark Pythonジョブを開発し、仮想環境テクノロジを使用してPythonジョブのランタイム環境をパッケージ化する方法について説明します。

前提条件

  • AnalyticDB for MySQLData Lakehouse Editionクラスターが作成されます。

  • Object Storage Service (OSS) バケットは、AnalyticDB for MySQLクラスターと同じリージョンに作成されます。

  • ジョブのリソースグループは、AnalyticDB for MySQLのData Lakehouse Editionクラスター用に作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • AnalyticDB for MySQLクラスター用のデータベースアカウントが作成されます。

PySpark の使用

  1. 次のサンプルコードを記述し、コードをexample.pyという名前のファイルに保存します。

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. example.pyファイルをOSSにアップロードします。 詳細については、「オブジェクトのアップロード」をご参照ください。

  3. Sparkエディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 クラスターリスト ページで、エディションタブをクリックします。 管理するクラスターを確認し、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、 [ジョブ開発] > [Spark JAR開発] を選択します。

  4. エディターの上部で、ジョブリソースグループとSparkジョブタイプを選択します。 この例では、バッチタイプが選択されています。

  5. エディターで次のコードを実行します。

    {
    
     "name": "Spark Python Test",
     "file": "oss://{your oss bucket path}/example.py",
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.executor.resourceSpec": "small"
     }
    }

    パラメーターの詳細については、「概要」をご参照ください。

Python依存関係の使用

移動方法

自己開発またはサードパーティの依存関係を使用してPythonアプリケーションを開発する場合、Sparkジョブの送信時に、依存関係をOSSにアップロードし、pyFilesパラメーターを設定する必要があります。

このセクションでは、カスタム関数を使用して従業員の税引き後所得を計算する方法の例を示します。 この例では、staff.csvという名前のファイルがOSSにアップロードされます。 staff.csvファイルには次のデータが含まれています。

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
  1. 依存関係をコンパイルしてOSSにアップロードします。

    1. toolsという名前のフォルダを作成します。 フォルダにfunc.pyという名前のファイルを作成します。

      def tax(salary):
          """
          convert string to int and cut 15% tax from the salary
      
          :param salary: The salary of staff worker
          :return:
          """
          return 0.15 * int(salary)
      
    2. ツールフォルダーを圧縮してOSSにアップロードします。 この例では、フォルダーはtools.tar.gzパッケージに圧縮されています。

      説明

      複数の依存するPythonファイルが必要な場合は、ファイルを gzパッケージ。 Pythonコード内のPythonファイルをモジュールとして参照できます。

  2. 次のサンプルコードを記述し、コードをexample.pyという名前のファイルに保存します。

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    import sys
    
    # import third party file
    from tools import func
    
    if __name__ == "__main__":
        # init pyspark context
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # read csv from oss to a dataframe, show the table
        cvs_file = sys.argv[1]
        df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # print schema and data to the console
        df.printSchema()
        df.show()
        # create an udf
        taxCut = udf(lambda salary: func.tax(salary), FloatType())
        # cut tax from salary and show result
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. example.pyファイルをOSSにアップロードします。 詳細については、「オブジェクトのアップロード」をご参照ください。

  4. Sparkエディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 クラスターリスト ページで、エディションタブをクリックします。 管理するクラスターを確認し、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、 [ジョブ開発] > [Spark JAR開発] を選択します。

  5. エディターの上部で、ジョブリソースグループとSparkジョブタイプを選択します。 この例では、バッチタイプが選択されています。

  6. エディターで次のコードを実行します。

    {
     "name": "Spark Python",
     "file": "oss://<bucket name>/example.py",
     "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
     "args": [
     "oss://<bucket name>/staff.csv"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 2,
     "spark.executor.resourceSpec": "small"
     }
    }

    パラメーター:

    • file: PythonコードのOSSパス。

    • pyFiles: PySparkに必要なPython依存関係のOSSパス。 パスのサフィックスは、tarまたは .tar.gz 複数のパッケージをコンマ (,) で区切ります。

      説明

      PySparkに必要なすべてのPython依存関係は、OSSに格納する必要があります。

    • args: JARパッケージに必要なパラメーター。 この例では、staff.csvファイルのOSSパスが使用されています。

    その他のパラメーターについては、「概要」をご参照ください。

仮想環境テクノロジーを使用して依存環境をパッケージ化

Pythonジョブを開発するときに複雑な依存環境に遭遇した場合、Pythonの仮想環境テクノロジーを使用して環境を管理および分離できます。 AnalyticDB for MySQL Sparkを使用すると、仮想環境テクノロジーを使用して、オンプレミスの依存環境をパッケージ化し、OSSにアップロードできます。 仮想環境の詳細については、「Pythonドキュメント」をご参照ください。

重要

AnalyticDB for MySQL Sparkはglibc-devel 2.28を使用します。 このバージョンが仮想環境テクノロジと互換性がない場合、PySparkジョブの実行に失敗する可能性があります。

移動方法

仮想環境テクノロジーを使用してPython環境をパッケージ化するには、Sparkジョブの送信時にPython環境を圧縮してOSSにアップロードし、パラメーターを設定して次の設定を完了する必要があります。

  • Python環境パッケージのOSSパスを指定します。

    • Python環境パッケージが小さい場合は、archivesパラメーターを設定します。

    • Python環境パッケージが大きい場合は、spark.exe cutorEnv.ADB_SPARK_DOWNLOAD_FILESおよびspark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILESパラメーターを設定します。

  • spark.pyspark. Pythonパラメーターを設定して、オンプレミスのデバイスでpythonインタープリターのパスを指定します。

  1. Linuxオペレーティングシステムを準備します。

    仮想環境テクノロジーに基づいてPython環境をパッケージ化するには、Linuxオペレーティングシステムが必要です。 次のいずれかの方法を使用して、Linuxオペレーティングシステムを準備できます。 この例では、ECS (Elastic Compute Service) インスタンスが購入されています。

    • Centos 7またはAnolisOS 8を実行するECSインスタンスを購入します。 詳細については、「カスタム起動タブでインスタンスを作成する」をご参照ください。

    • Centos 7、AnolisOS 8以降のオペレーティングシステムをオンプレミスのデバイスにインストールします。

    • CentosまたはAnolisOSの公式Dockerイメージを使用し、イメージにPython環境をパッケージ化します。

  2. 仮想環境テクノロジーを使用してPython環境をパッケージ化し、パッケージをOSSにアップロードします。

    virtualenvまたはcondaツールを使用して、依存するPython環境をパッケージ化します。 パッケージング中にPythonバージョンをカスタマイズできます。 この例では、virtualenvツールが使用されています。

    # Create directory venv at current path with python3
    # MUST ADD --copies !
    virtualenv --copies --download --python python3.7 venv
    
    # active environment
    source venv/bin/activate
    
    # install third party modules
    pip install scikit-spark==0.4.0
    
    # check the result
    pip list
    
    # compress the environment
    tar -czvf venv.tar.gz venv
    説明

    メソッドの使用方法については、コンダ依存するPython環境をパッケージ化するツール。環境の管理.

  3. Sparkエディターに移動します。

    1. AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 クラスターリスト ページで、エディションタブをクリックします。 管理するクラスターを確認し、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、 [ジョブ開発] > [Spark JAR開発] を選択します。

  4. エディターの上部で、ジョブリソースグループとSparkジョブタイプを選択します。 この例では、バッチタイプが選択されています。

  5. エディターで次のコードを実行します。

    {
     "name": "venv example",
     "archives": [
     "oss://testBucketname/venv.tar.gz#PY3"
     ],
     "conf": {
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    または

    説明

    Python環境パッケージが大きすぎる場合は、追加のパラメーターを設定します。

    {
     "name": "venv example",
     "conf": {
     "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3",
     "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,",
     "spark.driver.resourceSpec": "small",
     "spark.executor.instances": 1,
     "spark.pyspark.python": "./PY3/venv/bin/python3",
     "spark.executor.resourceSpec": "small"
     },
     "file": "oss://testBucketname/example.py"
    }

    パラメーター:

    • archives: Python環境パッケージのOSSパス。 この例では、venv.tar.gzパッケージのOSSパスが使用されています。

    • spark.exe cutorEnv.ADB_SPARK_DOWNLOAD_FILES: Spark executorパラメーターであるPython環境パッケージのOSSパス。

    • spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: SparkドライバーパラメーターであるPython環境パッケージのOSSパス。

    • spark.pyspark.python: Pythonインタープリターのオンプレミスパス。

    その他のパラメーターについては、「概要」をご参照ください。