全部产品
Search
文档中心

云消息队列 Kafka 版:路由到对象存储OSS

更新时间:Dec 04, 2024

本文介绍如何创建使用OSS Sink Connector,您可以通过OSS Sink Connector将数据从云消息队列 Kafka 版的数据源Topic导出至对象存储OSS的Object中。

前提条件

详细步骤,请参见创建前提

注意事项

  1. Connector基于事件被处理的时间做时间分区,非事件的产生时间,如按时间分区,时间边界的数据可能被投递至下一个时间分区目录。

  2. 脏数据处理问题:如果在任务的自定义分区或文件内容中配置了JsonPath,但数据未命中JsonPath规则,Connector会将这类数据按攒批策略投递到Bucket下的invalidRuleData/路径。如发现Bucket下有此目录,请检查JsonPath规则的正确性,且避免消费端漏消费数据。

  3. 链路可能存在秒级到分钟级别内的延时。

  4. 如果自定义分区或文件内容中配置的JsonPath规则需对Kafka Source消息内容做提取,需在Kafka Source侧将内容编解码为Json格式。

  5. Connector实时将上游数据以Append追加方式写入OSS中,因此单个分区路径下,可见的最新文件通常处于写入中的状态,而非目标状态,请谨慎消费。

计费说明

Connector任务运行在阿里云函数计算平台,任务加工传输产生的计算资源将按函数计算单价计费,详情请参见计费概述

步骤一:创建目标服务资源

在对象存储OSS控制台创建一个存储空间(Bucket)。详细步骤,请参见控制台创建存储空间

本文以oss-sink-connector-bucket Bucket为例。

