このトピックでは、AnalyticDB for MySQL Spark を使用して、Elastic Network Interface(ENI)経由で ApsaraMQ for Kafka にアクセスする方法について説明します。
前提条件
AnalyticDB for MySQL Data Lakehouse Edition クラスタが作成されていること。
AnalyticDB for MySQL クラスタのジョブリソースグループが作成されていること。詳細については、「リソースグループの作成と管理」をご参照ください。
AnalyticDB for MySQL クラスター用にデータベースアカウントが作成されます。
Alibaba Cloud アカウントを使用する場合、特権アカウントを作成するだけで済みます。詳細については、「データベースアカウントを作成する」トピックの「特権アカウントを作成する」セクションをご参照ください。
Resource Access Management ( RAM ) ユーザーを使用する場合は、特権アカウントと標準アカウントを作成し、標準アカウントを RAM ユーザーに関連付ける必要があります。詳細については、「データベースアカウントを作成する」および「データベースアカウントを RAM ユーザーに関連付ける、または関連付けを解除する」をご参照ください。
AnalyticDB for MySQL クラスタと同じリージョンに ApsaraMQ for Kafka インスタンスが作成されていること。詳細については、「インターネット接続および VPC 接続インスタンスの購入とデプロイ」をご参照ください。
Kafka インスタンスにトピックとコンシューマーグループが作成されていること。詳細については、「リソースの作成」をご参照ください。
Object Storage Service(OSS)がアクティブ化されており、AnalyticDB for MySQL クラスタと同じリージョンにバケットが作成されていること。詳細については、「OSS のアクティブ化」および「バケットの作成」をご参照ください。
準備
ApsaraMQ for Kafka コンソール にログインし、[インスタンスの詳細] ページに移動して、Kafka インスタンスの vSwitch ID を取得します。
Elastic Compute Service(ECS)コンソール にログインし、[セキュリティグループ] ページに移動して、Kafka インスタンスが追加されているセキュリティグループの ID を取得します。
ApsaraMQ for Kafka コンソール にログインし、[ホワイトリスト管理] ページに移動して、vSwitch の CIDR ブロックが Kafka インスタンスのホワイトリストに追加されているかどうかを確認します。
手順
Kafka インスタンスと AnalyticDB for MySQL Spark アプリケーションのバージョンに対応する JAR パッケージをダウンロードします。詳細については、「Kafka-clients」および「Spark-sql-kafka-0-10」をご参照ください。
pom.xml ファイルに次の依存関係を追加します。
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.2.2</version> <scope>test</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.1</version> </dependency>Spark Streamingという名前のプログラムを作成してパッケージ化します。この例では、生成されたパッケージの名前はspark-example.jarです。サンプルコード:package com.aliyun.spark.streaming // Kafka の ConsumerRecord クラスをインポートします。 import org.apache.kafka.clients.consumer.ConsumerRecord // Spark の設定とセッションをインポートします。 import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession // SparkKafka オブジェクトを定義します。 object SparkKafka { def main(args: Array[String]): Unit = { // 引数の数が 3 未満の場合、エラーメッセージを出力して終了します。 if(args.length < 3){ System.err.println( """ |args0: groupId |args1: topicName |args2: bootstrapServers |""".stripMargin) System.exit(1) } // 引数から groupId、topicName、bootstrapServers を取得します。 val groupId = args(0) val topicName = args(1) val bootstrapServers = args(2) // SparkConf を作成し、シリアライザとアプリケーション名を設定します。 val sparkConf: SparkConf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .setAppName("SparkKafkaSub") // Kryo シリアライゼーションに使用するクラスを登録します。 sparkConf.registerKryoClasses(Array(classOf[ConsumerRecord[_,_]])) // SparkSession を作成します。 val sparkSession = SparkSession .builder() .config(sparkConf) .getOrCreate() // Kafka からデータを読み込むための DataFrame を作成します。 val df = sparkSession .readStream .format("kafka") // Kafka インスタンスのエンドポイントを指定します。 .option("kafka.bootstrap.servers", alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092) // サブスクライブするトピック名を指定します。 .option("subscribe", kafka_test) // コンシューマーグループ ID を指定します。 .option("group.id", kafka_groupId) .load() // key と value を文字列として選択し、コンソールに出力するクエリを定義します。 val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .writeStream .outputMode("append") .format("console") .start() // クエリが終了するまで待機します。 query.awaitTermination() } }ダウンロードした JAR パッケージと
Spark Streamingプログラムを OSS にアップロードします。ファイルのアップロード方法の詳細については、「オブジェクトのアップロード」をご参照ください。Spark エディターに移動します。
AnalyticDB for MySQL コンソール にログインします。コンソールの左上隅で、リージョンを選択します。左側のナビゲーションウィンドウで、クラスターリスト をクリックします。Data Lakehouse Edition タブで、管理するクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
Spark ジョブのジョブリソースグループとジョブタイプを選択します。この例では、バッチタイプが選択されています。
Spark エディターで次のコードを実行します。
{ "args": [ "kafka_groupId", "kafka_test", "alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092" ], "file": "oss://<bucket_name>/spark-example.jar", "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar,oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar", "name": "Kafka Example", "className": "com.aliyun.spark.streaming.SparkKafka", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****" } }次の表にパラメーターを示します。
パラメーター
説明
argsSpark ジョブに渡される引数。複数の引数はカンマ(,)で区切ります。
fileSpark ジョブのメインファイルのパス。メインファイルは、エントリ クラスを含む JAR パッケージ、または Python プログラムのエントリ ポイントとなる実行可能ファイルです。
説明Spark ジョブのメインファイルは OSS に保存する必要があります。
jarsSpark ジョブに必要な JAR パッケージ。複数の JAR パッケージはカンマ(,)で区切ります。
nameSpark ジョブの名前。
classNameJava または Scala プログラムのエントリクラス。このパラメーターは Python プログラムには必要ありません。
spark.adb.eni.enabledENI を有効にするかどうかを指定します。企業版、基礎版、または Data Lakehouse Edition Spark を使用して Kafka にアクセスする場合は、ENI を有効にする必要があります。
spark.adb.eni.vswitchIdKafka インスタンスの vSwitch ID。
spark.adb.eni.securityGroupIdKafka インスタンスが追加されているセキュリティグループの ID。
confSpark ジョブに必要な構成パラメーター。Apache Spark のパラメーターと似ています。パラメーターは
key:value形式である必要があります。複数のパラメーターはカンマ(,)で区切ります。Apache Spark のパラメーターとは異なる構成パラメーター、または AnalyticDB for MySQL 固有の構成パラメーターの詳細については、「Spark アプリケーションの構成パラメーター」をご参照ください。実行 をクリックします。