このトピックでは、AnalyticDB for MySQL Spark Pythonジョブを開発し、仮想環境テクノロジを使用してPythonジョブのランタイム環境をパッケージ化する方法について説明します。
前提条件
AnalyticDB for MySQLData Lakehouse Editionクラスターが作成されます。
Object Storage Service (OSS) バケットは、AnalyticDB for MySQLクラスターと同じリージョンに作成されます。
ジョブのリソースグループは、AnalyticDB for MySQLのData Lakehouse Editionクラスター用に作成されます。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQLクラスター用のデータベースアカウントが作成されます。
Alibaba Cloudアカウントを使用する場合は、特権アカウントを作成するだけで済みます。 詳細については、「データベースアカウントの作成」トピックの「特権アカウントの作成」セクションをご参照ください。
RAM (Resource Access Management) ユーザーを使用する場合は、特権アカウントと標準アカウントを作成し、標準アカウントをRAMユーザーに関連付ける必要があります。 詳細については、「データベースアカウントの作成」および「データベースアカウントの関連付けまたは関連付けの解除」をご参照ください。
PySpark の使用
次のサンプルコードを記述し、コードを
example.py
という名前のファイルに保存します。from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.getOrCreate() df = spark.sql("SELECT 1+1") df.printSchema() df.show()
example.py
ファイルをOSSにアップロードします。 詳細については、「オブジェクトのアップロード」をご参照ください。Sparkエディターに移動します。
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 クラスターリスト ページで、エディションタブをクリックします。 管理するクラスターを確認し、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 [ジョブ開発] > [Spark JAR開発] を選択します。
エディターの上部で、ジョブリソースグループとSparkジョブタイプを選択します。 この例では、バッチタイプが選択されています。
エディターで次のコードを実行します。
{ "name": "Spark Python Test", "file": "oss://{your oss bucket path}/example.py", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }
パラメーターの詳細については、「概要」をご参照ください。
Python依存関係の使用
移動方法
自己開発またはサードパーティの依存関係を使用してPythonアプリケーションを開発する場合、Sparkジョブの送信時に、依存関係をOSSにアップロードし、pyFiles
パラメーターを設定する必要があります。
例
このセクションでは、カスタム関数を使用して従業員の税引き後所得を計算する方法の例を示します。 この例では、staff.csv
という名前のファイルがOSSにアップロードされます。 staff.csv
ファイルには次のデータが含まれています。
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
依存関係をコンパイルしてOSSにアップロードします。
tools
という名前のフォルダを作成します。 フォルダにfunc.py
という名前のファイルを作成します。def tax(salary): """ convert string to int and cut 15% tax from the salary :param salary: The salary of staff worker :return: """ return 0.15 * int(salary)
ツール
フォルダーを圧縮してOSSにアップロードします。 この例では、フォルダーはtools.tar.gz
パッケージに圧縮されています。説明複数の依存するPythonファイルが必要な場合は、ファイルを gzパッケージ。 Pythonコード内のPythonファイルをモジュールとして参照できます。
次のサンプルコードを記述し、コードを
example.py
という名前のファイルに保存します。from __future__ import print_function from pyspark.sql import SparkSession from pyspark.sql.functions import udf from pyspark.sql.types import FloatType import sys # import third party file from tools import func if __name__ == "__main__": # init pyspark context spark = SparkSession.builder.appName("Python Example").getOrCreate() # read csv from oss to a dataframe, show the table cvs_file = sys.argv[1] df = spark.read.csv(cvs_file, mode="DROPMALFORMED", inferSchema=True, header=True) # print schema and data to the console df.printSchema() df.show() # create an udf taxCut = udf(lambda salary: func.tax(salary), FloatType()) # cut tax from salary and show result df.select("name", taxCut("salary").alias("final salary")).show() spark.stop()
example.py
ファイルをOSSにアップロードします。 詳細については、「オブジェクトのアップロード」をご参照ください。Sparkエディターに移動します。
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 クラスターリスト ページで、エディションタブをクリックします。 管理するクラスターを確認し、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 [ジョブ開発] > [Spark JAR開発] を選択します。
エディターの上部で、ジョブリソースグループとSparkジョブタイプを選択します。 この例では、バッチタイプが選択されています。
エディターで次のコードを実行します。
{ "name": "Spark Python", "file": "oss://<bucket name>/example.py", "pyFiles": ["oss://<bucket name>/tools.tar.gz"], "args": [ "oss://<bucket name>/staff.csv" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 2, "spark.executor.resourceSpec": "small" } }
パラメーター:
file: PythonコードのOSSパス。
pyFiles: PySparkに必要なPython依存関係のOSSパス。 パスのサフィックスは、tarまたは .tar.gz 複数のパッケージをコンマ (,) で区切ります。
説明PySparkに必要なすべてのPython依存関係は、OSSに格納する必要があります。
args: JARパッケージに必要なパラメーター。 この例では、
staff.csv
ファイルのOSSパスが使用されています。
その他のパラメーターについては、「概要」をご参照ください。
仮想環境テクノロジーを使用して依存環境をパッケージ化
Pythonジョブを開発するときに複雑な依存環境に遭遇した場合、Pythonの仮想環境テクノロジーを使用して環境を管理および分離できます。 AnalyticDB for MySQL Sparkを使用すると、仮想環境テクノロジーを使用して、オンプレミスの依存環境をパッケージ化し、OSSにアップロードできます。 仮想環境の詳細については、「Pythonドキュメント」をご参照ください。
AnalyticDB for MySQL Sparkはglibc-devel 2.28を使用します。 このバージョンが仮想環境テクノロジと互換性がない場合、PySparkジョブの実行に失敗する可能性があります。
移動方法
仮想環境テクノロジーを使用してPython環境をパッケージ化するには、Sparkジョブの送信時にPython環境を圧縮してOSSにアップロードし、パラメーターを設定して次の設定を完了する必要があります。
Python環境パッケージのOSSパスを指定します。
Python環境パッケージが小さい場合は、
archives
パラメーターを設定します。Python環境パッケージが大きい場合は、
spark.exe cutorEnv.ADB_SPARK_DOWNLOAD_FILES
およびspark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES
パラメーターを設定します。
spark.pyspark. Python
パラメーターを設定して、オンプレミスのデバイスでpythonインタープリターのパスを指定します。
例
Linuxオペレーティングシステムを準備します。
仮想環境テクノロジーに基づいてPython環境をパッケージ化するには、Linuxオペレーティングシステムが必要です。 次のいずれかの方法を使用して、Linuxオペレーティングシステムを準備できます。 この例では、ECS (Elastic Compute Service) インスタンスが購入されています。
Centos 7またはAnolisOS 8を実行するECSインスタンスを購入します。 詳細については、「カスタム起動タブでインスタンスを作成する」をご参照ください。
Centos 7、AnolisOS 8以降のオペレーティングシステムをオンプレミスのデバイスにインストールします。
CentosまたはAnolisOSの公式Dockerイメージを使用し、イメージにPython環境をパッケージ化します。
仮想環境テクノロジーを使用してPython環境をパッケージ化し、パッケージをOSSにアップロードします。
virtualenvまたはcondaツールを使用して、依存するPython環境をパッケージ化します。 パッケージング中にPythonバージョンをカスタマイズできます。 この例では、virtualenvツールが使用されています。
# Create directory venv at current path with python3 # MUST ADD --copies ! virtualenv --copies --download --python python3.7 venv # active environment source venv/bin/activate # install third party modules pip install scikit-spark==0.4.0 # check the result pip list # compress the environment tar -czvf venv.tar.gz venv
説明メソッドの使用方法については、コンダ依存するPython環境をパッケージ化するツール。環境の管理.
Sparkエディターに移動します。
AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 クラスターリスト ページで、エディションタブをクリックします。 管理するクラスターを確認し、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 [ジョブ開発] > [Spark JAR開発] を選択します。
エディターの上部で、ジョブリソースグループとSparkジョブタイプを選択します。 この例では、バッチタイプが選択されています。
エディターで次のコードを実行します。
{ "name": "venv example", "archives": [ "oss://testBucketname/venv.tar.gz#PY3" ], "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }
または
説明Python環境パッケージが大きすぎる場合は、追加のパラメーターを設定します。
{ "name": "venv example", "conf": { "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketname/venv_py36.tar.gz#PY3", "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://atestBucketname/venv_py36.tar.gz#PY3,", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.pyspark.python": "./PY3/venv/bin/python3", "spark.executor.resourceSpec": "small" }, "file": "oss://testBucketname/example.py" }
パラメーター:
archives: Python環境パッケージのOSSパス。 この例では、
venv.tar.gz
パッケージのOSSパスが使用されています。spark.exe cutorEnv.ADB_SPARK_DOWNLOAD_FILES: Spark executorパラメーターであるPython環境パッケージのOSSパス。
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: SparkドライバーパラメーターであるPython環境パッケージのOSSパス。
spark.pyspark.python: Pythonインタープリターのオンプレミスパス。
その他のパラメーターについては、「概要」をご参照ください。