KubernetesクラスターのelasticコンテナーインスタンスでSparkジョブを実行できます。 Elastic Container Instanceはスケーラブルなリソースを提供し、Sparkジョブの自動デプロイと高可用性を保証します。 これにより、Sparkジョブの実行効率と安定性が向上します。 このトピックでは、Kubernetes Operator for Apache SparkをContainer Service for Kubernetes (ACK) Serverlessクラスターにインストールし、伸縮性のあるコンテナインスタンスでSparkジョブを実行する方法について説明します。
背景情報
Apache Sparkは、ビッグデータコンピューティングや機械学習などのシナリオでワークロードを分析するために広く使用されているオープンソースプログラムです。 Kubernetesを使用して、Apache Spark 2.3.0以降のリソースを実行および管理できます。
Kubernetes Operator for Apache Sparkは、KubernetesクラスターでSparkジョブを実行するように設計されています。 カスタムリソース定義 (CRD) で定義されているSparkタスクをKubernetesクラスターに送信できます。 Kubernetes Operator for Apache Sparkには、次の利点があります。
オープンソースのApache Sparkと比較して、Kubernetes Operator for Apache Sparkはより多くの機能を提供します。
Kubernetes Operator for Apache Sparkは、Kubernetesのストレージ、モニタリング、およびログコンポーネントと統合できます。
Kubernetes Operator for Apache Sparkは、ディザスタリカバリや自動スケーリングなどの高度なKubernetes機能をサポートしています。 さらに、Kubernetes Operator for Apache Sparkはリソーススケジューリングを最適化できます。
準備
ACKサーバーレスクラスターを作成します。
ACKコンソールでACKサーバーレスクラスターを作成します。 詳細については、「ACKサーバーレスクラスターの作成」をご参照ください。
重要インターネット経由でイメージを取得する必要がある場合、またはトレーニングジョブがインターネットにアクセスする必要がある場合は、インターネットNATゲートウェイを設定する必要があります。
kubectlと次のいずれかの方法を使用して、ACKサーバーレスクラスターを管理およびアクセスできます。
オンプレミスマシンからクラスターを管理する場合は、kubectlクライアントをインストールして設定します。 詳細については、「kubectlを使用したACKクラスターへの接続」をご参照ください。
kubectlを使用して、Cloud Shell上のACKサーバーレスクラスターを管理します。 詳細については、「kubectlを使用したCloud ShellでのACKクラスターの管理」をご参照ください。
OSS バケットを作成します。
テストデータ、テスト結果、ログなどのデータを保存するためのObject Storage Service (OSS) バケットを作成する必要があります。 詳細については、「バケットの作成」をご参照ください。
Apache Spark用のKubernetes Operatorのインストール
Apache Spark用のKubernetes Operatorをインストールします。
ACKコンソールの左側のナビゲーションウィンドウで、[マーケットプレイス]> [マーケットプレイス] を選択します。
[アプリカタログ] タブで、検索ボックスに [ack-spark-operator] と入力し、[ack-spark-operator] を見つけて、アイコンをクリックします。
ページの右上隅にある [デプロイ] をクリックします。
デプロイパネルで、Kubernetes Operator for Apache Sparkをインストールするクラスターを選択し、画面の指示に従って設定を完了します。
ServiceAccount、Role、およびRoleBindingを作成します。
Sparkジョブでは、ポッドを作成する権限を取得するためにServiceAccountが必要です。 したがって、ServiceAccount、Role、およびRoleBindingを作成する必要があります。 ServiceAccount、Role、およびRoleBindingを作成する方法を次のYAMLの例に示します。 名前空間を実際の値に置き換えます。
apiVersion: v1 kind: ServiceAccount metadata: name: spark namespace: default --- apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: namespace: default name: spark-role rules: - apiGroups: [""] resources: ["pods"] verbs: ["*"] - apiGroups: [""] resources: ["services"] verbs: ["*"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: spark-role-binding namespace: default subjects: - kind: ServiceAccount name: spark namespace: default roleRef: kind: Role name: spark-role apiGroup: rbac.authorization.k8s.io
Sparkジョブのイメージを作成する
SparkジョブのJava Archive (JAR) パッケージをコンパイルし、Dockerfileを使用してイメージをパッケージ化する必要があります。
次の例は、ACKのSparkベースイメージが使用されているときにDockerfileを設定する方法を示しています。
FROM registry.aliyuncs.com/acs/spark:ack-2.4.5-latest
RUN mkdir -p /opt/spark/jars
# If you want to read data from OSS or sink events to OSS, add the following JAR packages to the image.
ADD https://repo1.maven.org/maven2/com/aliyun/odps/hadoop-fs-oss/3.3.8-public/hadoop-fs-oss-3.3.8-public.jar $SPARK_HOME/jars
ADD https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.8.1/aliyun-sdk-oss-3.8.1.jar $SPARK_HOME/jars
ADD https://repo1.maven.org/maven2/org/aspectj/aspectjweaver/1.9.5/aspectjweaver-1.9.5.jar $SPARK_HOME/jars
ADD https://repo1.maven.org/maven2/org/jdom/jdom/1.1.3/jdom-1.1.3.jar $SPARK_HOME/jars
COPY SparkExampleScala-assembly-0.1.jar /opt/spark/jars
大きなSparkイメージをプルするには時間がかかる場合があります。 ImageCacheを使用して、イメージプルを高速化できます。 詳細については、「イメージキャッシュの管理」および「ImageCacheを使用したポッドの作成の高速化」をご参照ください。
Alibaba Cloud Sparkベースイメージを使用することもできます。 Alibaba Cloudは、Kubernetesクラスターでのリソーススケジューリングと自動スケーリングに最適化されたSpark 2.4.5ベースイメージを提供します。 この基本イメージは、リソーススケジューリングとアプリケーションの起動を高速化します。 HelmグラフのenableAlibaba CloudFeatureGates
変数をtrueに設定することで、スケジューリング最適化機能を有効にできます。 アプリケーションをより高速に起動したい場合は、enableWebhook
をfalseに設定できます。
Sparkジョブテンプレートを作成してジョブを送信する
SparkジョブのYAMLファイルを作成し、Sparkジョブをデプロイします。
spark-pi.yamlという名前のファイルを作成します。
次のコードでは、一般的なSparkジョブのサンプルテンプレートを示します。 詳細は、「spark-on-k8s演算子」をご参照ください。
apiVersion: "sparkoperator.k8s.io/v1beta2" kind: SparkApplication metadata: name: spark-pi namespace: default spec: type: Scala mode: cluster image: "registry.aliyuncs.com/acs/spark:ack-2.4.5-latest" imagePullPolicy: Always mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar" sparkVersion: "2.4.5" restartPolicy: type: Never driver: cores: 2 coreLimit: "2" memory: "3g" memoryOverhead: "1g" labels: version: 2.4.5 serviceAccount: spark annotations: k8s.aliyun.com/eci-kube-proxy-enabled: 'true' k8s.aliyun.com/eci-auto-imc: "true" tolerations: - key: "virtual-kubelet.io/provider" operator: "Exists" executor: cores: 2 instances: 1 memory: "3g" memoryOverhead: "1g" labels: version: 2.4.5 annotations: k8s.aliyun.com/eci-kube-proxy-enabled: 'true' k8s.aliyun.com/eci-auto-imc: "true" tolerations: - key: "virtual-kubelet.io/provider" operator: "Exists"
Sparkジョブをデプロイします。
kubectl apply -f spark-pi.yaml
ログ収集の設定
Sparkジョブのstdoutログを収集する場合は、SparkドライバーとSparkエグゼキュータのenvVarsフィールドで環境変数を設定できます。 その後、ログは自動的に収集されます。 詳細については、「エラスティックコンテナインスタンスのログ収集のカスタマイズ」をご参照ください。
envVars:
aliyun_logs_test-stdout_project: test-k8s-spark
aliyun_logs_test-stdout_machinegroup: k8s-group-app-spark
aliyun_logs_test-stdout: stdout
Sparkジョブを送信する前に、上記のコードに示すようにSparkドライバーとSparkエグゼキュータの環境変数を設定して、自動ログ収集を実装できます。
履歴サーバーの構成
Spark履歴サーバーを使用すると、Sparkジョブを確認できます。 SparkアプリケーションのCRDでSparkConfフィールドを設定して、アプリケーションがイベントをOSSにシンクできるようにすることができます。 その後、履歴サーバーを使用してOSSからデータを取得できます。 次のコードは、サンプル設定を提供します。
sparkConf:
"spark.eventLog.enabled": "true"
"spark.eventLog.dir": "oss://bigdatastore/spark-events"
"spark.hadoop.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"
# oss bucket endpoint such as oss-cn-beijing.aliyuncs.com
"spark.hadoop.fs.oss.endpoint": "oss-cn-beijing.aliyuncs.com"
"spark.hadoop.fs.oss.accessKeySecret": ""
"spark.hadoop.fs.oss.accessKeyId": ""
Alibaba Cloudは、Spark履歴サーバーをデプロイするためのヘルムチャートを提供します。 Spark履歴サーバーをデプロイするには、Container Service ACKコンソールにログインし、左側のナビゲーションウィンドウでMarketplace > Marketplaceを選択します。 [アプリカタログ] タブで、ack-spark-history-serverを検索してデプロイします。 Spark履歴サーバーをデプロイするときは、[パラメーター] セクションでOSS情報を指定する必要があります。 次のコードは例を示しています。
oss:
enableOSS: true
# Please input your accessKeyId
alibabaCloudAccessKeyId: ""
# Please input your accessKeySecret
alibabaCloudAccessKeySecret: ""
# oss bucket endpoint such as oss-cn-beijing.aliyuncs.com
alibabaCloudOSSEndpoint: "oss-cn-beijing.aliyuncs.com"
# oss file path such as oss://bucket-name/path
eventsDir: "oss://bigdatastore/spark-events"
Spark履歴サーバーをデプロイした後、[サービス] ページで外部エンドポイントを表示できます。 Spark履歴サーバーの外部エンドポイントにアクセスして、Sparkジョブの履歴を表示できます。
Sparkジョブの結果を表示する
ポッドのステータスを照会します。
kubectl get pods
期待される出力:
NAME READY STATUS RESTARTS AGE spark-pi-1547981232122-driver 1/1 Running 0 12s spark-pi-1547981232122-exec-1 1/1 Running 0 3s
リアルタイムSparkユーザーインターフェイスを照会します。
kubectl port-forward spark-pi-1547981232122-driver 4040:4040
Sparkアプリケーションのステータスを照会します。
kubectl describe sparkapplication spark-pi
期待される出力:
Name: spark-pi Namespace: default Labels: <none> Annotations: kubectl.kubernetes.io/last-applied-configuration: {"apiVersion":"sparkoperator.k8s.io/v1alpha1","kind":"SparkApplication","metadata":{"annotations":{},"name":"spark-pi","namespace":"default"...} API Version: sparkoperator.k8s.io/v1alpha1 Kind: SparkApplication Metadata: Creation Timestamp: 2019-01-20T10:47:08Z Generation: 1 Resource Version: 4923532 Self Link: /apis/sparkoperator.k8s.io/v1alpha1/namespaces/default/sparkapplications/spark-pi UID: bbe7445c-1ca0-11e9-9ad4-062fd7c19a7b Spec: Deps: Driver: Core Limit: 200m Cores: 0.1 Labels: Version: 2.4.0 Memory: 512m Service Account: spark Volume Mounts: Mount Path: /tmp Name: test-volume Executor: Cores: 1 Instances: 1 Labels: Version: 2.4.0 Memory: 512m Volume Mounts: Mount Path: /tmp Name: test-volume Image: gcr.io/spark-operator/spark:v2.4.0 Image Pull Policy: Always Main Application File: local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar Main Class: org.apache.spark.examples.SparkPi Mode: cluster Restart Policy: Type: Never Type: Scala Volumes: Host Path: Path: /tmp Type: Directory Name: test-volume Status: Application State: Error Message: State: COMPLETED Driver Info: Pod Name: spark-pi-driver Web UI Port: 31182 Web UI Service Name: spark-pi-ui-svc Execution Attempts: 1 Executor State: Spark - Pi - 1547981232122 - Exec - 1: COMPLETED Last Submission Attempt Time: 2019-01-20T10:47:14Z Spark Application Id: spark-application-1547981285779 Submission Attempts: 1 Termination Time: 2019-01-20T10:48:56Z Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal SparkApplicationAdded 55m spark-operator SparkApplication spark-pi was added, Enqueuing it for submission Normal SparkApplicationSubmitted 55m spark-operator SparkApplication spark-pi was submitted successfully Normal SparkDriverPending 55m (x2 over 55m) spark-operator Driver spark-pi-driver is pending Normal SparkExecutorPending 54m (x3 over 54m) spark-operator Executor spark-pi-1547981232122-exec-1 is pending Normal SparkExecutorRunning 53m (x4 over 54m) spark-operator Executor spark-pi-1547981232122-exec-1 is running Normal SparkDriverRunning 53m (x12 over 55m) spark-operator Driver spark-pi-driver is running Normal SparkExecutorCompleted 53m (x2 over 53m) spark-operator Executor spark-pi-1547981232122-exec-1 completed
ログにSparkジョブの結果を表示します。
NAME READY STATUS RESTARTS AGE spark-pi-1547981232122-driver 0/1 Completed 0 1m
SparkアプリケーションがSucceed状態の場合、またはSparkドライバポッドがCompleted状態の場合は、Sparkジョブの結果を使用できます。 ログを印刷し、ログにSparkジョブの結果を表示できます。
kubectl logs spark-pi-1547981232122-driver Pi is roughly 3.152155760778804