本文为您介绍Hologres基于Delta Lake实现湖仓一体的背景、架构、环境准备及使用说明等信息。
背景信息
Delta Lake是DataBricks公司推出的一种数据湖方案。Delta Lake以数据为中心,围绕数据流走向(数据从流入数据湖、数据组织管理和数据查询到流出数据湖)推出了一系列功能特性,协助您搭配第三方上下游工具,搭建快捷、易用和安全的数据湖。详情请参见Delta Lake概述。
EMR是阿里云提供的云原生开源大数据平台,向您提供简单易集成的Hadoop、Hive、Spark、Flink等开源大数据计算和存储引擎,便于您使用Hadoop和Spark生态系统中的其他周边系统分析和处理数据。详情请参见什么是E-MapReduce。
DLF是一款全托管的帮助您构建云上数据湖及Lakehouse的服务,为您提供了统一的元数据管理、统一的权限与安全管理、便捷的数据入湖能力以及一键式数据探索能力。详情请参见DLF产品简介。
Hologres作为一站式实时数仓,与DLF、EMR无缝集成,打破数据湖与数据仓库割裂的体系,构建完整的湖仓一体解决方案,将数据湖的灵活性、生态丰富性与实时数仓的高性能在线复杂分析、企业级能力相结合,为您提供一站式实时湖仓解决方案。详情请参见OSS数据湖加速。
整体架构
本解决方案通过EMR Spark来进行数据加工与处理,元数据存储在DLF中,数据存储在OSS上,Hologres可以利用DLF对OSS元数据的管理能力,对OSS多种格式的湖数据(Hudi、Delta、CSV、Parquet、ORC、SequenceFile)进行加速查询和湖仓融合分析,将数据提供给BI报表、可视化大屏和上层应用进一步消费使用。
环境准备
数据源准备
此步骤主要针对初次使用EMR或者OSS服务的用户。如您在实际业务中,已经有大量业务数据通过EMR服务写入OSS Bucket,可直接使用DLF元数据抽取功能自动生成元数据信息,供Hologres来查询访问。元数据抽取方式请参见元数据抽取。
开通EMR数据湖集群,选择需要的服务和存储格式,选择DLF来管理元数据。本文以Spark+Hive+Delta为例,开通方式请参见E-MapReduce快速入门。
开通OSS服务,创建存储空间用于存储数据,详情请参见开通OSS服务。
使用EMR Spark构建数据。
登录EMR集群,可选择SSH方式登录集群主节点或者免密登录集群Core节点,详情请参见登录集群。
构建TPC-H 100GB测试数据,命令如下。
说明本文的TPC-H的实现基于TPC-H的基准测试,并不能与已发布的TPC-H基准测试结果相比较,本文中的测试并不符合TPC-H基准测试的所有要求。
# 执行yum update更新所有库 yum update # 安装 git 和 gcc yum install git yum install gcc #下载TPC-H数据生成代码 git clone https://github.com/gregrahn/tpch-kit.git #进入数据生成工具代码目录 cd tpch-kit/dbgen # 编译数据生成工具代码 make # 运行如下代码生成数据 ./dbgen -vf -s 100
进入Hive交互界面,创建数据库和表,导入上述生成的数据。
# 进入hive交互界面 hive # 创建数据库 CREATE DATABASE IF NOT EXISTS testdb_textfile location 'oss://oss-bucket-dlftest/testdb_textfile'; # 切换至刚创建的数据库 USE testdb_textfile; # 创建表 CREATE TABLE IF NOT EXISTS nation_textfile ( n_nationkey integer , n_name char(25) , n_regionkey integer , n_comment varchar(152) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS region_textfile ( r_regionkey integer , r_name char(25) , r_comment varchar(152) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS part_textfile ( p_partkey integer , p_name varchar(55) , p_mfgr char(25) , p_brand char(10) , p_type varchar(25) , p_size integer , p_container char(10) , p_retailprice decimal(15,2) , p_comment varchar(23) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS supplier_textfile ( s_suppkey integer , s_name char(25) , s_address varchar(40) , s_nationkey integer , s_phone char(15) , s_acctbal decimal(15,2) , s_comment varchar(101) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS partsupp_textfile ( ps_partkey integer , ps_suppkey integer , ps_availqty integer , ps_supplycost decimal(15,2) , ps_comment varchar(199) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS customer_textfile ( c_custkey integer , c_name varchar(25) , c_address varchar(40) , c_nationkey integer , c_phone char(15) , c_acctbal decimal(15,2) , c_mktsegment char(10) , c_comment varchar(117) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS orders_textfile ( o_orderkey integer , o_custkey integer , o_orderstatus char(1) , o_totalprice decimal(15,2) , o_orderdate date , o_orderpriority char(15) , o_clerk char(15) , o_shippriority integer , o_comment varchar(79) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; CREATE TABLE IF NOT EXISTS lineitem_textfile ( l_orderkey integer , l_partkey integer , l_suppkey integer , l_linenumber integer , l_quantity decimal(15,2) , l_extendedprice decimal(15,2) , l_discount decimal(15,2) , l_tax decimal(15,2) , l_returnflag char(1) , l_linestatus char(1) , l_shipdate date , l_commitdate date , l_receiptdate date , l_shipinstruct char(25) , l_shipmode char(10) , l_comment varchar(44) ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|'; # 导入数据 LOAD DATA LOCAL INPATH '${YOUR_PATH}/nation.tbl*' OVERWRITE INTO TABLE nation_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/region.tbl*' OVERWRITE INTO TABLE region_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/supplier.tbl*' OVERWRITE INTO TABLE supplier_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/customer.tbl*' OVERWRITE INTO TABLE customer_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/part.tbl*' OVERWRITE INTO TABLE part_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/partsupp.tbl*' OVERWRITE INTO TABLE partsupp_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/orders.tbl*' OVERWRITE INTO TABLE orders_textfile; LOAD DATA LOCAL INPATH '${YOUR_PATH}/lineitem.tbl*' OVERWRITE INTO TABLE lineitem_textfile;
输入
spark-sql
命令进入交互界面,创建数据库和delta格式的表。# 进入spark-sql交互界面 spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.delta.mergeSchema=true' --conf 'autoMerge.enable=true' --conf 'spark.sql.parquet.writeLegacyFormat=true' # 创建数据库 CREATE DATABASE IF NOT EXISTS test_spark_delta LOCATION 'oss://oss-bucket-dlftest/test_spark_delta'; # 切换至刚创建的数据库并创建表 USE test_spark_delta; CREATE TABLE nation_delta USING delta AS SELECT * FROM ${SOURCE}.nation_textfile; CREATE TABLE region_delta USING delta AS SELECT * FROM ${SOURCE}.region_textfile; CREATE TABLE supplier_delta USING delta AS SELECT * FROM ${SOURCE}.supplier_textfile; CREATE TABLE customer_delta USING delta partitioned BY (c_mktsegment) AS SELECT * FROM ${SOURCE}.customer_textfile; CREATE TABLE part_delta USING delta partitioned BY (p_brand) AS SELECT * FROM ${SOURCE}.part_textfile; CREATE TABLE partsupp_delta USING delta AS SELECT * FROM ${SOURCE}.partsupp_textfile; CREATE TABLE orders_delta USING delta partitioned BY (o_orderdate) AS SELECT * FROM ${SOURCE}.orders_textfile; CREATE TABLE lineitem_delta USING delta partitioned BY (l_shipdate) AS SELECT * FROM ${SOURCE}.lineitem_textfile;
Hologres开启数据加速配置
前往Hologres管理控制台,在实例列表页单击对应实例操作列的数据湖加速即可开启。
使用说明
Hologres的数据湖加速能力,可以满足实际业务中以下两种使用场景,您可以根据业务需要选择合适的场景。
场景一:使用Hologres直接加速查询OSS上的表数据
示例:
-- 创建DLF外部表插件
CREATE EXTENSION IF NOT EXISTS dlf_fdw;
-- 创建外部服务器
CREATE SERVER IF NOT EXISTS dlf_server FOREIGN data wrapper dlf_fdw options
(
dlf_region 'cn-beijing',
dlf_endpoint 'dlf-share.cn-beijing.aliyuncs.com',
oss_endpoint 'oss-cn-beijing-internal.aliyuncs.com'
);
-- 导入外部表定义
IMPORT FOREIGN SCHEMA "test_spark_delta" LIMIT TO
(
customer_delta,
lineitem_delta,
nation_delta,
orders_delta,
part_delta,
partsupp_delta,
region_delta,
supplier_delta
)
FROM SERVER dlf_server INTO oss_ext_tables options (if_table_exist 'update');
-- 查询表数据,以Q22为例
SELECT
cntrycode,
count(*) AS numcust,
sum(c_acctbal) AS totacctbal
FROM
(
SELECT
substring(c_phone FROM 1 FOR 2) AS cntrycode,
c_acctbal
FROM
customer_delta
WHERE
substring(c_phone FROM 1 FOR 2) IN
('24', '32', '17', '18', '12', '14', '22')
AND c_acctbal > (
SELECT
avg(c_acctbal)
FROM
customer_delta
WHERE
c_acctbal > 0.00
AND substring(c_phone FROM 1 FOR 2) IN
('24', '32', '17', '18', '12', '14', '22')
)
AND NOT EXISTS (
SELECT
*
FROM
orders_delta
WHERE
o_custkey = c_custkey
)
) AS custsale
GROUP BY
cntrycode
ORDER BY
cntrycode;
返回结果:
+------------+-------------+---------------+
| cntrycode | numcust | totacctbal |
+------------+-------------+---------------+
| 12 | 90805 | 681136537.68 |
| 14 | 91459 | 685826271.21 |
| 17 | 91313 | 685025263.11 |
| 18 | 91292 | 684588251.63 |
| 22 | 90399 | 677402363.79 |
| 24 | 90635 | 680033065.67 |
| 32 | 90668 | 680459221.16 |
+------------+-------------+---------------+
场景二:导入Hologres标准存储以获取更好的查询性能
Hologres标准存储采用SSD(NVME)硬盘,随机读写性能更好。将OSS外表导入Hologres内部标准存储,可通过创建索引、设置适合的Shard数、选择合适的分布列等手段优化查询性能,以Q2为例,可获得18倍以上的性能提升。详情请参见优化内部表性能化。
Hologres中创建相同结构的内部表并导入数据。
示例如下,更多建表语句请参见Hologres查询体验快速入门。
--创建内部表 BEGIN; CREATE TABLE region ( R_REGIONKEY INT NOT NULL PRIMARY KEY, R_NAME TEXT NOT NULL, R_COMMENT TEXT ); CALL set_table_property('region', 'distribution_key', 'R_REGIONKEY'); CALL set_table_property('region', 'bitmap_columns', 'R_REGIONKEY,R_NAME,R_COMMENT'); CALL set_table_property('region', 'dictionary_encoding_columns', 'R_NAME,R_COMMENT'); CALL set_table_property('region', 'time_to_live_in_seconds', '31536000'); COMMIT; --导入数据 INSERT INTO public.region SELECT * FROM region_delta ;
内表查询结果。
SELECT cntrycode, count(*) AS numcust, sum(c_acctbal) AS totacctbal FROM ( SELECT substring(c_phone FROM 1 FOR 2) AS cntrycode, c_acctbal FROM customer WHERE substring(c_phone FROM 1 FOR 2) IN ('24', '32', '17', '18', '12', '14', '22') AND c_acctbal > ( SELECT avg(c_acctbal) FROM customer WHERE c_acctbal > 0.00 AND substring(c_phone FROM 1 FOR 2) IN ('24', '32', '17', '18', '12', '14', '22') ) AND NOT EXISTS ( SELECT * FROM orders WHERE o_custkey = c_custkey ) ) AS custsale GROUP BY cntrycode ORDER BY cntrycode;
性能对比
以32 Core独享实例为例,可以看到Hologres内部表查询速度比OSS外部表会高出约100倍:
OSS外部表
查询耗时:17.24s。
执行计划:
Hologres内部表
查询耗时:106.67ms。
执行计划: