このトピックでは、Spark DataFrame API を使用して Iceberg テーブルにデータを書き込み、バッチモードでテーブルからデータを読み取る方法について説明します。このトピックでは、Spark 3.x を使用します。
前提条件
手順
Maven プロジェクトを作成し、プロジェクトのプロジェクトオブジェクトモデル (POM) ファイルに依存関係を追加します。
Spark の依存関係と Iceberg の依存関係を POM ファイルに追加します。次のコードでは、Spark 3.1.1 の依存関係と Iceberg 0.12.0 の依存関係が追加されています。コードは、指定された依存関係スコープを使用してコンパイルされます。E-MapReduce (EMR) クラスタで実行されている Iceberg ソフトウェアパッケージを使用することをお勧めします。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.1.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.1.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-core</artifactId> <version>0.12.0</version> <scope>provided</scope> </dependency>説明EMR クラスタの Iceberg ソフトウェアパッケージは、オープンソースの Iceberg 依存関係パッケージとは異なります。たとえば、Data Lake Formation (DLF) カタログは、EMR クラスタの Iceberg ソフトウェアパッケージと自動的に統合されます。オンプレミスのマシンでコードをコンパイルし、コードをパッケージ化し、クラスタ環境の依存関係を使用してコードを実行するために、provided 依存関係スコープを使用してオープンソースの Iceberg 依存関係を追加することをお勧めします。
カタログを設定します。
Spark API を呼び出して Iceberg テーブルに対する操作を実行する前に、関連する SparkConf オブジェクトに必要な構成項目を追加してカタログを設定します。
EMR V3.40 以降のマイナーバージョン、および EMR V5.6.0 以降
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")EMR V3.39.X および EMR V5.5.X
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")EMR V3.38.X、EMR V5.3.X、および EMR V5.4.X
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")説明サンプルコードを実行する前に、環境変数を設定する必要があります。環境変数の設定方法の詳細については、このトピックの環境変数の設定セクションをご参照ください。
Iceberg テーブルにデータを書き込みます。
Spark 3.x を使用している場合、DataFrameWriterV2 API を使用して Iceberg テーブルにデータを書き込むことができます。DataFrameWriterV1 API は使用しないことをお勧めします。次のサンプルコードは、DataFrameWriterV2 API を使用して Iceberg テーブルにデータを書き込む方法の例を示しています。
次の例では、
<yourCatalogName>はカタログの名前を指定します。<yourCatalogName> を実際のカタログ名に置き換えることができます。次のコードを実行してデータテーブルを作成します。
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").create()説明create、replace、または createOrReplace コマンドを実行してデータテーブルを作成できます。また、tableProperty メソッドを使用してデータテーブルのプロパティを設定し、partitionedBy メソッドを使用してデータテーブルのパーティションフィールドを設定することもできます。
次のコマンドを実行して、データを追加および上書きできます。
追加モードでテーブルにデータを書き込む
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").append()上書きモードでテーブルにデータを書き込む
val df: DataFrame = ... df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
テーブルからデータを読み取ります。
使用している Spark のバージョンに基づいて、データ読み取り方法を選択します。
Spark 3.x (推奨)
val df = spark.table("<yourCatalogName>.iceberg_db.sample")Spark 2.4
val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")
例
この例では、Spark DataFrame API を呼び出して Iceberg テーブルにデータを書き込み、バッチモードでテーブルからデータを読み取ります。
カタログのパラメータとデフォルト名は、クラスタのバージョンによって異なります。この例では、DLF を使用してメタデータを管理します。この例では、EMR V5.3.0 クラスタと dlf_catalog という名前のカタログを使用します。詳細については、DLF メタデータの構成をご参照ください。
Spark SQL を使用して、テスト用に iceberg_db という名前のデータベースを作成します。詳細については、Iceberg の使用をご参照ください。
Spark コードを記述します。
Scala のサンプルコード:
def main(args: Array[String]): Unit = { // カタログのパラメータを設定します。 val sparkConf = new SparkConf() sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog") sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO") sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>") sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")) sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")) sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>") sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>") val spark = SparkSession .builder() .config(sparkConf) .appName("IcebergReadWriteTest") .getOrCreate() // DataFrame に Iceberg テーブルを作成または置換します。 val firstDF = spark.createDataFrame(Seq( (1, "a"), (2, "b"), (3, "c") )).toDF("id", "data") firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace() // DataFrame から Iceberg テーブルにデータを書き込みます。 val secondDF = spark.createDataFrame(Seq( (4, "d"), (5, "e"), (6, "f") )).toDF("id", "data") secondDF.writeTo("dlf_catalog.iceberg_db.sample").append() // Iceberg テーブルからデータを読み取ります。 val icebergTable = spark.table("dlf_catalog.iceberg_db.sample") icebergTable.show() }コードをパッケージ化し、EMR クラスタにデプロイします。
Scala でコードをコンパイルするために使用される Maven プラグインを確認します。pom.xml ファイルで次のプラグインを設定できます。
<build> <plugins> <!-- Maven Scala プラグインは Scala ソースファイルをコンパイルします --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build>オンプレミスのマシンでコードをデバッグし、次のコマンドを実行してコードをパッケージ化します。
mvn clean installSSH モードで EMR クラスタにログオンします。詳細については、クラスタへのログオンをご参照ください。
JAR パッケージを EMR クラスタにアップロードします。
この例では、JAR パッケージは EMR クラスタのルートディレクトリにアップロードされます。
spark-submit コマンドを実行して Spark ジョブを送信します。
spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 1 \ --executor-memory 1g \ --num-executors 1 \ --class com.aliyun.iceberg.IcebergTest \ iceberg-demos.jar説明この例では、iceberg-demos.jar という名前の JAR パッケージが使用されています。--class パラメータの値と JAR パッケージの名前は、ビジネス要件に基づいて変更できます。
次の結果が返されます。
+---+----+ | id|data| +---+----+ | 4| d| | 1| a| | 5| e| | 6| f| | 2| b| | 3| c| +---+----+