EMR-3.38.3及後續版本的DataFlow叢集,可以通過資料湖中繼資料DLF(Data Lake Formation)作為中繼資料讀取DataLake叢集或自訂叢集中的資料。本文為您介紹Dataflow叢集如何串連DLF,並讀取Hudi全量資料。
前提條件
使用限制
DataFlow叢集和DataLake叢集需要在同一VPC下。
操作流程
步驟一:環境準備
拷貝DataLake叢集中${HIVE_CONF_DIR}
下的hive-site.xml到DataFlow叢集。
例如,${HIVE_CONF_DIR}
為/etc/taihao-apps/hive-conf/。
mkdir /etc/taihao-apps/hive-conf
scp root@<master-1-1節點內網的IP地址>:/etc/taihao-apps/hive-conf/hive-site.xml /etc/taihao-apps/hive-conf/
步驟二:啟動Flink SQL
重要
- 務必將DLF的依賴包放置在Hive依賴包的前面,其中DLF依賴包中嵌入了Hudi的依賴。
- 無需關注Datalake叢集中的Hive版本,Hive依賴均使用2.3.6版本的。
執行以下命令,啟動Flink SQL。
sql-client.sh \
-j /opt/apps/FLINK/flink-current/opt/catalogs/dlf/ververica-connector-dlf-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar \
-j /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/ververica-connector-hive-2.3.6-1.13-vvr-4.0.15-SNAPSHOT-jar-with-dependencies.jar
測試時可設定以下配置。
set sql-client.verbose=true;
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;
步驟三:建立並驗證Catalog
進入Flink SQL後,分別建立DLF Catalog和Hive Catalog用於讀取Hudi表和Hive表。
- 執行以下命令,建立Catalog。
- 建立DLF Catalog
CREATE CATALOG dlf_catalog WITH ( 'type' = 'dlf', 'access.key.id' = '<yourAccessKeyId>', --您阿里雲帳號的AccessKey ID。 'access.key.secret' = '<yourAccessKeyId>', --您阿里雲帳號的AccessKey Secret。 'warehouse' = 'oss://oss-bucket/warehouse/test.db', 'oss.endpoint' = '<oss.endpoint>', --從${HADOOP_CONF_DIR}/core-site.xml中擷取。 'dlf.endpoint' = '<dlf.endpoint>', --從${HIVE_CONF_DIR}/hive-site.xml中擷取。 'dlf.region-id' = '<dlf.region-id>' --從${HIVE_CONF_DIR}/hive-site.xml中擷取。 );
- 建立Hive Catalog重要 無需關注Datalake叢集中的Hive版本,
hive-version
均使用2.3.6。CREATE CATALOG hive_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-version' = '2.3.6', 'hive-conf-dir' = '/etc/taihao-apps/hive-conf/', 'hadoop-conf-dir' = '/etc/taihao-apps/hadoop-conf/' );
Catalog建立成功後,均會返回以下資訊。[INFO] Execute statement succeed.
- 建立DLF Catalog
- 執行以下命令,驗證Catalog。
- 驗證DLF Catalog
select * from dlf_catalog.test.hudi_table;
- 驗證Hive Catalog
select * from hive_catalog.test.hive_table;
- 驗證DLF Catalog
步驟四:Flink SQL寫入Hudi
- 情境一:資料入湖使用Datagen Connector隨機產生上遊Source資料,入湖Hudi表。
-- 構建上遊Source資料 CREATE TABLE datagen_source ( uuid int, age int, ts bigint, ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ); -- 建立Hudi表 CREATE TABLE dlf_catalog.test.hudi_tbl1( id int not null, age int, ts bigint ) with( 'connector'='hudi', 'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl1', 'table.type'='COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'='id', 'hive_sync.enable'='true', 'hive_sync.table'='hudi_tbl1', -- required, Hive建立的表名。 'hive_sync.db'='test', -- required, Hive建立的資料庫名。 'hive_sync.mode' = 'hms' -- required, 將hive sync mode設定為hms, 預設jdbc。 ); --入湖 insert into dlf_catalog.test.hudi_tbl1 select uuid as id, age, ts from default_catalog.default_database.datagen_source; -- 查詢驗證 select * from dlf_catalog.test.hudi_tbl1;
- 情境二:維度打寬入湖
使用ODS層的Hudi資料和Hive的維度資料表關聯打寬填充維度欄位,最後寫入新的Hudi表。
例如,已有表dlf_catalog.test.hive_dim_tbl,表結構如下。id int name string
詳細樣本如下。-- 建立目標Hudi表 CREATE TABLE dlf_catalog.test.hudi_tbl2( id int not null, name string, age int, ts bigint ) with( 'connector'='hudi', 'path' = 'oss://oss-bucket/warehouse/test.db/hudi_tbl2', 'table.type'='COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field'='id', 'hive_sync.enable'='true', 'hive_sync.table'='hudi_tbl2', -- required, Hive建立的表名。 'hive_sync.db'='test', -- required, Hive建立的資料庫名。 'hive_sync.mode' = 'hms' -- required, 將hive sync mode設定為hms, 預設jdbc。 ); -- 關聯Hive維度資料表後入湖 insert into dlf_catalog.test.hudi_tbl2 select s.id, name, age, ts from dlf_catalog.test.hudi_tbl1 s join hive_catalog.test.hive_dim_tbl t on s.id = t.id;
步驟五:DataLake叢集查詢Hudi
登入DataLake叢集查詢Hudi資料。登入叢集詳情請參見登入叢集。
- Spark查詢
Spark查詢詳情,請參見Hudi與Spark SQL整合。
- 執行以下命令,啟動spark-sql。
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
如果您叢集的Spark是Spark 3,且Hudi為0.11及以上版本,則需額外添加以下配置。--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
- 執行以下命令,驗證表資訊。
- 查詢hudi_tbl1
select * from test.hudi_tbl1;
- 查詢hudi_tbl2
select * from test.hudi_tbl2;
- 查詢hudi_tbl1
- 執行以下命令,啟動spark-sql。
- Hudi查詢
- 執行以下命令,啟動Hive CLI。
hive
- 執行以下命令,驗證表資訊。
- 查詢hudi_tbl1
select * from test.hudi_tbl1;
- 查詢hudi_tbl2
select * from test.hudi_tbl2;
- 查詢hudi_tbl1
- 執行以下命令,啟動Hive CLI。