本文为您介绍如何使用Hudi连接器。
背景信息
Apache Hudi是一种开源的数据湖表格式框架。Hudi基于对象存储或者HDFS组织文件布局,保证ACID,支持行级别的高效更新和删除,从而降低数据ETL开发门槛。同时该框架还支持自动管理及合并小文件,保持指定的文件大小,从而在处理数据插入和更新时,不会创建过多的小文件,引发查询端性能降低,避免手动监控和合并小文件的运维负担。
类别 | 详情 |
支持类型 | 源表和结果表 |
运行模式 | 流模式和批模式 |
数据格式 | 暂不支持 |
特有监控指标 |
说明 指标含义详情,请参见监控指标说明。 |
API种类 | Datastream和SQL |
是否支持更新或删除结果表数据 | 是 |
特色功能
类别 | 详情 |
Hudi的核心特性 |
|
Hudi的典型场景 |
|
全托管Hudi优势 | 相比开源社区Hudi,全托管Flink平台集成Hudi具有的功能优势详情如下所示:
|
使用限制
仅Flink计算引擎vvr-4.0.11-flink-1.13及以上版本支持Hudi Connector。
文件系统仅支持HDFS或阿里云OSS和OSS-HDFS服务。
不支持以Session模式提交作业。
不支持修改字段,如需修改,请在DLF控制台通过Spark SQL语句进行操作。
语法结构
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
...
);
WITH参数
基础参数
通用
参数
说明
数据类型
是否必填
默认值
备注
connector
表类型。
String
是
无
固定值为hudi。
path
表存储路径。
String
是
无
支持阿里云OSS、HDFS和OSS-HDFS和三种路径。
OSS:路径格式为
oss://<bucket>/<user-defined-dir>
。HDFS:路径格式为
hdfs://<user-defined-dir>
。OSS-HDFS:路径格式为
oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>
。说明仅Flink计算引擎VVR 8.0.3及以上版本支持该参数配置为OSS-HDFS路径。
其中:
bucket:表示您创建的OSS Bucket名称。
user-defined-dir:表示数据存放路径。
oss-hdfs-endpoint:表示OSS-HDFS服务Endpoint。
您可以在OSS实例概览页面的访问端口中查看HDFS的Endpoint信息。
hoodie.datasource.write.recordkey.field
主键字段。
String
否
uuid
支持通过PRIMARY KEY语法设置主键字段。
支持使用英文逗号(,)分隔多个字段。
precombine.field
版本字段。
String
否
ts
基于此字段的大小来判断消息是否进行更新。
如果您没有设置该参数,则系统默认会按照消息在引擎内部处理的先后顺序进行更新。
oss.endpoint
阿里云对象存储服务OSS或者OSS-HDFS的Endpoint。
String
否
无
如果使用OSS或者OSS-HDFS作为存储,则必需填写。
使用OSS时,参数取值详情请参见访问域名和数据中心。
使用OSS-HDFS时,您可以在OSS实例概览页面的访问端口中查看HDFS服务的Endpoint信息。
accessKeyId
阿里云账号的AccessKey ID。
String
否
无
如果使用OSS或者OSS-HDFS作为存储,则必需填写。
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey ID取值,详情请参见变量和密钥管理。
accessKeySecret
阿里云账号的AccessKey Secret。
String
否
无
如果使用OSS或者OSS-HDFS作为存储,则必需填写。
详情请参见如何查看AccessKey ID和AccessKey Secret信息?
重要为了避免您的AK信息泄露,建议您通过密钥管理的方式填写AccessKey Secret取值,详情请参见变量和密钥管理。
源表独有
参数
说明
数据类型
是否必填
默认值
备注
read.streaming.enabled
是否开启流读。
boolean
否
false
参数取值如下:
true:开启流读。
false:不开启流读。
read.start-commit
读取起始位点。
string
否
不填
参数取值如下:
yyyyMMddHHmmss:从指定时间点开始消费。
earliest:从最早位点开始消费。
不填:从最新时间开始消费。
结果表独有
参数
说明
数据类型
是否必填
默认值
备注
write.operation
写入操作模式。
String
否
UPSERT
参数取值如下:
insert模式:数据追加写。
upsert模式:数据更新。
bulk_insert模式:数据批量追加写。
hive_sync.enable
是否开启同步元数据到Hive功能。
boolean
否
false
参数取值如下:
true:开启同步元数据到Hive功能。
false:关闭同步元数据到Hive功能。
hive_sync.mode
Hive数据同步模式。
String
否
hms
参数取值如下:
hms:元数据同步到Hive Metastore或者DLF时,需要设置为hms。
jdbc:元数据同步到jdbc时,需要设置为jdbc。
hive_sync.db
同步到Hive的数据库名称。
String
否
default
无。
hive_sync.table
同步到Hive的表名称。
String
否
当前table名
hudi同步到Hive的表名不能使用中划线( -)。
dlf.catalog.region
DLF服务的地域名。
String
否
无
详情请参见已开通的地域和访问域名。
说明仅当hive_sync.mode设置为hms时,dlf.catalog.region参数设置才生效。
请和dlf.catalog.endpoint选择的地域保持一致。
dlf.catalog.endpoint
DLF服务的Endpoint。
String
否
无
详情请参见已开通的地域和访问域名。
说明仅当hive_sync.mode设置为hms时,dlf.catalog.endpoint参数设置才生效。
推荐您为dlf.catalog.endpoint参数配置DLF的VPC Endpoint。例如,如果您选择的地域为cn-hangzhou地域,则dlf.catalog.endpoint参数需要配置为dlf-vpc.cn-hangzhou.aliyuncs.com
如果您需要跨VPC访问DLF,则请参见如何访问跨VPC的其他服务?
高阶参数
Hudi支持丰富的写入和读取场景,不同场景的参数如下表所示。
并发参数
名称 | 说明 | 默认值 | 备注 |
write.tasks | writer的并发,每个writer顺序写1~N个buckets。 | 4 | 增加写任务的并发对小文件个数没影响。 |
write.bucket_assign.tasks | bucket assigner的并发。 | Flink并发度 | 增加写任务的并发同时增加了写任务的bucket数,也就是增加了小文件(小bucket)数。 |
write.index_bootstrap.tasks | Index bootstrap算子的并发。 | Flink并发度 |
|
read.tasks | 流和批读算子的并发。 | 4 | 无。 |
compaction.tasks | online compaction算子的并发。 | 4 | online compaction比较耗费资源,建议走offline compaction。 |
在线压缩参数
名称 | 说明 | 默认值 | 备注 |
compaction.schedule.enabled | 是否阶段性生成压缩plan。 | true | 参数取值如下:
说明 建议阶段性生成压缩plan,即使compaction.async.enabled关闭的情况下。 |
compaction.async.enabled | 是否开启异步压缩。 | true | 参数取值如下:
说明 通过关闭compaction.async.enabled参数可关闭在线压缩执行,但是调度compaction.schedule.enabled仍然建议开启,之后可通过离线异步压缩,执行阶段性生成的压缩plan。 |
compaction.tasks | 压缩任务的并发数。 | 4 | 无。 |
compaction.trigger.strategy | 压缩策略。 | num_commits | 支持以下压缩策略:
|
compaction.delta_commits | 经过多少个commit触发压缩。 | 5 | 无。 |
compaction.delta_seconds | 经过多少秒后触发压缩。 | 3600 | 单位为秒。 |
compaction.max_memory | 用于压缩去重的hashmap的可用内存大小。 | 100 MB | 资源够用时,建议调整到1 GB。 |
compaction.target_io | 每个压缩plan的IO上限。 | 500 GB | 无。 |
文件大小
文件参数控制了文件的大小,目前支持的参数详情如下表所示。
名称 | 说明 | 默认值 | 备注 |
hoodie.parquet.max.file.size | 最大可写入的parquet文件大小。 超过可写入的parquet文件大小时,将写入到新的文件组。 | 120 * 1024 * 1024 byte (120 MB) | 单位是byte。 |
hoodie.parquet.small.file.limit | 小文件的大小阈值,小于该参数的文件被认为是小文件。 | 104857600 byte(100 MB) |
|
hoodie.copyonwrite.record.size.estimate | 预估的record大小。 | 1024 byte(1 KB) |
|
Hadoop参数
名称 | 说明 | 默认值 | 备注 |
hadoop.${you option key} | 通过hadoop.前缀指定hadoop配置项。 | 无 | 支持同时指定多个hadoop配置项。 说明 从Hudi 0.12.0开始支持,针对跨集群提交执行的需求,可以通过DDL指定per-job级别的hadoop配置。 |
数据写入
Hudi支持丰富的写入方式,包括离线批量写入、流式写入等场景。支持丰富的数据类型,包括changelog以及log数据。同时支持不同的索引方案。
离线批量写入
针对存量数据导入Hudi的需求,如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成Hoodie表格式。
名称
说明
默认值
备注
write.operation
写操作类型。
upsert
参数取值如下:
upsert:插入更新
insert:插入
bulk_insert:批量写入
说明bulk_insert导入省去了avro的序列化以及数据的merge过程,没有去重操作,数据的唯一性需要自己来保证。
bulk_insert需要在Batch Execution Mode下执行,Batch模式默认会按照分区名称排序输入消息再写入Hoodie,避免file handle频繁切换导致性能下降。
write.tasks
bulk_insert写任务的并发。
Flink的并发度
bulk_insert写任务的并发通过参数write.tasks指定,并发的数量会影响到小文件的数量。
理论上,bulk_insert写任务的并发数就是划分的bucket数,当每个bucket在写到文件大小上限(parquet 120 MB)时,会滚动到新句柄,所以最终的写文件数量大于等于bulk_insert写任务的并发。
write.bulk_insert.shuffle_input
是否将数据按照partition字段shuffle再通过write task写入。
true
从Hudi 0.11.0版本开始,开启该参数将减少小文件的数量,但是可能有数据倾斜风险。
write.bulk_insert.sort_input
是否将数据先按照partition字段排序再写入。
true
从Hudi 0.11.0版本开始支持,当一个write task写多个partition,开启可以减少小文件数量。
write.sort.memory
sort算子的可用managed memory。
128
单位是MB。
Changelog模式
该模式只有MOR表支持,在该模式下Hoodie会保留消息的所有变更(I/-U/U/D),之后再配合Flink引擎的有状态计算实现全链路近实时数仓生产增量计算。Hoodie的MOR表通过行存原生支持保留消息的所有变更(format层面的集成),通过Flink全托管流读单个MOR表可以消费到所有的变更记录。
说明非changelog模式,流读单次的batch数据集会merge中间变更;批读(快照读)会合并所有的中间结果,不管中间状态是否已被写入,都将被忽略。
名称
说明
默认值
备注
changelog.enabled
是否消费所有变更。
false
参数取值如下:
true:支持消费所有变更。
false:不消费所有变更,即UPSERT语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被merge掉。
说明开启changelog.enabled参数后,异步的压缩任务仍然会将中间变更合并成1条数据,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。但是,可以通过调整压缩的频率,预留一定的时间buffer给 reader,比如调整compaction.delta_commits:5和compaction.delta_seconds: 3600压缩参数。
Append模式(从Hudi 0.10.0版本开始支持)
在该模式下:
MOR表会应用小文件策略:会追加写avro log文件。
COW表没有小文件策略:每次写入COW表直接写新的parquet文件。
Clustering策略
Hudi支持丰富的Clustering策略,从而优化INSERT模式下的小文件问题。
Inline Clustering(只有Copy On Write表支持该模式)
名称
说明
默认值
备注
write.insert.cluster
是否在写入时合并小文件。
false
参数取值如下:
true:在写入时,合并小文件。
false:在写入时,不合并小文件。
说明COW表默认insert写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件,但不会去重,吞吐会受影响。
Async Clustering(从Huid 0.12.0版本开始支持)
名称
说明
默认值
备注
clustering.schedule.enabled
是否在写入时定时调度Clustering plan。
false
开启后周期性调度clustering plan。
clustering.delta_commits
经过多少个commits生成Clustering plan。
4
clustering.schedule.enabled为true时,生效。
clustering.async.enabled
是否异步执行Clustering plan。
false
开启后周期性异步执行,合并小文件。
clustering.tasks
Clustering task执行并发。
4
无。
clustering.plan.strategy.target.file.max.bytes
Clustering单文件目标大小。
1024 * 1024 * 1024
单位是byte。
clustering.plan.strategy.small.file.limit
Clustering小文件阈值。
600
小于该大小的文件才会参与clustering。
clustering.plan.strategy.sort.columns
Clustering排序字段。
无
支持指定特殊的排序字段。
Clustering Plan Strategy
名称
说明
默认值
备注
clustering.plan.partition.filter.mode
Clustering分区过滤模式。
NONE
支持的模式如下:
NONE:不过滤分区,所有分区都用于聚合,即不做限制。
RECENT_DAYS:数据按分区时,合并最近N天的数据。
SELECTED_PARTITIONS:指定固定的分区。
clustering.plan.strategy.daybased.lookback.partitions
采用RECENT_DAYS模式下的目标分区天数。
2
仅当clustering.plan.partition.filter.mode取值为RECENT_DAYS时生效。
clustering.plan.strategy.cluster.begin.partition
指定开始分区,用于过滤分区。
无
仅当clustering.plan.partition.filter.mode取值为SELECTED_PARTITIONS时有效。
clustering.plan.strategy.cluster.end.partition
指定结束分区,用于过滤分区。
无
仅当clustering.plan.partition.filter.mode取值为SELECTED_PARTITIONS时有效。
clustering.plan.strategy.partition.regex.pattern
通过正则表达式指定目标分区。
无
无。
clustering.plan.strategy.partition.selected
指定目标partitions。
无
支持通过英文逗号(,)分割多个partition。
Bucket索引
说明从Hudi 0.11.0版本开始支持以下表格中的参数。
名称
说明
默认值
备注
index.type
索引类型。
FLINK_STATE
参数取值如下:
FLINK_STATE:使用flink state索引。
BUCKET:使用bucket索引。
当数据量比较大时(表的数据条目超过5 亿),flink state的存储开销可能成为瓶颈。bucket索引通过固定的hash策略,将相同key的数据分配到同一个fileGroup中,可以避免索引的存储和查询开销。bucket index和flink state索引对比有以下区别:
bucket index没有flink state的存储计算开销,性能较好。
bucket index无法扩buckets,state index则可以依据文件的大小动态增加文件个数。
bucket index不支持跨partition的变更(如果输入是cdc流则没有这个限制),state index没有限制。
hoodie.bucket.index.hash.field
bucket索引hash key字段。
主键
可以设置成主键的子集。
hoodie.bucket.index.num.buckets
bucket索引的bucket个数。
4
默认每个partition的bucket数,当前设置后则不可再变更。
数据读取
Hudi支持丰富的读取方案,包括批读、流读、增量拉取,同时支持消费、传播changelog,实现端到端增量ETL。
流读
当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过read.streaming.enabled参数开启流读模式,通过read.start-commit参数指定起始消费位置,支持指定earliest从最早消费。
名称
说明
默认值
备注
read.streaming.enabled
是否开启流读模式。
false
参数取值如下:
true:开启流读模式。
false:关闭流读模式。
read.start-commit
流读起始位点
不填
参数取值如下:
yyyyMMddHHmmss:从指定时间点开始消费。
earliest:从最早位点开始消费。
不填:从最新时间开始消费。
clean.retain_commits
cleaner最多保留的历史commits数。
30
大于此数量的历史commits会被清理掉,changelog模式下,该参数可以控制changelog的保留时间,例如checkpoint周期为5分钟一次,默认最少保留150分钟的时间。
重要仅从0.10.0开始支持流读changelog。开启changelog模式后,hudi会保留一段时间的changelog供下游consumer消费。
changelog有可能会被compaction合并掉,中间记录会消除,可能会影响计算结果。
增量读取(从Hudi 0.10.0版本开始支持)
支持通过Flink全托管DataStream方式增量消费、Batch增量消费和TimeTravel(Batch消费某个时间点的数据)。
名称
说明
默认值
备注
read.start-commit
指定起始消费位点。
从最新位置commit
请按yyyyMMddHHmmss格式指定流读的起始位点。
区间为闭区间,即包含起始和结束。
read.end-commit
指定结束消费位点。
从最新位置commit
代码示例
源表
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- 从最新的commit流读写入blackhole。
INSERT INTO blackhole SELECT * from hudi_tbl;
结果表
CREATE TEMPORARY TABLE datagen(
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen' ,
'rows-per-second'='100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<自定义存储位置>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * from datagen;
Datastream API
通过DataStream的方式读写数据时,则需要使用对应的DataStream连接器连接Flink全托管,DataStream连接器设置方法请参见DataStream连接器设置方法。
maven pom
根据使用的VVR版本,指定Flink和Hudi版本。
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.15.4</flink.version> <hudi.version>0.13.1</hudi.version> </properties> <dependencies> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- hudi --> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-flink1.15-bundle</artifactId> <version>${hudi.version}</version> <scope>provided</scope> </dependency> <!-- oss --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aliyun</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <!-- dlf --> <dependency> <groupId>com.aliyun.datalake</groupId> <artifactId>metastore-client-hive2</artifactId> <version>0.2.14</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.5.1</version> <scope>provided</scope> </dependency> </dependencies>
重要DLF使用的部分依赖与社区版本存在冲突,例如
hive-common
、hive-exec
。如果您有本地测试DLF的需求,可以下载hive-common和hive-execJAR包,然后在IDEA手动导入。写入到Hudi
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.HoodiePipeline; import java.util.HashMap; import java.util.Map; public class FlinkHudiQuickStart { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String dbName = "test_db"; String tableName = "test_tbl"; String basePath = "oss://xxx"; Map<String, String> options = new HashMap<>(); // hudi conf options.put(FlinkOptions.PATH.key(), basePath); options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts"); options.put(FlinkOptions.DATABASE_NAME.key(), dbName); options.put(FlinkOptions.TABLE_NAME.key(), tableName); // oss conf options.put("hadoop.fs.oss.accessKeyId", "xxx"); options.put("hadoop.fs.oss.accessKeySecret", "xxx"); // 本地调试使用公网网端,例如oss-cn-hangzhou.aliyuncs.com;提交集群使用内网网端,例如oss-cn-hangzhou-internal.aliyuncs.com options.put("hadoop.fs.oss.endpoint", "xxx"); options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS"); options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); // dlf conf options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // 可选择是否同步DLF options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms"); options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName); options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName); options.put("hadoop.dlf.catalog.id", "xxx"); options.put("hadoop.dlf.catalog.accessKeyId", "xxx"); options.put("hadoop.dlf.catalog.accessKeySecret", "xxx"); options.put("hadoop.dlf.catalog.region", "xxx"); // 本地调试使用公网网端,例如dlf.cn-hangzhou.aliyuncs.com,提交集群使用内网网端,例如dlf-vpc.cn-hangzhou.aliyuncs.com options.put("hadoop.dlf.catalog.endpoint", "xxx"); options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory"); DataStream<RowData> dataStream = env.fromElements( GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22, StringData.fromString("1001"), StringData.fromString("p1")), GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32, StringData.fromString("1002"), StringData.fromString("p2")) ); HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName) .column("uuid string") .column("name string") .column("age int") .column("ts string") .column("`partition` string") .pk("uuid") .partition("partition") .options(options); builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded env.execute("Flink_Hudi_Quick_Start"); } }