AnalyticDB for MySQL Sparkを使用すると、OSS-HDFSにアクセスできます。 このトピックでは、Sparkを使用してOSS-HDFSにアクセスする方法について説明します。
前提条件
AnalyticDB for MySQL Data Lakehouse Editionクラスターが作成されます。
Object Storage Service (OSS) バケットは、AnalyticDB for MySQLクラスターと同じリージョンに作成されます。
AnalyticDB for MySQLクラスターのジョブリソースグループが作成されます。 詳細については、「リソースグループの作成」をご参照ください。
AnalyticDB for MySQLクラスター用のデータベースアカウントが作成されます。
Alibaba Cloudアカウントを使用する場合は、特権アカウントを作成する必要があります。 詳細については、「データベースアカウントの作成」トピックの「特権アカウントの作成」セクションをご参照ください。
RAM (Resource Access Management) ユーザーを使用する場合は、特権アカウントと標準アカウントの両方を作成し、標準アカウントをRAMユーザーに関連付ける必要があります。 詳細については、「データベースアカウントの作成」および「データベースアカウントの関連付けまたは関連付けの解除」をご参照ください。
OSS-HDFSは有効です。 詳細については、「OSS-HDFSの有効化」をご参照ください。
Spark JARモードでのOSS-HDFSデータの読み書き
OSS-HDFSへのアクセスに使用するプログラムを作成します。 次に、Sparkジョブに必要なJARパッケージにプログラムをコンパイルします。 この例では、JARパッケージの名前は
oss_hdfs_demo.jar
です。 サンプルコード:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkHDFS { def main(args: Array[String]): Unit = { val sparkSession = SparkSession .builder() .appName("Spark HDFS TEST") .getOrCreate() val welcome = "hello, adb-spark" // Specify the Hadoop Distributed File System (HDFS) directory to store required data. val hdfsPath = args(0); // Store the welcome string to the specified HDFS directory. sparkSession.sparkContext.parallelize(Seq(welcome)).saveAsTextFile(hdfsPath) // Read data from the specified HDFS directory and display the data. sparkSession.sparkContext.textFile(hdfsPath).collect.foreach(println) } }
oss_hdfs_demo.jar
パッケージをOSS-HDFSにアップロードします。 詳細については、「Hadoop Shellコマンドを使用したOSS-HDFSへのアクセス」をご参照ください。AnalyticDB for MySQL コンソールにログインします。 ホームページの左上でリージョンを選択します。 左側のナビゲーションウィンドウで、クラスターリスト をクリックします。 [Data Lakehouse Edition] タブで、管理するクラスターを見つけ、クラスターIDをクリックします。
左側のナビゲーションウィンドウで、
を選択します。エディターの上部で、ジョブリソースグループとSparkアプリケーションタイプを選択します。 この例では、バッチタイプが選択されています。
{ "args": ["oss://testBucketName/data/oss_hdfs"], "file": "oss://testBucketName/data/oss_hdfs_demo.jar", "name": "spark-on-hdfs", "className": "com.aliyun.spark.SparkHDFS", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 1, "spark.executor.resourceSpec": "medium", "spark.adb.connectors": "jindo" } }
以下のセクションで、パラメーターについて説明します。
パラメーター
説明
args
Spark JARジョブの実行に必要な引数。 この例では、argsパラメーターでOSS-HDFSパスを指定する必要があります。
例:
oss:// testBucketName/data/oss_hdfs
file
JARパッケージのOSS-HDFSパス。
例:
oss:// testBucketName/data/oss_hdfs_demo.jar
name
Sparkアプリケーションの名前。
spark.adb.connectors
OSS-HDFSデータの読み取りに使用されるコネクタ。 この例では、
jindo
が使用されます。conf
Sparkアプリケーションに必要な設定パラメーター。Apache Sparkのものと同様です。 パラメーターは
key:value
形式である必要があります。 複数のパラメーターはコンマ (,) で区切ります。 Apache Sparkの設定パラメーターとは異なる設定パラメーター、またはAnalyticDB For MySQLに固有の設定パラメーターについては、「Sparkアプリケーション設定パラメーター」をご参照ください。実行 をクリックします。 Sparkコードを実行した後、Spark JAR開発ページの [アプリケーション] タブの [操作] 列で [ログ] をクリックしてログ情報を表示できます。 詳細については、「Sparkエディター」をご参照ください。
Spark SQLモードでのOSS-HDFSデータの読み書き
OSS-HDFS上にデータベースパスとテーブルパスを作成します。 詳細については、「Hadoop Shellコマンドを使用したOSS-HDFSへのアクセス」をご参照ください。 この例では、次のパスが作成されます。
データベースパス:
oss://{bucket}/jindo_test
テーブルパス:oss://{bucket}/jindo_test/tbl
OSS-HDFSへのアクセスに使用されるSpark SQL文を記述します。
SET spark.driver.resourceSpec=small; SET spark.executor.instances=1; SET spark.executor.resourceSpec=small; SET spark.adb.connectors=jindo; CREATE DATABASE IF NOT EXISTS jindo_test LOCATION 'oss://{bucket}/jindo_test'; USE jindo_test; CREATE TABLE IF NOT EXISTS tbl(id int, name string) LOCATION 'oss://{bucket}/jindo_test/tbl'; INSERT INTO tbl values(1, 'aaa'); SELECT * FROM tbl;
[今すぐ実行] をクリックします。
関連ドキュメント
Hudi外部テーブルの読み書き方法については、「Spark SQLを使用したHudi外部テーブルの読み書き」をご参照ください。
Delta外部テーブルの読み書き方法については、「Spark SQLを使用したDelta外部テーブルの読み書き」をご参照ください。