全部產品
Search
文件中心

E-MapReduce:基礎使用

更新時間:Jul 26, 2024

本文通過樣本為您介紹如何在已經建立好的E-MapReduce(簡稱EMR)叢集中使用Iceberg。

背景資訊

本文以資料湖中繼資料為例,詳細配置請參見資料湖中繼資料配置

前提條件

已在E-MapReduce控制台上,建立Hadoop的EMR-5.3.0及後續版本的叢集,詳情請參見建立叢集

使用限制

由於Iceberg的Spark SQL Extensions不適用於Spark 2.4,因此對於EMR-3.38.x及後續版本的Spark只能使用DataFrame API操作Iceberg。本文介紹EMR-5.3.0及後續版本以Spark SQL方式操作Iceberg。

操作步驟

  1. 使用SSH方式登入到叢集,詳情資訊請參見登入叢集

  2. 執行以下命令,通過Spark SQL讀寫Iceberg配置。

    在Spark SQL中操作Iceberg,首先需要配置Catalog。以下是在Spark SQL中使用資料湖中繼資料的配置,叢集版本不同預設的Catalog名稱不同,需要配置的參數也不同,具體請參見資料湖中繼資料配置

    說明

    Catalog的配置以spark.sql.catalog.<catalog_name>作為首碼,其中<catalog_name>為Catalog名稱。

    • EMR-5.6.0及後續版本

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.iceberg=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.iceberg.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
    • EMR-5.5.x版本

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf.catalog-impl=org.apache.iceberg.aliyun.dlf.hive.DlfCatalog \
       --conf spark.sql.catalog.dlf.warehouse=<yourOSSWarehousePath> \
      說明

      spark.sql.catalog.dlf.warehouse參數可以不設定。如果不設定spark.sql.catalog.dlf.warehouse,則使用預設的warehouse路徑。

    • EMR-5.3.x~EMR-5.4.x版本(包含)

      這些版本需要配置AccessKey資訊。請確保代碼運行環境設定了環境變數ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體配置方法,請參見在Linux、macOS和Windows系統配置環境變數

      spark-sql --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.dlf_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.dlf_catalog.catalog-impl=org.apache.iceberg.aliyun.dlf.DlfCatalog \
       --conf spark.sql.catalog.dlf_catalog.io-impl=org.apache.iceberg.hadoop.HadoopFileIO \
       --conf spark.sql.catalog.dlf_catalog.oss.endpoint=<yourOSSEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.warehouse=<yourOSSWarehousePath> \
       --conf spark.sql.catalog.dlf_catalog.access.key.id=<ALIBABA_CLOUD_ACCESS_KEY_ID> \
       --conf spark.sql.catalog.dlf_catalog.access.key.secret=<ALIBABA_CLOUD_ACCESS_KEY_SECRET> \
       --conf spark.sql.catalog.dlf_catalog.dlf.catalog-id=<yourCatalogId> \
       --conf spark.sql.catalog.dlf_catalog.dlf.endpoint=<yourDLFEndpoint> \
       --conf spark.sql.catalog.dlf_catalog.dlf.region-id=<yourDLFRegionId>

    當返回資訊中包含如下資訊時,表示已進入spark-sql命令列。

    spark-sql>
  3. 基礎操作。

    重要

    以下樣本中的<catalog_name>為您Catalog的名稱,例如,EMR-5.6.0及後續版本的Catalog的名稱為iceberg,其餘版本請參見步驟2中配置Catalog時的資訊,配置都是以spark.sql.catalog.<catalog_name>作為首碼的。

    • 建立庫

      CREATE DATABASE IF NOT EXISTS <catalog_name>.iceberg_db;
    • 建立表

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg;

      Iceberg表支援COMMENT、PARTITIONED BY、LOCATION和TBLPROPERTIES等文法。如果通過TBLPROPERTIES設定表層級屬性,程式碼範例如下。

      CREATE TABLE IF NOT EXISTS <catalog_name>.iceberg_db.sample(
          id BIGINT COMMENT 'unique id', 
          data STRING
      ) 
      USING iceberg
      TBLPROPERTIES (
          'write.format.default'='parquet'
      );
    • 寫入資料

      INSERT INTO <catalog_name>.iceberg_db.sample VALUES (1, 'a'), (2, 'b'), (3, 'c');
    • 查詢資料

      SELECT * FROM <catalog_name>.iceberg_db.sample;
      SELECT count(1) AS count, data FROM <catalog_name>.iceberg_db.sample GROUP BY data;
    • 更新資料

      UPDATE <catalog_name>.iceberg_db.sample SET data = 'x' WHERE id = 3;
    • 刪除資料

      DELETE FROM <catalog_name>.iceberg_db.sample WHERE id = 3;