全部產品
Search
文件中心

E-MapReduce:通過資料湖中繼資料DLF讀寫Hudi

更新時間:Jul 01, 2024

EMR-3.38.3及後續版本的DataFlow叢集,可以通過資料湖中繼資料DLF(Data Lake Formation)作為中繼資料讀取DataLake叢集或自訂叢集中的資料。本文為您介紹Dataflow叢集如何串連DLF,並讀取Hudi全量資料。

前提條件

  • 已在E-MapReduce控制台上建立DataFlow叢集和DataLake叢集,詳情請參見建立叢集
    重要 建立DataLake叢集時,中繼資料需為DLF統一中繼資料
  • 已開通資料湖構建DLF,詳情請參見快速入門

使用限制

DataFlow叢集和DataLake叢集需要在同一VPC下。

操作流程

  1. 步驟一:環境準備
  2. 步驟二:啟動Flink SQL
  3. 步驟三:建立並驗證Catalog
  4. 步驟四:Flink SQL寫入Hudi
  5. 步驟五:DataLake叢集查詢Hudi

步驟一:環境準備

拷貝DataLake叢集中${HIVE_CONF_DIR}下的hive-site.xml到DataFlow叢集。

例如,${HIVE_CONF_DIR}/etc/taihao-apps/hive-conf/

mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1節點內網的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/

步驟二:啟動Flink SQL

重要
  • 務必將DLF的依賴包放置在Hive依賴包的前面,其中DLF依賴包中嵌入了Hudi的依賴。
  • 無需關注Datalake叢集中的Hive版本,Hive依賴均使用2.3.6版本的。
執行以下命令,啟動Flink SQL。
sql-client.sh \
-j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar \
-j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar
測試時可設定以下配置。
set sql-client.verbose=true;
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;

步驟三:建立並驗證Catalog

進入Flink SQL後,分別建立DLF Catalog和Hive Catalog用於讀取Hudi表和Hive表。

  1. 執行以下命令,建立Catalog。
    • 建立DLF Catalog
      CREATE CATALOG dlf_catalog WITH (
           'type' = 'dlf',
           'access.key.id' = '<yourAccessKeyId>', --您阿里雲帳號的AccessKey ID。
           'access.key.secret' = '<yourAccessKeyId>', --您阿里雲帳號的AccessKey Secret。
           'warehouse' = 'oss://oss-bucket/warehouse/test.db',
           'oss.endpoint' = '<oss.endpoint>', --從${HADOOP_CONF_DIR}/core-site.xml中擷取。
           'dlf.endpoint' = '<dlf.endpoint>', --從${HIVE_CONF_DIR}/hive-site.xml中擷取。
           'dlf.region-id' = '<dlf.region-id>' --從${HIVE_CONF_DIR}/hive-site.xml中擷取。
       );
    • 建立Hive Catalog
      重要 無需關注Datalake叢集中的Hive版本,hive-version均使用2.3.6。
      CREATE CATALOG hive_catalog WITH (
           'type' = 'hive',
           'default-database' = 'default',
           'hive-version' = '2.3.6',
           'hive-conf-dir' = '/etc/taihao-apps/hive-conf/',
           'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/'
       );
    Catalog建立成功後,均會返回以下資訊。
    [INFO] Execute statement succeed.
  2. 執行以下命令,驗證Catalog。
    • 驗證DLF Catalog
      select * from dlf_catalog.test.hudi_table;
    • 驗證Hive Catalog
      select * from hive_catalog.test.hive_table;

步驟四:Flink SQL寫入Hudi

  • 情境一:資料入湖
    使用Datagen Connector隨機產生上遊Source資料,入湖Hudi表。
    -- 構建上遊Source資料
    CREATE TABLE datagen_source (
      uuid int,
      age int,
      ts bigint,
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10'
    );
    
    -- 建立Hudi表
    CREATE TABLE dlf_catalog.test.hudi_tbl1(
      id int not null,
      age int,
      ts bigint
    )
    with(
      'connector'='hudi',
      'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl1',
      'table.type'='COPY_ON_WRITE',
      'hoodie.datasource.write.recordkey.field'='id',
      'hive_sync.enable'='true',
      'hive_sync.table'='hudi_tbl1',    -- required, Hive建立的表名。
      'hive_sync.db'='test',            -- required, Hive建立的資料庫名。
      'hive_sync.mode' = 'hms'          -- required, 將hive sync mode設定為hms, 預設jdbc。
    );
    
    --入湖
    insert into dlf_catalog.test.hudi_tbl1
    select uuid as id, age, ts
    from default_catalog.default_database.datagen_source;
    
    -- 查詢驗證
    select * from dlf_catalog.test.hudi_tbl1;
  • 情境二:維度打寬入湖

    使用ODS層的Hudi資料和Hive的維度資料表關聯打寬填充維度欄位,最後寫入新的Hudi表。

    例如,已有表dlf_catalog.test.hive_dim_tbl,表結構如下。
    id                      int
    name                    string
    詳細樣本如下。
    -- 建立目標Hudi表
    CREATE TABLE dlf_catalog.test.hudi_tbl2(
      id int not null,
      name string,
      age int,
      ts bigint
    )
    with(
      'connector'='hudi',
      'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl2',
      'table.type'='COPY_ON_WRITE',
      'hoodie.datasource.write.recordkey.field'='id',
      'hive_sync.enable'='true',
      'hive_sync.table'='hudi_tbl2',    -- required, Hive建立的表名。
      'hive_sync.db'='test',            -- required, Hive建立的資料庫名。
      'hive_sync.mode' = 'hms'          -- required, 將hive sync mode設定為hms, 預設jdbc。
    );
    
    -- 關聯Hive維度資料表後入湖
    insert into dlf_catalog.test.hudi_tbl2
    select s.id, name, age, ts
    from dlf_catalog.test.hudi_tbl1 s join hive_catalog.test.hive_dim_tbl t on s.id = t.id;

步驟五:DataLake叢集查詢Hudi

登入DataLake叢集查詢Hudi資料。登入叢集詳情請參見登入叢集

  • Spark查詢

    Spark查詢詳情,請參見Hudi與Spark SQL整合

    1. 執行以下命令,啟動spark-sql。
      spark-sql \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
      --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
      如果您叢集的Spark是Spark 3,且Hudi為0.11及以上版本,則需額外添加以下配置。
      --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    2. 執行以下命令,驗證表資訊。
      • 查詢hudi_tbl1
        select * from test.hudi_tbl1;
      • 查詢hudi_tbl2
        select * from test.hudi_tbl2;
  • Hudi查詢
    1. 執行以下命令,啟動Hive CLI。
      hive
    2. 執行以下命令,驗證表資訊。
      • 查詢hudi_tbl1
        select * from test.hudi_tbl1;
      • 查詢hudi_tbl2
        select * from test.hudi_tbl2;