すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Access ApsaraMQ for Kafka

最終更新日:Mar 28, 2025

このトピックでは、AnalyticDB for MySQL Spark を使用して、Elastic Network Interface(ENI)経由で ApsaraMQ for Kafka にアクセスする方法について説明します。

前提条件

準備

  1. ApsaraMQ for Kafka コンソール にログインし、[インスタンスの詳細] ページに移動して、Kafka インスタンスの vSwitch ID を取得します。

  2. Elastic Compute Service(ECS)コンソール にログインし、[セキュリティグループ] ページに移動して、Kafka インスタンスが追加されているセキュリティグループの ID を取得します。

  3. ApsaraMQ for Kafka コンソール にログインし、[ホワイトリスト管理] ページに移動して、vSwitch の CIDR ブロックが Kafka インスタンスのホワイトリストに追加されているかどうかを確認します。

手順

  1. Kafka インスタンスと AnalyticDB for MySQL Spark アプリケーションのバージョンに対応する JAR パッケージをダウンロードします。詳細については、「Kafka-clients」および「Spark-sql-kafka-0-10」をご参照ください。

  2. pom.xml ファイルに次の依存関係を追加します。

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.2.2</version>
        <scope>test</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
  3. Spark Streaming という名前のプログラムを作成してパッケージ化します。この例では、生成されたパッケージの名前は spark-example.jar です。サンプルコード:

    package com.aliyun.spark.streaming
    
    // Kafka の ConsumerRecord クラスをインポートします。
    import org.apache.kafka.clients.consumer.ConsumerRecord
    // Spark の設定とセッションをインポートします。
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    // SparkKafka オブジェクトを定義します。
    object SparkKafka {
      def main(args: Array[String]): Unit = {
        // 引数の数が 3 未満の場合、エラーメッセージを出力して終了します。
        if(args.length < 3){
          System.err.println(
            """
              |args0: groupId
              |args1: topicName
              |args2: bootstrapServers
              |""".stripMargin)
          System.exit(1)
        }
        // 引数から groupId、topicName、bootstrapServers を取得します。
        val groupId = args(0)
        val topicName = args(1)
        val bootstrapServers = args(2)
    
    
        // SparkConf を作成し、シリアライザとアプリケーション名を設定します。
        val sparkConf: SparkConf = new SparkConf()
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setAppName("SparkKafkaSub")
        // Kryo シリアライゼーションに使用するクラスを登録します。
        sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_,_]]))
    
        // SparkSession を作成します。
        val sparkSession = SparkSession
          .builder()
          .config(sparkConf)
          .getOrCreate()
    
        // Kafka からデータを読み込むための DataFrame を作成します。
        val df = sparkSession
          .readStream
          .format("kafka")
         // Kafka インスタンスのエンドポイントを指定します。
          .option("kafka.bootstrap.servers", alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092)
         // サブスクライブするトピック名を指定します。
          .option("subscribe", kafka_test)
         // コンシューマーグループ ID を指定します。
          .option("group.id", kafka_groupId)
          .load()
    
        // key と value を文字列として選択し、コンソールに出力するクエリを定義します。
        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .writeStream
          .outputMode("append")
          .format("console")
          .start()
        // クエリが終了するまで待機します。
        query.awaitTermination()
    
      }
    }
  4. ダウンロードした JAR パッケージと Spark Streaming プログラムを OSS にアップロードします。ファイルのアップロード方法の詳細については、「オブジェクトのアップロード」をご参照ください。

  5. Spark エディターに移動します。

    1. AnalyticDB for MySQL コンソール にログインします。コンソールの左上隅で、リージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。Data Lakehouse Edition タブで、管理するクラスターを見つけ、クラスター ID をクリックします。

    2. 左側のナビゲーションウィンドウで、[ジョブ開発] > Spark Jar 開発 を選択します。

  6. Spark ジョブのジョブリソースグループとジョブタイプを選択します。この例では、バッチタイプが選択されています。

  7. Spark エディターで次のコードを実行します。

    {
        "args": [
          "kafka_groupId",
          "kafka_test",
          "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092"
        ],
        "file": "oss://<bucket_name>/spark-example.jar",
        "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar",
        "name": "Kafka Example",
        "className": "com.aliyun.spark.streaming.SparkKafka",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****"
        }
    }

    次の表にパラメーターを示します。

    パラメーター

    説明

    args

    Spark ジョブに渡される引数。複数の引数はカンマ(,)で区切ります。

    file

    Spark ジョブのメインファイルのパス。メインファイルは、エントリ クラスを含む JAR パッケージ、または Python プログラムのエントリ ポイントとなる実行可能ファイルです。

    説明

    Spark ジョブのメインファイルは OSS に保存する必要があります。

    jars

    Spark ジョブに必要な JAR パッケージ。複数の JAR パッケージはカンマ(,)で区切ります。

    name

    Spark ジョブの名前。

    className

    Java または Scala プログラムのエントリクラス。このパラメーターは Python プログラムには必要ありません。

    spark.adb.eni.enabled

    ENI を有効にするかどうかを指定します。企業版、基礎版、または Data Lakehouse Edition Spark を使用して Kafka にアクセスする場合は、ENI を有効にする必要があります。

    spark.adb.eni.vswitchId

    Kafka インスタンスの vSwitch ID。

    spark.adb.eni.securityGroupId

    Kafka インスタンスが追加されているセキュリティグループの ID。

    conf

    Spark ジョブに必要な構成パラメーター。Apache Spark のパラメーターと似ています。パラメーターは key:value 形式である必要があります。複数のパラメーターはカンマ(,)で区切ります。Apache Spark のパラメーターとは異なる構成パラメーター、または AnalyticDB for MySQL 固有の構成パラメーターの詳細については、「Spark アプリケーションの構成パラメーター」をご参照ください。

  8. 実行 をクリックします。