すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark を使用して Iceberg テーブルにデータを書き込み、バッチモードでテーブルからデータを読み取る

最終更新日:Jan 11, 2025

このトピックでは、Spark DataFrame API を使用して Iceberg テーブルにデータを書き込み、バッチモードでテーブルからデータを読み取る方法について説明します。このトピックでは、Spark 3.x を使用します。

前提条件

EMR Hadoop クラスタが作成されていること。詳細については、クラスタの作成をご参照ください。
説明 このトピックは、EMR V3.38.0、EMR V5.4.0 以降のマイナーバージョンの Hadoop クラスタにのみ適用されます。

手順

  1. Maven プロジェクトを作成し、プロジェクトのプロジェクトオブジェクトモデル (POM) ファイルに依存関係を追加します。

    Spark の依存関係と Iceberg の依存関係を POM ファイルに追加します。次のコードでは、Spark 3.1.1 の依存関係と Iceberg 0.12.0 の依存関係が追加されています。コードは、指定された依存関係スコープを使用してコンパイルされます。E-MapReduce (EMR) クラスタで実行されている Iceberg ソフトウェアパッケージを使用することをお勧めします。

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.1</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-core</artifactId>
        <version>0.12.0</version>
        <scope>provided</scope>
    </dependency>
    説明

    EMR クラスタの Iceberg ソフトウェアパッケージは、オープンソースの Iceberg 依存関係パッケージとは異なります。たとえば、Data Lake Formation (DLF) カタログは、EMR クラスタの Iceberg ソフトウェアパッケージと自動的に統合されます。オンプレミスのマシンでコードをコンパイルし、コードをパッケージ化し、クラスタ環境の依存関係を使用してコードを実行するために、provided 依存関係スコープを使用してオープンソースの Iceberg 依存関係を追加することをお勧めします。

  2. カタログを設定します。

    Spark API を呼び出して Iceberg テーブルに対する操作を実行する前に、関連する SparkConf オブジェクトに必要な構成項目を追加してカタログを設定します。

    • EMR V3.40 以降のマイナーバージョン、および EMR V5.6.0 以降

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
    • EMR V3.39.X および EMR V5.5.X

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")
    • EMR V3.38.X、EMR V5.3.X、および EMR V5.4.X

      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
      説明

      サンプルコードを実行する前に、環境変数を設定する必要があります。環境変数の設定方法の詳細については、このトピックの環境変数の設定セクションをご参照ください。

  3. Iceberg テーブルにデータを書き込みます。

    Spark 3.x を使用している場合、DataFrameWriterV2 API を使用して Iceberg テーブルにデータを書き込むことができます。DataFrameWriterV1 API は使用しないことをお勧めします。次のサンプルコードは、DataFrameWriterV2 API を使用して Iceberg テーブルにデータを書き込む方法の例を示しています。

    次の例では、<yourCatalogName> はカタログの名前を指定します。<yourCatalogName> を実際のカタログ名に置き換えることができます。

    次のコードを実行してデータテーブルを作成します。

    val df: DataFrame = ...
    df.writeTo("<yourCatalogName>.iceberg_db.sample").create()
    説明

    create、replace、または createOrReplace コマンドを実行してデータテーブルを作成できます。また、tableProperty メソッドを使用してデータテーブルのプロパティを設定し、partitionedBy メソッドを使用してデータテーブルのパーティションフィールドを設定することもできます。

    次のコマンドを実行して、データを追加および上書きできます。

    • 追加モードでテーブルにデータを書き込む

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").append()
    • 上書きモードでテーブルにデータを書き込む

      val df: DataFrame = ...
      df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
  4. テーブルからデータを読み取ります。

    使用している Spark のバージョンに基づいて、データ読み取り方法を選択します。

    • Spark 3.x (推奨)

       val df = spark.table("<yourCatalogName>.iceberg_db.sample")
    • Spark 2.4

      val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")

この例では、Spark DataFrame API を呼び出して Iceberg テーブルにデータを書き込み、バッチモードでテーブルからデータを読み取ります。

重要

カタログのパラメータとデフォルト名は、クラスタのバージョンによって異なります。この例では、DLF を使用してメタデータを管理します。この例では、EMR V5.3.0 クラスタと dlf_catalog という名前のカタログを使用します。詳細については、DLF メタデータの構成をご参照ください。

  1. Spark SQL を使用して、テスト用に iceberg_db という名前のデータベースを作成します。詳細については、Iceberg の使用をご参照ください。

  2. Spark コードを記述します。

    Scala のサンプルコード:

    def main(args: Array[String]): Unit = {
    
      // カタログのパラメータを設定します。
      val sparkConf = new SparkConf()
      sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
      sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
      sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
      sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
    
      val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergReadWriteTest")
      .getOrCreate()
    
      // DataFrame に Iceberg テーブルを作成または置換します。
      val firstDF = spark.createDataFrame(Seq(
      (1, "a"), (2, "b"), (3, "c")
      )).toDF("id", "data")
    
      firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace()
    
      // DataFrame から Iceberg テーブルにデータを書き込みます。
      val secondDF = spark.createDataFrame(Seq(
      (4, "d"), (5, "e"), (6, "f")
      )).toDF("id", "data")
    
      secondDF.writeTo("dlf_catalog.iceberg_db.sample").append()
    
      // Iceberg テーブルからデータを読み取ります。
      val icebergTable = spark.table("dlf_catalog.iceberg_db.sample")
    
      icebergTable.show()
    }
  3. コードをパッケージ化し、EMR クラスタにデプロイします。

    1. Scala でコードをコンパイルするために使用される Maven プラグインを確認します。pom.xml ファイルで次のプラグインを設定できます。

      <build>
          <plugins>
              <!-- Maven Scala プラグインは Scala ソースファイルをコンパイルします -->
              <plugin>
                  <groupId>net.alchim31.maven</groupId>
                  <artifactId>scala-maven-plugin</artifactId>
                  <version>3.2.2</version>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>testCompile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    2. オンプレミスのマシンでコードをデバッグし、次のコマンドを実行してコードをパッケージ化します。

      mvn clean install
    3. SSH モードで EMR クラスタにログオンします。詳細については、クラスタへのログオンをご参照ください。

    4. JAR パッケージを EMR クラスタにアップロードします。

      この例では、JAR パッケージは EMR クラスタのルートディレクトリにアップロードされます。

  4. spark-submit コマンドを実行して Spark ジョブを送信します。

    spark-submit \
     --master yarn \
     --deploy-mode cluster \
     --driver-memory 1g \
     --executor-cores 1 \
     --executor-memory 1g \
     --num-executors 1 \
     --class com.aliyun.iceberg.IcebergTest \
     iceberg-demos.jar
    説明

    この例では、iceberg-demos.jar という名前の JAR パッケージが使用されています。--class パラメータの値と JAR パッケージの名前は、ビジネス要件に基づいて変更できます。

    次の結果が返されます。

    +---+----+
    | id|data|
    +---+----+
    |  4|   d|
    |  1|   a|
    |  5|   e|
    |  6|   f|
    |  2|   b|
    |  3|   c|
    +---+----+