このトピックでは、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Sparkを使用して、elastic network interface (ENI) 経由でMessage Queue for Apache Kafkaにアクセスする方法について説明します。
前提条件
AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターが作成されます。 詳細については、「クラスターの作成」をご参照ください。
ジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。
データベースアカウントが作成されます。
Alibaba Cloudアカウントを使用する場合は、特権データベースアカウントを作成するだけで済みます。 詳細については、「特権アカウントの作成」をご参照ください。
RAM (Resource Access Management) ユーザーを使用する場合は、特権データベースアカウントと標準データベースアカウントの両方を作成し、標準アカウントをRAMユーザーに関連付ける必要があります。 詳細については、「データベースアカウントの作成」および「データベースアカウントの関連付けまたは関連付けの解除」をご参照ください。
Apache KafkaインスタンスのMessage Queueは、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターと同じリージョンに作成されます。 詳細については、「インターネットおよびVPCに接続されたインスタンスの購入とデプロイ」をご参照ください。
トピックとコンシューマーグループがKafkaインスタンスに作成されます。 詳細については、「リソースの作成」 をご参照ください。
Object Storage Service (OSS) が有効化され、AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターと同じリージョンにバケットが作成されます。 詳細については、「OSSの有効化」および「バケットの作成」をご参照ください。
準備
ApsaraMQ for Kafkaコンソールにログインし、[インスタンスの詳細] ページに移動して、KafkaインスタンスのvSwitch IDを取得します。
Elastic Compute Service (ECS) コンソールにログインし、[セキュリティグループ] ページに移動して、Kafkaインスタンスが追加されたセキュリティグループのIDを取得します。
ApsaraMQ for Kafkaコンソールにログインし、ホワイトリスト管理ページに移動して、vSwitchのCIDRブロックがKafkaインスタンスのホワイトリストに追加されているかどうかを確認します。
手順
KafkaインスタンスとAnalyticDB for MySQL Sparkアプリケーションのバージョンにそれぞれ対応するJARパッケージをダウンロードします。 詳細については、「Kafka-clients」および「Spark-sql-kafka-0-10」をご参照ください。
次の依存関係をpom.xmlファイルに追加します。
<!- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql-kafka-0-10_2.12</artifactId> <version>3.2.2</version> <scope> テスト </scope> </dependency> <!- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.1</version> </dependency>
Spark Streaming
という名前のプログラムを作成してパッケージ化します。 この例では、生成されたパッケージの名前はspark-example.jar
です。 サンプルコード:パッケージcom.aliyun.spark.streaming org.apache.kafka.clients.consumer.ConsumerRecordをインポートする org.apache.spark.SparkConfをインポートする org.apache.spark.sql.SparkSessionをインポートする オブジェクトSparkKafka { def main(args: Array[String]): Unit = { if(args.length < 3){ System.err.println( """ | args0: groupId | args1: topicName | args2: bootstrapServers | "" ".stripMargin) System.exit(1) } val groupId = args (0) val topicName = args (1) val bootstrapServers = args (2) val sparkConf: SparkConf = new SparkConf() . set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") . setAppName("SparkKafkaSub") sparkConf.registerKryoClasses (配列 (classOf[ConsumerRecord[_,_]])) val sparkSession = SparkSession .builder() . config(sparkConf) .getOrCreate() val df = sparkSession .readStream . フォーマット ("kafka") // Kafkaインスタンスのエンドポイント。 . オプション ("kafka.bootstrap.servers" 、alikafka-pre-cn-x0r34a20 ****-1-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 ****-2-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 ****-3-vpc.alikafka.aliyuncs.com:9092) // AnalyticDB for MySQLクラスターで使用するトピックの名前。 . オプション ("subscribe", kafka_test) // トピックを消費する消費者グループのID。 . オプション ("group.id", kafka_groupId) .load() val query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") . writeStream .outputMode("append") . 形式 (「コンソール」) .start() ssc.awaitTermination() } }
ダウンロードしたJARパッケージと
Spark Streaming
プログラムをOSSにアップロードします。 ファイルのアップロード方法の詳細については、「オブジェクトのアップロード」をご参照ください。Sparkエディターに移動します。
AnalyticDB for MySQLコンソールにログインします。
ページの左上隅で、クラスターが存在するリージョンを選択します。
左側のナビゲーションウィンドウで、クラスターリスト をクリックします。
Lake Warehouse Edition(3.0) タブでクラスターを見つけ、クラスター ID をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
Sparkジョブのジョブリソースグループとジョブタイプを選択します。 この例では、バッチタイプが選択されています。
Sparkエディターで次のコードを実行します。
{ "args": [ "kafka_groupId" 、 "kafka_test" 、 「alikafka-pre-cn-x0r34a20 **** 1-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 **** 2-vpc.alikafka.aliyuncs.com:9092、alikafka-pre-cn-x0r34a20 **** 3-vpc.alikafka.aliyuncs.com:9092」 ], "file": "oss://<bucket_name>/spark-example.jar" 、 "jars": "oss://<bucket_name>/kafka-clients-2.8.1.jar、oss://<bucket_name>/spark-sql-kafka-0-10_2.12-3.2.0.jar" 、 "name": "Kafka Example" 、 "className": "com.aliyun.spark.streaming.SparkKafka" 、 "conf": { "spark.driver.resourceSpec": "small" 、 "spark.exe cutor.instances": 1、 "spark.exe cutor.resourceSpec": "small" 、 "spark.adb.eni.enabled": "true" 、 "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y ****" 、 "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx ****" } }
下表に、各パラメーターを説明します。
パラメーター
説明
args
Sparkジョブに渡される引数。 複数の引数はコンマ (,) で区切ります。
ファイル
Sparkジョブのメインファイルのパス。 メインファイルは、エントリクラスを含むJARパッケージ、またはPythonプログラムのエントリポイントとして機能する実行可能ファイルです。
説明SparkジョブのメインファイルはOSSに保存する必要があります。
瓶
Sparkジョブに必要なJARパッケージ。 複数のJARパッケージをコンマ (,) で区切ります。
名前
Sparkジョブの名前。
className
JavaまたはScalaプログラムのエントリクラス。 このパラメーターは、Pythonプログラムには必要ありません。
spark.adb.eni.enabled
ENIを有効にするかどうかを指定します。 Data Lakehouse Edition (V3.0) Sparkを使用してKafkaにアクセスする場合は、ENIを有効にする必要があります。
spark.adb.eni.vswitchId
KafkaインスタンスのvSwitch ID。
spark.adb.eni.securityGroupId
Kafkaインスタンスが追加されたセキュリティグループのID。
conf
Sparkジョブに必要な設定パラメーター (Apache Sparkと同様) 。 パラメーターは
key:value
形式である必要があります。 複数のパラメーターはコンマ (,) で区切ります。 Apache Sparkの設定パラメーターとは異なる設定パラメーター、またはAnalyticDB For MySQLに固有の設定パラメーターの詳細については、「Sparkアプリケーション設定パラメーター」をご参照ください。実行 をクリックします。