すべてのプロダクト
Search
ドキュメントセンター

Elastic Container Instance:elasticコンテナーインスタンスでSparkジョブを実行する

最終更新日:Dec 24, 2024

KubernetesクラスターのelasticコンテナーインスタンスでSparkジョブを実行できます。 Elastic Container Instanceはスケーラブルなリソースを提供し、Sparkジョブの自動デプロイと高可用性を保証します。 これにより、Sparkジョブの実行効率と安定性が向上します。 このトピックでは、Kubernetes Operator for Apache SparkをContainer Service for Kubernetes (ACK) Serverlessクラスターにインストールし、伸縮性のあるコンテナインスタンスでSparkジョブを実行する方法について説明します。

背景情報

Apache Sparkは、ビッグデータコンピューティングや機械学習などのシナリオでワークロードを分析するために広く使用されているオープンソースプログラムです。 Kubernetesを使用して、Apache Spark 2.3.0以降のリソースを実行および管理できます。

Kubernetes Operator for Apache Sparkは、KubernetesクラスターでSparkジョブを実行するように設計されています。 カスタムリソース定義 (CRD) で定義されているSparkタスクをKubernetesクラスターに送信できます。 Kubernetes Operator for Apache Sparkには、次の利点があります。

  • オープンソースのApache Sparkと比較して、Kubernetes Operator for Apache Sparkはより多くの機能を提供します。

  • Kubernetes Operator for Apache Sparkは、Kubernetesのストレージ、モニタリング、およびログコンポーネントと統合できます。

  • Kubernetes Operator for Apache Sparkは、ディザスタリカバリや自動スケーリングなどの高度なKubernetes機能をサポートしています。 さらに、Kubernetes Operator for Apache Sparkはリソーススケジューリングを最適化できます。

準備

  1. ACKサーバーレスクラスターを作成します。

    ACKコンソールACKサーバーレスクラスターを作成します。 詳細については、「ACKサーバーレスクラスターの作成」をご参照ください。

    重要

    インターネット経由でイメージを取得する必要がある場合、またはトレーニングジョブがインターネットにアクセスする必要がある場合は、インターネットNATゲートウェイを設定する必要があります。

    kubectlと次のいずれかの方法を使用して、ACKサーバーレスクラスターを管理およびアクセスできます。

  2. OSS バケットを作成します。

    テストデータ、テスト結果、ログなどのデータを保存するためのObject Storage Service (OSS) バケットを作成する必要があります。 詳細については、「バケットの作成」をご参照ください。