步骤二:创建OSS Sink Connector并启动

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择Connector 生态集成 > 任务列表

  3. 任务列表页面,单击创建任务

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 Kafka 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择云消息队列 Kafka 版源实例所在的地域。

        华北2(北京)

        kafka 实例

        选择生产云消息队列 Kafka 版消息的源实例。

        alikafka_post-cn-jte3****

        Topic

        选择生产云消息队列 Kafka 版消息的Topic。

        topic

        Group ID

        数据源所在的云消息队列 Kafka 版实例的Group ID。

        • 快速创建:新建一个Group ID资源,

        • 使用已有:填写一个已经的创建好的GroupID资源。

        GID_http_1

        消费位点

        选择开始消费消息的位点。

        最新位点

        网络配置

        选择路由消息的网络类型。

        基础网络

        专有网络VPC

        选择VPC ID。当网络配置设置为自建公网时需要设置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。当网络配置设置为自建公网时需要设置此参数。

        alikafka_pre-cn-7mz2****

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见事件模式

      3. Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见数据清洗

      4. Sink(目标)配置向导,选择服务类型对象存储 OSS,配置以下参数,单击保存

        参数

        说明

        示例

        OSS Bucket

        已创建的对象存储OSS Bucket。

        重要
        • 保证填写的Bucket已手动创建完成,且在任务运行期间不被删除。

        • OSS存储类型请选择为标准存储、低频存储,暂不支持投递至归档存储Bucket。

        • 创建OSS Sink Connector任务后,平台将在此OSS Bucket一级目录下生成.tmp/系统文件路径,请勿删除和使用该路径下的OSS Object。

        oss-sink-connector-bucket

        保存路径

        OSS Object规则分Path和Name两部分,例如ObjectKey为a/b/c/a.txt时,Path为a/b/c/,Name为a.txt,其中自定义分区(即Path)可自定义。Name由Connector内部按固定规则生成:{毫秒级别unix 时间戳}_{8位随机字符串},例如:1705576353794_elJmxu3v。

        • 如未配置,或配置为 "/",表示无分区,数据将保存在Bucket一级目录下。

        • 支持时间变量参数,{yyyy}、{MM}、{dd}、{HH} ,分别代表年、月、天、小时,大小写敏感。

        • 支持JsonPath规则自定义OSS路径参数,例如: {$.data.topic}、{$.data.partition}。JsonPath变量需满足标准JsonPath表达式,受限于OSS路径规则,通过JsonPath提取值的类型建议为int、string,且值中全部为标准UTF-8字符,不包含空格、".."、表情符、"/"、 "\"等字符,否则可能会产生数据写入异常风险。

        • 支持常量。

          说明

          分区配置可以对数据做合理分组,避免单路径下小文件过多造成不可控问题。

          Connector 的吞吐能力和分区数正相关,无分区或分区少时 Connector 吞吐较弱,可能造成上游堆积问题。分区较多会导致数据分散、写入次数增多、碎片文件多等问题,因此分区的配置策略非常关键,因为以下为参考建议:

          • Kafka Source:可同时按时间和 partition 分区,当性能无法满足时,可通过提升 kafka partition 数量间接提升 Connector 吞吐,例如:prefix/{yyyy}/{MM}/{dd}/{HH}/{$.data.partition}/

          • 业务分组:按照数据中的某个业务字段分区,此时吞吐速率取决于业务字段取值的数量,例如:prefixV2/{$.data.body.field}/

          不同的任务建议配置不同的常量前缀,避免多个任务共享相同的分区,写入同一目录下造成数据混乱,无法区分。

        alikafka_post-cn-9dhsaassdd****/guide-oss-sink-topic/YYYY/MM/dd/HH

        时区配置

        默认为UTC+8:00时区,此规则仅对时间分区配置有效。

        UTC+8:00

        批量聚合文件大小

        配置需要聚合的文件大小,取值范围为[1,1024],单位:MB。

        说明
        • Connector 将数据分批写入到同一个 OSS Object 中,每一批数据大小为 (0 MB,16 MB],因此OSS Object最终大小可能会略大于配置值,最多超出16 MB。

        • 在大流量场景下,建议聚合文件大小配置百MB级别,例如128MB、512MB,聚合时间窗口配置小时级别,如60 min、120 min。

        5

        批量聚合时间窗口

        配置需要聚合的时间窗口。取值范围为[1,1440],单位:分钟。

        1

        文件压缩

        • 无需压缩:生成的OSS Object无后缀名。

        • GZIP:生成后缀为.gz的 Object。

        • Snappy:生成后缀为.snappy的Object。

        • Zstd:生成后缀为.zstd的 Object。

        当选择压缩后,Connector按压缩前数据大小进行攒批,因此OSS侧显示的Object大小会小于攒批大小,解压后大小接近攒批值。

        无需压缩

        文件内容

        • 完整数据:Connector通过 CloudEvent协议包装了原始消息,完整数据表示包含CloudEvent协议后的数据。如下示例中,data字段内容为数据信息,其他字段为CloudEvent协议附加的Meta信息。

          {
            "specversion": "1.0",
            "id": "8e215af8-ca18-4249-8645-f96c1026****",
            "source": "acs:alikafka",
            "type": "alikafka:Topic:Message",
            "subject": "acs:alikafka:alikafka_pre-cn-i7m2msb9****:topic:****",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2022-06-23T02:49:51.589Z",
            "aliyunaccountid": "182572506381****",
            "data": {
              "topic": "****",
              "partition": 7,
              "offset": 25,
              "timestamp": 1655952591589,
              "headers": {
                "headers": [],
                "isReadOnly": false
              },
              "key": "keytest",
              "value": "hello kafka msg"
            }
          }
        • 数据提取:将JSONPath提取后的部分数据投递至OSS,比如配置$.data,则仅将data 字段的值投递到OSS。

        如果对CloudEvent协议的附加字段无强需求,建议配置部分事件和$.data表达式,保证将Source的原始消息投递到OSS,降低存储成本,提升传输效率。

        数据提取

        $.data
    • 任务属性

      配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

  4. 返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用

  5. 提示对话框,阅读提示信息,然后单击确认

    启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。

步骤三:测试OSS Sink Connector

  1. 任务列表页面,在OSS Sink Connector任务的事件源列单击源Topic。

  2. 在Topic详情页面,单击体验发送消息

  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定

    发送消息

  4. 任务列表页面,在OSS Sink Connector任务的事件目标列单击目标Bucket。

  5. 在Bucket页面,选择左侧导航栏的文件管理 > 文件列表

    • tmp目录:Connector依赖的系统文件路径,请勿删除和使用该路径OSS Object。

    • 数据文件目录:目录下按任务的分区路径规则生成子目录,并在最深层目录下上传数据文件。

    最深层路径

  6. 在对应Object右侧操作列,选择图标 > 下载

  7. 打开下载的文件,查看消息内容。

    消息

    如图所示,多条消息之间通过换行分隔。