すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB for MySQL:Apache Kafkaのメッセージキューへのアクセス

最終更新日:Jun 14, 2024

このトピックでは、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Sparkを使用して、elastic network interface (ENI) 経由でMessage Queue for Apache Kafkaにアクセスする方法について説明します。

前提条件

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが作成されます。 詳細については、「クラスターの作成」をご参照ください。

  • ジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。

  • データベースアカウントが作成されます。

  • Apache KafkaインスタンスのMessage Queueは、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターと同じリージョンに作成されます。 詳細については、「インターネットおよびVPCに接続されたインスタンスの購入とデプロイ」をご参照ください。

  • トピックとコンシューマーグループがKafkaインスタンスに作成されます。 詳細については、「リソースの作成」 をご参照ください。

  • Object Storage Service (OSS) が有効化され、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターと同じリージョンにバケットが作成されます。 詳細については、「OSSの有効化」および「バケットの作成」をご参照ください。

準備

  1. ApsaraMQ for Kafkaコンソールにログインし、[インスタンスの詳細] ページに移動して、KafkaインスタンスのvSwitch IDを取得します。

  2. Elastic Compute Service (ECS) コンソールにログインし、[セキュリティグループ] ページに移動して、Kafkaインスタンスが追加されたセキュリティグループのIDを取得します。

  3. ApsaraMQ for Kafkaコンソールにログインし、ホワイトリスト管理ページに移動して、vSwitchのCIDRブロックがKafkaインスタンスのホワイトリストに追加されているかどうかを確認します。

手順

  1. KafkaインスタンスとAnalyticDB for MySQL Sparkアプリケーションのバージョンにそれぞれ対応するJARパッケージをダウンロードします。 詳細については、「Kafka-clients」および「Spark-sql-kafka-0-10」をご参照ください。

  2. 次の依存関係をpom.xmlファイルに追加します。

    <!- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 ->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.2.2</version>
        <scope> テスト </scope>
    </dependency>
    
    <!- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients ->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency> 
  3. Spark Streamingという名前のプログラムを作成してパッケージ化します。 この例では、生成されたパッケージの名前はspark-example.jarです。 サンプルコード:

    パッケージcom.aliyun.spark.streaming
    
    org.apache.kafka.clients.consumer.ConsumerRecordをインポートする
    org.apache.spark.SparkConfをインポートする
    org.apache.spark.sql.SparkSessionをインポートする
    
    オブジェクトSparkKafka {
      def main(args: Array[String]): Unit = {
        if(args.length < 3){
          System.err.println(
            """
              | args0: groupId
              | args1: topicName
              | args2: bootstrapServers
              | "" ".stripMargin)
          System.exit(1)
        }
        val groupId = args (0)
        val topicName = args (1)
        val bootstrapServers = args (2)
    
    
        val sparkConf: SparkConf = new SparkConf()
          . set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          . setAppName("SparkKafkaSub")
        sparkConf.registerKryoClasses (配列 (classOf[ConsumerRecord[_,_]]))
    
        val sparkSession = SparkSession
          .builder()
          . config(sparkConf)
          .getOrCreate()
    
        val df = sparkSession
          .readStream
          . フォーマット ("kafka")
         // Kafkaインスタンスのエンドポイント。 
          . オプション ("kafka.bootstrap.servers" 、alikafka-pre-cn-x0r34a20 ****-1-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 ****-2-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 ****-3-vpc.alikafka.aliyuncs.com:9092)
         // AnalyticDB for MySQLクラスターで使用するトピックの名前。 
          . オプション ("subscribe", kafka_test)
         // トピックを消費する消費者グループのID。 
          . オプション ("group.id", kafka_groupId)
          .load()
    
        val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          . writeStream
          .outputMode("append")
          . 形式 (「コンソール」)
          .start()
        ssc.awaitTermination()
    
      }
    }
  4. ダウンロードしたJARパッケージとSpark StreamingプログラムをOSSにアップロードします。 ファイルのアップロード方法の詳細については、「オブジェクトのアップロード」をご参照ください。

  5. Sparkエディターに移動します。

    1. AnalyticDB for MySQLコンソールにログインします。

    2. ページの左上隅で、クラスターが存在するリージョンを選択します。

    3. 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。

    4. Lake Warehouse Edition(3.0) タブでクラスターを見つけ、クラスター ID をクリックします。

    5. 左側のナビゲーションウィンドウで、[ジョブの開発] > Spark Jar 開発 を選択します。

  6. Sparkジョブのジョブリソースグループとジョブタイプを選択します。 この例では、バッチタイプが選択されています。

  7. Sparkエディターで次のコードを実行します。

    {
        "args": [
          "kafka_groupId" 、
          "kafka_test" 、
          「alikafka-pre-cn-x0r34a20 **** 1-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 **** 2-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 **** 3-vpc.alikafka.aliyuncs.com:9092」
        ],
        "file": "oss://<bucket_name>/spark-example.jar" 、
        "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar、oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar" 、
        "name": "Kafka Example" 、
        "className": "com.aliyun.spark.streaming.SparkKafka" 、
        "conf": {
            "spark.driver.resourceSpec": "small" 、
            "spark.exe cutor.instances": 1、
            "spark.exe cutor.resourceSpec": "small" 、
            "spark.adb.eni.enabled": "true" 、
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y ****" 、
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx ****"
        }
    }

    下表に、各パラメーターを説明します。

    パラメーター

    説明

    args

    Sparkジョブに渡される引数。 複数の引数はコンマ (,) で区切ります。

    ファイル

    Sparkジョブのメインファイルのパス。 メインファイルは、エントリクラスを含むJARパッケージ、またはPythonプログラムのエントリポイントとして機能する実行可能ファイルです。

    説明

    SparkジョブのメインファイルはOSSに保存する必要があります。

    Sparkジョブに必要なJARパッケージ。 複数のJARパッケージをコンマ (,) で区切ります。

    名前

    Sparkジョブの名前。

    className

    JavaまたはScalaプログラムのエントリクラス。 このパラメーターは、Pythonプログラムには必要ありません。

    spark.adb.eni.enabled

    ENIを有効にするかどうかを指定します。 Data Lakehouse Edition (V3.0) Sparkを使用してKafkaにアクセスする場合は、ENIを有効にする必要があります。

    spark.adb.eni.vswitchId

    KafkaインスタンスのvSwitch ID。

    spark.adb.eni.securityGroupId

    Kafkaインスタンスが追加されたセキュリティグループのID。

    conf

    Sparkジョブに必要な設定パラメーター (Apache Sparkと同様) 。 パラメーターはkey:value形式である必要があります。 複数のパラメーターはコンマ (,) で区切ります。 Apache Sparkの設定パラメーターとは異なる設定パラメーター、またはAnalyticDB For MySQLに固有の設定パラメーターの詳細については、「Sparkアプリケーション設定パラメーター」をご参照ください。

  8. 実行 をクリックします。