Apache Spark用のKubernetes Operatorのインストール

  1. Apache Spark用のKubernetes Operatorをインストールします。

    1. ACKコンソールの左側のナビゲーションウィンドウで、[マーケットプレイス]> [マーケットプレイス] を選択します。

    2. [アプリカタログ] タブで、検索ボックスに [ack-spark-operator] と入力し、[ack-spark-operator] を見つけて、アイコンをクリックします。

    3. ページの右上隅にある [デプロイ] をクリックします。

    4. デプロイパネルで、Kubernetes Operator for Apache Sparkをインストールするクラスターを選択し、画面の指示に従って設定を完了します。

  2. ServiceAccount、Role、およびRoleBindingを作成します。

    Sparkジョブでは、ポッドを作成する権限を取得するためにServiceAccountが必要です。 したがって、ServiceAccount、Role、およびRoleBindingを作成する必要があります。 ServiceAccount、Role、およびRoleBindingを作成する方法を次のYAMLの例に示します。 名前空間を実際の値に置き換えます。

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: spark
      namespace: default
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      namespace: default
      name: spark-role
    rules:
    - apiGroups: [""]
      resources: ["pods"]
      verbs: ["*"]
    - apiGroups: [""]
      resources: ["services"]
      verbs: ["*"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: spark-role-binding
      namespace: default
    subjects:
    - kind: ServiceAccount
      name: spark
      namespace: default
    roleRef:
      kind: Role
      name: spark-role
      apiGroup: rbac.authorization.k8s.io

Sparkジョブのイメージを作成する

SparkジョブのJava Archive (JAR) パッケージをコンパイルし、Dockerfileを使用してイメージをパッケージ化する必要があります。

次の例は、ACKのSparkベースイメージが使用されているときにDockerfileを設定する方法を示しています。

FROM registry.aliyuncs.com/acs/spark:ack-2.4.5-latest
RUN mkdir -p /opt/spark/jars
# If you want to read data from OSS or sink events to OSS, add the following JAR packages to the image.
ADD https://repo1.maven.org/maven2/com/aliyun/odps/hadoop-fs-oss/3.3.8-public/hadoop-fs-oss-3.3.8-public.jar $SPARK_HOME/jars
ADD https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.8.1/aliyun-sdk-oss-3.8.1.jar $SPARK_HOME/jars
ADD https://repo1.maven.org/maven2/org/aspectj/aspectjweaver/1.9.5/aspectjweaver-1.9.5.jar $SPARK_HOME/jars
ADD https://repo1.maven.org/maven2/org/jdom/jdom/1.1.3/jdom-1.1.3.jar $SPARK_HOME/jars
COPY SparkExampleScala-assembly-0.1.jar /opt/spark/jars
重要

大きなSparkイメージをプルするには時間がかかる場合があります。 ImageCacheを使用して、イメージプルを高速化できます。 詳細については、「イメージキャッシュの管理」および「ImageCacheを使用したポッドの作成の高速化」をご参照ください。

Alibaba Cloud Sparkベースイメージを使用することもできます。 Alibaba Cloudは、Kubernetesクラスターでのリソーススケジューリングと自動スケーリングに最適化されたSpark 2.4.5ベースイメージを提供します。 この基本イメージは、リソーススケジューリングとアプリケーションの起動を高速化します。 HelmグラフのenableAlibaba CloudFeatureGates変数をtrueに設定することで、スケジューリング最適化機能を有効にできます。 アプリケーションをより高速に起動したい場合は、enableWebhookをfalseに設定できます。 spark-3

Sparkジョブテンプレートを作成してジョブを送信する

SparkジョブのYAMLファイルを作成し、Sparkジョブをデプロイします。

  1. spark-pi.yamlという名前のファイルを作成します。

    次のコードでは、一般的なSparkジョブのサンプルテンプレートを示します。 詳細は、「spark-on-k8s演算子」をご参照ください。

    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: "registry.aliyuncs.com/acs/spark:ack-2.4.5-latest"
      imagePullPolicy: Always
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar"
      sparkVersion: "2.4.5"
      restartPolicy:
        type: Never
      driver:
        cores: 2
        coreLimit: "2"
        memory: "3g"
        memoryOverhead: "1g"
        labels:
          version: 2.4.5
        serviceAccount: spark
        annotations:
          k8s.aliyun.com/eci-kube-proxy-enabled: 'true'
          k8s.aliyun.com/eci-auto-imc: "true"
        tolerations:
        - key: "virtual-kubelet.io/provider"
          operator: "Exists"
      executor:
        cores: 2
        instances: 1
        memory: "3g"
        memoryOverhead: "1g"
        labels:
          version: 2.4.5
        annotations:
          k8s.aliyun.com/eci-kube-proxy-enabled: 'true'
          k8s.aliyun.com/eci-auto-imc: "true"
        tolerations:
        - key: "virtual-kubelet.io/provider"
          operator: "Exists"
  2. Sparkジョブをデプロイします。

    kubectl apply -f spark-pi.yaml

ログ収集の設定

Sparkジョブのstdoutログを収集する場合は、SparkドライバーとSparkエグゼキュータのenvVarsフィールドで環境変数を設定できます。 その後、ログは自動的に収集されます。 詳細については、「エラスティックコンテナインスタンスのログ収集のカスタマイズ」をご参照ください。

envVars:
   aliyun_logs_test-stdout_project: test-k8s-spark
   aliyun_logs_test-stdout_machinegroup: k8s-group-app-spark
   aliyun_logs_test-stdout: stdout

Sparkジョブを送信する前に、上記のコードに示すようにSparkドライバーとSparkエグゼキュータの環境変数を設定して、自動ログ収集を実装できます。 spark-1

履歴サーバーの構成

Spark履歴サーバーを使用すると、Sparkジョブを確認できます。 SparkアプリケーションのCRDでSparkConfフィールドを設定して、アプリケーションがイベントをOSSにシンクできるようにすることができます。 その後、履歴サーバーを使用してOSSからデータを取得できます。 次のコードは、サンプル設定を提供します。

sparkConf:
   "spark.eventLog.enabled": "true"
   "spark.eventLog.dir": "oss://bigdatastore/spark-events"
   "spark.hadoop.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"
   # oss bucket endpoint such as oss-cn-beijing.aliyuncs.com
   "spark.hadoop.fs.oss.endpoint": "oss-cn-beijing.aliyuncs.com"
   "spark.hadoop.fs.oss.accessKeySecret": ""
   "spark.hadoop.fs.oss.accessKeyId": ""

Alibaba Cloudは、Spark履歴サーバーをデプロイするためのヘルムチャートを提供します。 Spark履歴サーバーをデプロイするには、Container Service ACKコンソールにログインし、左側のナビゲーションウィンドウでMarketplace > Marketplaceを選択します。 [アプリカタログ] タブで、ack-spark-history-serverを検索してデプロイします。 Spark履歴サーバーをデプロイするときは、[パラメーター] セクションでOSS情報を指定する必要があります。 次のコードは例を示しています。

oss:
  enableOSS: true
  # Please input your accessKeyId
  alibabaCloudAccessKeyId: ""
  # Please input your accessKeySecret
  alibabaCloudAccessKeySecret: ""
  # oss bucket endpoint such as oss-cn-beijing.aliyuncs.com
  alibabaCloudOSSEndpoint: "oss-cn-beijing.aliyuncs.com"
  # oss file path such as oss://bucket-name/path
  eventsDir: "oss://bigdatastore/spark-events"

Spark履歴サーバーをデプロイした後、[サービス] ページで外部エンドポイントを表示できます。 Spark履歴サーバーの外部エンドポイントにアクセスして、Sparkジョブの履歴を表示できます。 spark-2

Sparkジョブの結果を表示する

  1. ポッドのステータスを照会します。

    kubectl get pods

    期待される出力:

    NAME                            READY      STATUS     RESTARTS   AGE
    spark-pi-1547981232122-driver   1/1       Running    0          12s
    spark-pi-1547981232122-exec-1   1/1       Running    0          3s
  2. リアルタイムSparkユーザーインターフェイスを照会します。

    kubectl port-forward spark-pi-1547981232122-driver 4040:4040
  3. Sparkアプリケーションのステータスを照会します。

    kubectl describe sparkapplication spark-pi

    期待される出力:

    Name:         spark-pi
    Namespace:    default
    Labels:       <none>
    Annotations:  kubectl.kubernetes.io/last-applied-configuration:
                    {"apiVersion":"sparkoperator.k8s.io/v1alpha1","kind":"SparkApplication","metadata":{"annotations":{},"name":"spark-pi","namespace":"default"...}
    API Version:  sparkoperator.k8s.io/v1alpha1
    Kind:         SparkApplication
    Metadata:
      Creation Timestamp:  2019-01-20T10:47:08Z
      Generation:          1
      Resource Version:    4923532
      Self Link:           /apis/sparkoperator.k8s.io/v1alpha1/namespaces/default/sparkapplications/spark-pi
      UID:                 bbe7445c-1ca0-11e9-9ad4-062fd7c19a7b
    Spec:
      Deps:
      Driver:
        Core Limit:  200m
        Cores:       0.1
        Labels:
          Version:        2.4.0
        Memory:           512m
        Service Account:  spark
        Volume Mounts:
          Mount Path:  /tmp
          Name:        test-volume
      Executor:
        Cores:      1
        Instances:  1
        Labels:
          Version:  2.4.0
        Memory:     512m
        Volume Mounts:
          Mount Path:         /tmp
          Name:               test-volume
      Image:                  gcr.io/spark-operator/spark:v2.4.0
      Image Pull Policy:      Always
      Main Application File:  local:///opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar
      Main Class:             org.apache.spark.examples.SparkPi
      Mode:                   cluster
      Restart Policy:
        Type:  Never
      Type:    Scala
      Volumes:
        Host Path:
          Path:  /tmp
          Type:  Directory
        Name:    test-volume
    Status:
      Application State:
        Error Message:
        State:          COMPLETED
      Driver Info:
        Pod Name:             spark-pi-driver
        Web UI Port:          31182
        Web UI Service Name:  spark-pi-ui-svc
      Execution Attempts:     1
      Executor State:
        Spark - Pi - 1547981232122 - Exec - 1:  COMPLETED
      Last Submission Attempt Time:             2019-01-20T10:47:14Z
      Spark Application Id:                     spark-application-1547981285779
      Submission Attempts:                      1
      Termination Time:                         2019-01-20T10:48:56Z
    Events:
      Type    Reason                     Age                 From            Message
      ----    ------                     ----                ----            -------
      Normal  SparkApplicationAdded      55m                 spark-operator  SparkApplication spark-pi was added, Enqueuing it for submission
      Normal  SparkApplicationSubmitted  55m                 spark-operator  SparkApplication spark-pi was submitted successfully
      Normal  SparkDriverPending         55m (x2 over 55m)   spark-operator  Driver spark-pi-driver is pending
      Normal  SparkExecutorPending       54m (x3 over 54m)   spark-operator  Executor spark-pi-1547981232122-exec-1 is pending
      Normal  SparkExecutorRunning       53m (x4 over 54m)   spark-operator  Executor spark-pi-1547981232122-exec-1 is running
      Normal  SparkDriverRunning         53m (x12 over 55m)  spark-operator  Driver spark-pi-driver is running
      Normal  SparkExecutorCompleted     53m (x2 over 53m)   spark-operator  Executor spark-pi-1547981232122-exec-1 completed
  4. ログにSparkジョブの結果を表示します。

    NAME                                      READY     STATUS      RESTARTS   AGE
    spark-pi-1547981232122-driver   0/1       Completed   0          1m

    SparkアプリケーションがSucceed状態の場合、またはSparkドライバポッドがCompleted状態の場合は、Sparkジョブの結果を使用できます。 ログを印刷し、ログにSparkジョブの結果を表示できます。

    kubectl logs spark-pi-1547981232122-driver
    Pi is roughly 3.152155760778804