PySpark は Python API 操作を呼び出して Spark ジョブを実行します。PySpark ジョブは、特定の Python 環境で実行する必要があります。デフォルトでは、E-MapReduce(EMR)は Python をサポートしています。EMR でサポートされている Python バージョンを使用して PySpark ジョブを実行できない場合は、このトピックを参照して、特定の要件を満たす Python 環境を構成し、DataWorks で PySpark ジョブを実行できます。
前提条件
このトピックでは、使用する DataWorks ワークスペースと EMR クラスタは同じリージョンにあります。このセクションでは、DataWorks 側と EMR 側で満たす必要がある前提条件について説明します。
DataWorks 側
DataWorks で PySpark ジョブを実行するために EMR Spark ノードが作成され、
spark-submitコマンドを実行して PySpark ジョブが送信されます。EMR 側
以下の構成を含む EMR 環境が準備されています。
EMR クラスタ。この例では、EMR on ECS クラスタを使用します。
サンプル検証に使用される Python パッケージ。オンプレミス マシンまたは Elastic Compute Service(ECS)インスタンスに Python 環境をパッケージ化できます。または、このトピックで使用されているサンプル Python パッケージ Python 3.7 を直接ダウンロードすることもできます。Python 環境をパッケージ化するには、Docker ランタイム環境 と Python ランタイム環境 がオンプレミス マシンまたは ECS インスタンスにインストールされていることを確認してください。
説明このトピックでは、Python 3.7 は参照用にのみ使用されています。ビジネス要件に基づいて Python バージョンを選択できます。EMR でサポートされている Python バージョンは、使用する Python バージョンと異なる場合があります。Python 3.7 を使用することをお勧めします。
手順
Python プログラムの実行に必要な仮想環境を準備します。
Python 3.7 パッケージを直接ダウンロードするか、以下の手順を実行して Python 環境をパッケージ化できます。Python 3.7 パッケージを直接ダウンロードすることをお勧めします。
Docker イメージを作成します。
このトピックで使用されているサンプル Dockerfile をオンプレミス マシンまたは ECS インスタンスに直接ダウンロードできます。または、Docker がインストールされているホストで Dockerfile を作成することもできます。Dockerfile には次の内容が含まれています。
FROM centos:centos7.9.2009 RUN set -ex \ # 必要なコンポーネントをプリインストールします。 && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\ && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \ && tar -zxvf Python-3.7.0.tgz \ && cd Python-3.7.0 \ && ./configure prefix=/usr/local/python3 \ && make \ && make install \ && make clean \ && rm -rf /Python-3.7.0* \ && yum install -y epel-release \ && yum install -y python-pip # デフォルトの Python バージョンを Python 3 に設定します。 RUN set -ex \ # 以前のバージョンの Python のリソースをバックアップします。 && mv /usr/bin/python /usr/bin/python27 \ && mv /usr/bin/pip /usr/bin/pip-python27 \ # デフォルトの Python バージョンを Python 3 に設定します。 && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \ && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip # Python バージョンの変更によって発生した YUM のバグを修正します。 RUN set -ex \ && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \ && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \ && yum install -y deltarpm # pip を更新します。 RUN pip install --upgrade pipイメージをビルドし、Docker コンテナを実行します。
Dockerfile が保存されているパスで、次のコマンドを実行します。
sudo docker build -t python-centos:3.7 . sudo docker run -itd --name python3.7 python-centos:3.7コンテナをインストールし、Python 環境をパッケージ化するために必要な Python 依存関係ライブラリに移動します。
sudo docker exec -it python3.7 bash pip install [必要な依存関係ライブラリ] # vi requirements.txt # pip install -r requirements.txt # numpy # pandas cd /usr/local/ zip -r python3.7.zip python3/Python 環境パッケージをコンテナからホストにコピーします。
# ホストで次のコマンドを実行して、仮想環境をホストにコピーします。 sudo docker cp python3.7:/usr/local/python3.7.zip .
コピーした環境をアップロードします。
ビジネス要件に基づいて、コピーした環境を Hadoop Distributed File System(HDFS)または Object Storage Service(OSS)にアップロードできます。
説明この例では、コピーした環境は HDFS にアップロードされます。コピーした環境を OSS にアップロードする方法については、シンプルアップロード をご参照ください。
次のコマンドを実行して、コピーした環境を HDFS にアップロードします。
# コピーした環境を HDFS にアップロードします。 hdfs dfs -copyFromLocal python3.7.zip /tmp/pysparkPython コードをテストしてアップロードします。
オンプレミス マシンまたは ECS インスタンスに
XXX.pyファイルを作成し、そのファイルを使用して Python コードが正しいかどうかを確認します。この例では、pyspark_test.pyファイルをテストに使用します。# -*- coding: utf-8 -*- import os from pyspark.sql import SparkSession def noop(x): import socket import sys host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ) print('host: ' + host) print('PYTHONPATH: ' + os.environ['PYTHONPATH']) print('PWD: ' + os.environ['PWD']) print(os.listdir('.')) return host if __name__ == '__main__': spark = SparkSession \ .builder \ .appName("test_pyspark") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext # システムの現在の環境変数を確認します。 rdd = sc.parallelize(range(10), 2) hosts = rdd.map(noop).distinct().collect() print(hosts) # ユーザー定義関数(UDF)を確認します。 # https://docs.databricks.com/spark/latest/spark-sql/udf-python.html# # spark.udf.register("udf_squared", udf_squared) # spark.udf.register("udf_numpy", udf_numpy) tableName = "store" df = spark.sql("""select count(*) from %s """ % tableName) print("rdf count, %s\n" % df.count()) df.show()説明サンプル テーブル名
storeを、データ ウェアハウスに存在するテーブルの名前に置き換える必要があります。Python コードを HDFS にアップロードします。
次のコマンドを実行して、Python コードを EMR クラスタの HDFS にアップロードします。
説明この例では、コピーした環境は HDFS にアップロードされます。コピーした環境を OSS にアップロードする方法については、シンプルアップロード をご参照ください。
hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
spark-submitコマンドを実行して、DataWorks の EMR Spark ノードでジョブを送信します。作成した EMR Spark ノードで、次のコマンドを実行してジョブを送信します。
説明Python コードを OSS にアップロードする場合は、コードに含まれる HDFS パスを関連する OSS パスに置き換えます。
spark-submit --master yarn \ --deploy-mode cluster \ --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \ --conf spark.executorEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \ --conf spark.yarn.appMasterEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \ --conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \ --archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \ ## --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip \ --driver-memory 4g \ --driver-cores 1 \ --executor-memory 4g \ --executor-cores 1 \ --num-executors 3 \ --name TestPySpark \ hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py