Apache Sparkは、大規模なデータ処理用のコンピューティングエンジンです。 ビッグデータコンピューティングや機械学習のシナリオでワークロードを分析するために広く使用されています。 Spark Operatorは、Sparkジョブのデプロイを自動化し、KubernetesクラスターでSparkジョブのライフサイクルを管理する機能を提供します。 このトピックでは、Spark Operatorを使用してACKクラスターでSparkジョブを実行する方法について説明します。 これにより、データエンジニアはビッグデータ処理ジョブを迅速かつ効率的に実行および管理できます。
前提条件
Kubernetes 1.24以降を実行するACK ProクラスターまたはACK Serverless Proクラスターが作成されます。 詳細については、「ACKマネージドクラスターの作成」、「ACKサーバーレスクラスターの作成」、および「ACKクラスターの手動アップグレード」をご参照ください。
kubectlはACKクラスターに接続するために使用されます。 詳細については、「クラスターのkubeconfigファイルを取得し、kubectlを使用してクラスターに接続する」をご参照ください。
Introduction to Spark Operator
Spark Operatorは、KubernetesクラスターでSparkジョブを実行し、Sparkジョブのライフサイクル管理を自動化するように設計されています。 SparkApplication
やScheduledSparkApplication
などのCustomResourceDefinitions (CRD) を使用して、Sparkジョブを送信および管理できます。 Spark Operatorは、自動スケーリング、ヘルスチェック、リソース管理などのKubernetes機能を活用することで、Sparkジョブの実行を効率的に監視および最適化できます。 ACKは、オープンソースのkubeflow/spark-operatorコンポーネントに基づいてack-spark-operatorコンポーネントを提供します。 詳細については、「Spark演算子 | Kubeflow」をご参照ください。
利点:
簡易管理: Kubernetesの宣言型ジョブ設定を使用して、Sparkジョブのデプロイを自動化し、ライフサイクルを管理します。
マルチテナントのサポート: Kubernetesの名前空間とリソースクォータのメカニズムを使用して、ユーザー全体のリソースを割り当て、分離できます。 Kubernetesノード選択メカニズムを使用して、Sparkジョブが専用リソースを使用できるようにすることもできます。
Elasticリソースプロビジョニング: Elasticコンテナインスタンスまたはelasticノードプールを使用して、ピーク時に大量のelasticリソースを提供し、パフォーマンスとコストのバランスを取ります。
該当するシナリオ:
データ分析: データサイエンティストは、インタラクティブなデータ分析とデータクレンジングにSparkを使用できます。
バッチデータコンピューティング: スケジュールされたバッチジョブを実行して、多数のデータセットを処理できます。
リアルタイムデータ処理: Spark Streamingライブラリは、リアルタイムデータをストリーミングする機能を提供します。
手順の概要
このトピックでは、Spark Operatorを使用してACKクラスターでSparkジョブを実行および管理し、ビッグデータを効率的に処理する方法について説明します。
ack-spark-operatorコンポーネントをインストールする: ACKクラスターにSpark Operatorをインストールして、Sparkジョブを管理および実行します。
Sparkジョブの送信: Sparkジョブの構成ファイルを作成して送信し、データ処理タスクを実行します。
Sparkジョブの表示: ジョブのステータスを監視し、詳細な情報とログを取得します。
Spark web UIにアクセスする: webインターフェイスでSparkジョブの実行を表示します。
Sparkジョブの更新: ビジネス要件に基づいてジョブ設定を変更し、パラメーターを動的に変更できます。
Sparkジョブの削除: 完了した、またはコストを削減するために不要になったSparkジョブを削除できます。
ステップ1: ack-spark-operatorコンポーネントのインストール
ACKコンソールにログインします。 左側のナビゲーションウィンドウで、 を選択します。
[マーケットプレイス] ページで、[アプリカタログ] タブをクリックします。 [ack-spark-operator] を見つけてクリックします。
ack-spark-operatorページで、[デプロイ] をクリックします。
[デプロイ] パネルで、クラスターと名前空間を選択し、[次へ] をクリックします。
[パラメーター] ステップでパラメーターを設定し、[OK] をクリックします。
下表に一部のパラメーターを示します。 パラメーター設定は、ack-spark-operatorページの [パラメーター] セクションにあります。
パラメーター
説明
例
controller.replicas
コントローラーレプリカの数。
デフォルト値は 1 です。
webhook. レプリカ
webhookレプリカの数。
デフォルト値は 1 です。
spark.jobNamespaces
Sparkジョブを実行できる名前空間。 このパラメーターを空のままにすると、すべての名前空間でSparkジョブを実行できます。 複数の名前空間は、コンマ (,) で区切ります。
デフォルト値:
["Default"]
["]
: すべての名前空間。["ns1","ns2","ns3"]
: 1つ以上の名前空間を指定します。
spark.serviceAccount.name
Sparkジョブは、
spark-operator-spark
という名前のServiceAccountと、spark.jobNamespaces
で指定された各名前空間に対応するロールベースのアクセス制御 (RBAC) リソースを自動的に作成します。 ServiceAccountのカスタム名を指定し、Sparkジョブの送信時にカスタム名を指定できます。デフォルト値:
spark-operator-spark
ステップ2: Sparkジョブを送信する
SparkApplication YAMLファイルを作成して、データ処理用のSparkジョブを送信できます。
spark-pi. YAML
という名前のSparkApplication yamlファイルを作成します。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi namespace: default # Make sure that the namespace is in the namespace list specified by spark.jobNamespaces. spec: type: Scala mode: cluster image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar arguments: - "1000" sparkVersion: 3.5.2 driver: cores: 1 coreLimit: 1200m memory: 512m serviceAccount: spark-operator-spark # Replace spark-operator-spark with the custom name that you specified. executor: instances: 1 cores: 1 coreLimit: 1200m memory: 512m restartPolicy: type: Never
次のコマンドを実行して、Sparkジョブを送信します。
kubectl apply -f spark-pi.yaml
期待される出力:
sparkapplication.sparkoperator.k8s.io/spark-pi created
ステップ3: Sparkジョブを表示する
次のコマンドを実行して、Sparkジョブのステータス、ポッド情報、およびログを照会できます。
次のコマンドを実行して、Sparkジョブのステータスを表示します。
kubectl get sparkapplication spark-pi
期待される出力:
NAME STATUS ATTEMPTS START FINISH AGE spark-pi SUBMITTED 1 2024-06-04T03:17:11Z <no value> 15s
次のコマンドを実行し、sparkoperatorを設定し
ます。 Kubernetes、 io/app-name
ラベルをspark-pi
に設定して、Sparkジョブを実行するポッドのステータスを確認します。kubectl get pod -l sparkoperator.k8s.io/app-name=spark-pi
期待される出力:
NAME READY STATUS RESTARTS AGE spark-pi-7272428fc8f5f392-exec-1 1/1 Running 0 13s spark-pi-7272428fc8f5f392-exec-2 1/1 Running 0 13s spark-pi-driver 1/1 Running 0 49s
Sparkジョブが完了すると、すべてのエグゼキューターポッドがドライバによって自動的に削除されます。
次のコマンドを実行して、Sparkジョブの詳細を表示します。
kubectl describe sparkapplication spark-pi
次のコマンドを実行して、ドライバポッドの最後の20のログエントリを表示します。
kubectl logs -- tail=20 spark-pi-driver
期待される出力:
24/05/30 10:05:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 24/05/30 10:05:30 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 7.942 s 24/05/30 10:05:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job 24/05/30 10:05:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished 24/05/30 10:05:30 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 8.043996 s Pi is roughly 3.1419522314195225 24/05/30 10:05:30 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/05/30 10:05:30 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1e18858fc8f56b14-driver-svc.default.svc:4040 24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/05/30 10:05:30 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/05/30 10:05:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/05/30 10:05:30 INFO MemoryStore: MemoryStore cleared 24/05/30 10:05:30 INFO BlockManager: BlockManager stopped 24/05/30 10:05:30 INFO BlockManagerMaster: BlockManagerMaster stopped 24/05/30 10:05:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/05/30 10:05:30 INFO SparkContext: Successfully stopped SparkContext 24/05/30 10:05:30 INFO ShutdownHookManager: Shutdown hook called 24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /var/data/spark-14ed60f1-82cd-4a33-b1b3-9e5d975c5b1e/spark-01120c89-5296-4c83-8a20-0799eef4e0ee 24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-5f98ed73-576a-41be-855d-dabdcf7de189
の削除
ステップ4: Spark web UIにアクセスする
Sparkジョブは、Sparkジョブの実行を監視するweb UIを提供します。 Spark web UIにアクセスするために、kubectl port-forward
コマンドを実行して、クラスター内のポートをローカルポートにマップします。 Spark web UIサービスは、Sparkジョブが実行中であるか、ドライバーポッドが [実行中] 状態の場合にのみ使用できます。 Sparkジョブが完了すると、web UIにアクセスできなくなります。
ack-spark-operatorコンポーネントをデプロイすると、controller.uiService.enable
が自動的にtrue
に設定され、サービスが自動的に作成されます。 サービスのポートをローカルポートにマップして、web UIにアクセスできます。 コンポーネントのデプロイ時にcontroller.uiService.enable
をfalse
に設定した場合、サービスは作成されません。 この場合、ポッドのポートをマッピングすることで、web UIにアクセスできます。
kubectl port-forward
コマンドで指定されたローカルポートは、テスト環境にのみ適しており、運用環境には適していません。 この方法を使用するときは注意してください。
ビジネス要件に基づいて、サービスのポートまたはポッドのポートをローカルポートにマップできます。 次のセクションでは、関連するコマンドについて説明します。
次のコマンドを実行して、サービスのポートをマッピングしてweb UIにアクセスします。
kubectl port-forward services/spark-pi-ui-svc 4040
次のコマンドを実行して、ポッドのポートをマッピングしてweb UIにアクセスします。
kubectl port-forward pods/spark-pi-driver 4040
期待される出力:
Forwarding from 127.0.0.1:4040 -> 4040 Forwarding from [::1]:4040 -> 4040
からの転送
http:// 127.0.0.1:4040からweb UIにアクセスします。
(オプション) 手順5: Sparkジョブの更新
Sparkジョブのパラメーターを変更するには、SparkジョブのYAMLファイルを更新します。
spark-pi. YAML
という名前のyamlファイルを変更します。 たとえば、arguments
パラメーターを10000
に、executor
パラメーターを2
に設定します。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi spec: type: Scala mode: cluster image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar arguments: - "10000" sparkVersion: 3.5.2 driver: cores: 1 coreLimit: 1200m memory: 512m serviceAccount: spark executor: instances: 2 cores: 1 coreLimit: 1200m memory: 512m restartPolicy: type: Never
次のコマンドを実行して、Sparkジョブを更新します。
kubectl apply -f spark-pi.yaml
次のコマンドを実行して、Sparkジョブのステータスを表示します。
kubectl get sparkapplication spark-pi
Sparkジョブが再び実行されます。 期待される出力:
NAME STATUS ATTEMPTS START FINISH AGE spark-pi RUNNING 1 2024-06-04T03:37:34Z <no value> 20m
(オプション) 手順6: Sparkジョブを削除する
このトピックのすべての手順を実行した後、Sparkジョブが不要になった場合は、次のコマンドを使用して関連するリソースを解放できます。
次のコマンドを実行して、前の手順で作成したSparkジョブを削除します。
kubectl delete -f spark-pi.yaml
次のコマンドを実行することもできます。
kubectl delete sparkapplication spark-pi