本文介绍如何创建使用OSS Sink Connector,您可以通过OSS Sink Connector将数据从云消息队列 Kafka 版的数据源Topic导出至对象存储OSS的Object中。
前提条件
详细步骤,请参见创建前提。
注意事项
Connector基于事件被处理的时间做时间分区,非事件的产生时间,如按时间分区,时间边界的数据可能被投递至下一个时间分区目录。
脏数据处理问题:如果在任务的自定义分区或文件内容中配置了JsonPath,但数据未命中JsonPath规则,Connector会将这类数据按攒批策略投递到Bucket下的invalidRuleData/路径。如发现Bucket下有此目录,请检查JsonPath规则的正确性,且避免消费端漏消费数据。
链路可能存在秒级到分钟级别内的延时。
如果自定义分区或文件内容中配置的JsonPath规则需对Kafka Source消息内容做提取,需在Kafka Source侧将内容编解码为Json格式。
Connector实时将上游数据以Append追加方式写入OSS中,因此单个分区路径下,可见的最新文件通常处于写入中的状态,而非目标状态,请谨慎消费。
计费说明
Connector任务运行在阿里云函数计算平台,任务加工传输产生的计算资源将按函数计算单价计费,详情请参见计费概述。
步骤一:创建目标服务资源
在对象存储OSS控制台创建一个存储空间(Bucket)。详细步骤,请参见控制台创建存储空间。
本文以oss-sink-connector-bucket Bucket为例。
步骤二:创建OSS Sink Connector并启动
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 Kafka 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
选择云消息队列 Kafka 版源实例所在的地域。
华北2(北京)
kafka 实例
选择生产云消息队列 Kafka 版消息的源实例。
alikafka_post-cn-jte3****
Topic
选择生产云消息队列 Kafka 版消息的Topic。
topic
Group ID
数据源所在的云消息队列 Kafka 版实例的Group ID。
快速创建:新建一个Group ID资源,
使用已有:填写一个已经的创建好的GroupID资源。
GID_http_1
消费位点
选择开始消费消息的位点。
最新位点
网络配置
选择路由消息的网络类型。
基础网络
专有网络VPC
选择VPC ID。当网络配置设置为自建公网时需要设置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。当网络配置设置为自建公网时需要设置此参数。
alikafka_pre-cn-7mz2****
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
100
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见事件模式。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见数据清洗。
在Sink(目标)配置向导,选择服务类型为对象存储 OSS,配置以下参数,单击保存。
参数
说明
示例
OSS Bucket
已创建的对象存储OSS Bucket。
重要保证填写的Bucket已手动创建完成,且在任务运行期间不被删除。
OSS存储类型请选择为标准存储、低频存储,暂不支持投递至归档存储Bucket。
创建OSS Sink Connector任务后,平台将在此OSS Bucket一级目录下生成.tmp/系统文件路径,请勿删除和使用该路径下的OSS Object。
oss-sink-connector-bucket
保存路径
OSS Object规则分Path和Name两部分,例如ObjectKey为
a/b/c/a.txt
时,Path为a/b/c/
,Name为a.txt
,其中自定义分区(即Path)可自定义。Name由Connector内部按固定规则生成:{毫秒级别unix 时间戳}_{8位随机字符串}
,例如:1705576353794_elJmxu3v。如未配置,或配置为 "/",表示无分区,数据将保存在Bucket一级目录下。
支持时间变量参数,{yyyy}、{MM}、{dd}、{HH} ,分别代表年、月、天、小时,大小写敏感。
支持JsonPath规则自定义OSS路径参数,例如: {$.data.topic}、{$.data.partition}。JsonPath变量需满足标准JsonPath表达式,受限于OSS路径规则,通过JsonPath提取值的类型建议为int、string,且值中全部为标准UTF-8字符,不包含空格、".."、表情符、"/"、 "\"等字符,否则可能会产生数据写入异常风险。
支持常量。
说明分区配置可以对数据做合理分组,避免单路径下小文件过多造成不可控问题。
Connector 的吞吐能力和分区数正相关,无分区或分区少时 Connector 吞吐较弱,可能造成上游堆积问题。分区较多会导致数据分散、写入次数增多、碎片文件多等问题,因此分区的配置策略非常关键,因为以下为参考建议:
Kafka Source:可同时按时间和 partition 分区,当性能无法满足时,可通过提升 kafka partition 数量间接提升 Connector 吞吐,例如:prefix/{yyyy}/{MM}/{dd}/{HH}/{$.data.partition}/
业务分组:按照数据中的某个业务字段分区,此时吞吐速率取决于业务字段取值的数量,例如:prefixV2/{$.data.body.field}/
不同的任务建议配置不同的常量前缀,避免多个任务共享相同的分区,写入同一目录下造成数据混乱,无法区分。
alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH
时区配置
默认为UTC+8:00时区,此规则仅对时间分区配置有效。
UTC+8:00
批量聚合文件大小
配置需要聚合的文件大小,取值范围为[1,1024],单位:MB。
说明Connector 将数据分批写入到同一个 OSS Object 中,每一批数据大小为 (0 MB,16 MB],因此OSS Object最终大小可能会略大于配置值,最多超出16 MB。
在大流量场景下,建议聚合文件大小配置百MB级别,例如128MB、512MB,聚合时间窗口配置小时级别,如60 min、120 min。
5
批量聚合时间窗口
配置需要聚合的时间窗口。取值范围为[1,1440],单位:分钟。
1
文件压缩
无需压缩:生成的OSS Object无后缀名。
GZIP:生成后缀为.gz的 Object。
Snappy:生成后缀为.snappy的Object。
Zstd:生成后缀为.zstd的 Object。
当选择压缩后,Connector按压缩前数据大小进行攒批,因此OSS侧显示的Object大小会小于攒批大小,解压后大小接近攒批值。
无需压缩
文件内容
完整数据:Connector通过 CloudEvent协议包装了原始消息,完整数据表示包含CloudEvent协议后的数据。如下示例中,data字段内容为数据信息,其他字段为CloudEvent协议附加的Meta信息。
{ "specversion": "1.0", "id": "8e215af8-ca18-4249-8645-f96c1026****", "source": "acs:alikafka", "type": "alikafka:Topic:Message", "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-23T02:49:51.589Z", "aliyunaccountid": "182572506381****", "data": { "topic": "****", "partition": 7, "offset": 25, "timestamp": 1655952591589, "headers": { "headers": [], "isReadOnly": false }, "key": "keytest", "value": "hello kafka msg" } }
数据提取:将JSONPath提取后的部分数据投递至OSS,比如配置$.data,则仅将data 字段的值投递到OSS。
如果对CloudEvent协议的附加字段无强需求,建议配置部分事件和$.data表达式,保证将Source的原始消息投递到OSS,降低存储成本,提升传输效率。
数据提取
$.data
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用。
在提示对话框,阅读提示信息,然后单击确认。
启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。
步骤三:测试OSS Sink Connector
在任务列表页面,在OSS Sink Connector任务的事件源列单击源Topic。
在Topic详情页面,单击体验发送消息。
在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
在任务列表页面,在OSS Sink Connector任务的事件目标列单击目标Bucket。
在Bucket页面,选择左侧导航栏的 。
tmp目录:Connector依赖的系统文件路径,请勿删除和使用该路径OSS Object。
数据文件目录:目录下按任务的分区路径规则生成子目录,并在最深层目录下上传数据文件。
在对应Object右侧操作列,选择 。
打开下载的文件,查看消息内容。
如图所示,多条消息之间通过换行分隔。