Apache Hudi(简称Hudi)是基于OSS对象存储的一种表格式,支持UPDATE、DELETE和INSERT操作。云原生数据仓库 AnalyticDB MySQL 版和Hudi做了深度整合,您可以通过Spark SQL读写Hudi外表。本文主要介绍如何通过Spark SQL读写Hudi外表。
前提条件
集群的产品系列为湖仓版。
说明湖仓版集群存储预留资源需大于0 ACU。
已在湖仓版集群中创建Job型资源组。具体操作,请参见新建资源组。
已创建湖仓版集群的数据库账号。
步骤一:进入数据开发
登录云原生数据仓库AnalyticDB MySQL控制台,在左上角选择集群所在地域。在左侧导航栏,单击集群列表,在湖仓版页签下,单击目标集群ID。
在左侧导航栏,单击
。在SQLConsole窗口,选择Spark引擎和Job型资源组。
步骤二:创建外库与Hudi外表
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行以下语句,创建数据库。如果已有数据库,可跳过本步骤。
CREATE DATABASE adb_external_db_hudi location 'oss://<bucket_name>/test/'; /*用于在该路径中创建表,请替换为自己的OSS路径。*/
执行以下语句,创建Hudi外表。
CREATE TABLE adb_external_db_hudi.test_hudi_tbl ( `id` int, `name` string, `age` int ) using hudi tblproperties (primaryKey = 'id', preCombineField = 'age') partitioned by (age) location 'oss://<bucket_name>/test/table/'; /*写入的数据会存储在该路径中,请替换为自己的OSS路径。*/
重要OSS路径中的Bucket需与创建数据库所选的Bucket相同。
创建外表时选择的OSS路径需比创建数据库时选择的OSS路径至少多一层目录,且外表的路径需在数据库路径下。
建表时必须定义primaryKey主键。preCombineField预聚合字段为可选,如未定义preCombineField,则在UPDATE场景会报错。
步骤三:写入Hudi外表数据
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
INSERT
执行以下语句,写入数据。您可以选择以下任意一种方式向Hudi外表中写入数据。
方式一:INSERT INTO写入
INSERT INTO adb_external_db_hudi.test_hudi_tbl values(1, 'lisa', 10),(2, 'jams', 10);
方式二:INSERT OVERWRITE全表写入
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl values (1, 'lisa', 10), (2, 'jams', 20);
方式三:INSERT OVERWRITE静态分区写入
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl partition(age=10) values(1, 'anna');
方式四:INSERT OVERWRITE动态分区写入
INSERT OVERWRITE adb_external_db_hudi.test_hudi_tbl partition (age) values (1, 'bom', 10);
UPDATE
执行以下语句更新数据,本文以将id=2的name列更新为box为例。
UPDATE adb_external_db_hudi.test_hudi_tbl SET name = 'box' where id = 2;
DELETE
执行以下语句删除数据,本文以删除id列为1的数据为例。
DELETE FROM adb_external_db_hudi.test_hudi_tbl where id = 1;
并发控制
Hudi外表基于LockProvider的并发控制机制,避免执行DML操作时出现并发冲突。多个任务可以同时写入不同的数据范围,为了避免写入冲突,需要确保没有重叠的数据范围写入,从而保证了数据的正确性和一致性。您需要配置以下参数来启用并发控制。详细Hudi的并发控制机制请参见Apache Hudi。
若使用开源Hudi JAR包,暂不支持MdsBasedLockProvider实现并发控制。
set hoodie.cleaner.policy.failed.writes=LAZY;
set hoodie.write.concurrency.mode=OPTIMISTIC_CONCURRENCY_CONTROL;
set hoodie.write.lock.provider=org.apache.hudi.sync.adb.MdsBasedLockProvider;
参数说明:
参数 | 参数值 | 是否必填 | 描述 | |
hoodie.cleaner.policy.failed.writes | LAZY | 是 | 指定写入失败时的脏数据清理策略。 取值为LAZY,表示写入提交前不清理未完成的提交,失败的提交待心跳过期后由Clean操作统一清理,适用多个并发写入的场景。 | |
hoodie.write.concurrency.mode | OPTIMISTIC_CONCURRENCY_CONTROL | 是 | 写入操作的并发模式。取值为 OPTIMISTIC_CONCURRENCY_CONTROL,表示一个Hudi外表如果同时有多个写入任务,每个写入任务提交前都会检查是否存在提交冲突,冲突情况下本次写入失败。 | |
hoodie.write.lock.provider | org.apache.hudi.sync.adb.MdsBasedLockProvider | 是 | 锁提供程序类名,用户可以提供自己的LockProvider实现,提供的类必须是 |
步骤四:查询数据
您可以选择批处理或交互式执行任意一种方式执行以下SQL语句。详情请参见Spark SQL执行方式。
执行Spark SQL语句,只返回执行成功或者失败,不返回数据。您可以在Spark Jar开发页面应用列表页签中的日志查看表数据。详情请参见查看Spark应用信息。
执行以下语句,查询Hudi外表数据。
SELECT * FROM adb_external_db_hudi.test_hudi_tbl;