全部产品
Search
文档中心

云消息队列 Kafka 版:创建Tablestore Sink Connector

更新时间:Feb 17, 2025

本文介绍如何创建使用Tablestore Sink Connector,您可通过Tablestore Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至Tablestore的表中。

前提条件

  • 已开通表格存储服务并创建实例。具体操作,请参见开通服务并创建实例

  • 已购买开通云消息队列 Kafka 版实例,并创建Topic资源。详细步骤,请参见购买Kafka实例创建资源

  • 创建Tablestore Sink Connector任务时生成的函数所在的服务角色需要手动添加AliyunOTSFullAccess权限策略,赋予此服务角色管理表格存储服务(OTS)的权限。具体操作, 请参见在RAM角色管理页面为RAM角色授权

步骤一:创建Tablestore表

创Tablestore表用于存储从云消息队列 Kafka 版流入的数据。具体操作,请参见操作步骤

本文以实例为ots-sink和数据表名为ots_sink_table的资源为例,创建数据表时设置topic、partition、offset三个主键。image

步骤二:创建Tablestore Sink Connector并启动

  1. 登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。

  2. 在左侧导航栏,选择Connector 生态集成 > 任务列表

  3. 任务列表页面,单击创建任务

  4. 创建任务页面,设置任务名称描述,配置以下信息,然后单击保存

    • 任务创建

      1. Source(源)配置向导,选择数据提供方消息队列 Kafka 版,设置以下参数,然后单击下一步

        参数

        说明

        示例

        地域

        选择云消息队列 Kafka 版源实例所在的地域。

        华北2(北京)

        kafka 实例

        选择生产云消息队列 Kafka 版消息的源实例。

        alikafka_post-cn-jte3****

        Topic

        选择生产云消息队列 Kafka 版消息的Topic。

        demo-topic

        Group ID

        选择源实例的消费组名称。

        • 快速创建:推荐方案,自动创建以GID_EVENTBRIDGE_xxx 命名的 Group ID。

        • 使用已有:请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发

        快速创建

        消费位点

        选择开始消费消息的位点。

        • 最新点位

        • 最早点位

        最新位点

        网络配置

        选择路由消息的网络类型。

        • 基础网络

        • 自建公网

        基础网络

        专有网络VPC

        选择VPC ID。当网络配置设置为自建公网时需要设置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。当网络配置设置为自建公网时需要设置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。当网络配置设置为自建公网时需要设置此参数。

        alikafka_pre-cn-7mz2****

        数据格式

        数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。

        • Json(Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)

        • Text(默认文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)

        • Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)

        Json

        批量推送条数

        调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。

        100

        批量推送间隔(单位:秒)

        调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。

        3

      2. Filtering(过滤)配置向导,设置数据模式内容过滤发送的请求。更多信息,请参见事件模式

      3. Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗

      4. Sink(目标)配置向导,选择服务类型表格存储TableStore,配置以下参数。

        参数

        说明

        示例

        实例名称

        已创建的Tablestore实例名称。

        ost-sink

        目标表

        已创建的Tablestore数据表名称。

        ost_sink_table

        主键

        Tablestore主键值、属性列生成方式,需定义每个属性列内容的提取规则,提取规则通过JsonPath语法表达。当Source(源)数据格式配置为Json时,云消息队列 Kafka 版流出的数据格式如下所示:

        {
            "data": {
                "topic": "demo-topic",
                "partition": 0,
                "offset": 2,
                "timestamp": 1739756629123,
                "headers": {
                    "headers": [],
                    "isReadOnly": false
                },
                "key":"ots-sink-k1",
                "value": "ots-sink-v1"
            },
            "id": "7702ca16-f944-4b08-***-***-0-2",
            "source": "acs:alikafka",
            "specversion": "1.0",
            "type": "alikafka:Topic:Message",
            "datacontenttype": "application/json; charset=utf-8",
            "time": "2025-02-17T01:43:49.123Z",
            "subject": "acs:alikafka:alikafka_serverless-cn-lf6418u6701:topic:demo-topic",
            "aliyunaccountid": "1******6789"
        }

        例如,主键名称为topic,数值提取规则配置为$.data.topic

        属性列

        例如,属性列名称为key,数值提取规则配置为$.data.key

        操作模式

        数据写入Tablestore的方式。

        • put:当两条数据主键相同时,新数据会覆盖老数据。

        • update:当两条数据主键相同时,会在此行中写入增量列,不会删除存量列。

        • delete:删除相应的主键数据。

        put

        网络配置

        • 专有网络:通过专有网络VPC将Kafka消息投递到Tablestore。

        • 公网:通过公网将Kafka消息投递到Tablestore。

        专有网络

        VPC

        选择VPC ID。仅当网络配置专有网络时需配置此参数。

        vpc-bp17fapfdj0dwzjkd****

        交换机

        选择vSwitch ID。仅当网络配置专有网络时需配置此参数。

        vsw-bp1gbjhj53hdjdkg****

        安全组

        选择安全组。仅当网络配置专有网络时需配置此参数。

        test_group

    • 任务属性

      配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信

  5. 返回任务列表页面,找到创建好的任务,在其右侧操作列,单击启用

  6. 提示对话框,阅读提示信息,然后单击确认

    启用任务后,会有30秒~60秒的延迟时间,您可以在任务列表页面的状态栏查看启动进度。

步骤三:测试Tablestore Sink Connector

  1. 任务列表页面,在Tablestore Sink Connector任务的事件源列单击源Topic。

  2. 在Topic详情页面,单击体验发送消息

  3. 快速体验消息收发面板,按照下图配置消息内容,然后单击确定

    image

  4. 任务列表页面,在Tablestore Sink Connector任务的事件目标列单击目标表名。

  5. 表管理页面,单击数据管理页签,可以查看数据已经存储到Tablestore表中。

    image