本文介绍如何创建使用MaxCompute Sink Connector,您可以通过MaxCompute Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至MaxCompute的表中。
前提条件
详细步骤,请参见创建前提。
注意事项
如需使用MaxCompute分区功能,创建表时需额外创建一个分区列,列名为time,类型为STRING。
步骤一:创建目标资源
通过MaxCompute客户端创建表。更多信息,请参见创建表。
本文以名称为kafka_to_maxcompute的表为例。表中有3列数据,并使用分区功能。该表的建表语句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED by (time STRING);
如不使用分区功能,语句如下:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);
执行成功后,如下图所示:
在表管理页面,查看创建的表信息。
步骤二:创建MaxCompute Sink Connector并启动
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在任务列表页面,单击创建任务。
在创建任务面板,设置任务名称和描述,配置以下参数。
任务创建
在Source(源)配置向导,选择数据提供方为消息队列 Kafka 版,设置以下参数,然后单击下一步。
参数
说明
示例
地域
源Kafka实例所在的地域。
华东1(杭州)
kafka实例
数据源所在的Kafka实例ID。
alikafka_post-cn-9hdsbdhd****
Topic
数据源所在的Kafka实例Topic。
guide-sink-topic
Group ID
数据源所在的Kafka实例中的Group ID。
快速创建:自动创建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:选择已创建的Group,请选择独立的Group ID,不要和已有的业务混用,以免影响已有的消息收发。
使用已有
消费位点
最新位点:从最新位点开始消费。
最早位点:从最初位点开始消费。
最新位点
网络配置
有跨境传输数据需求时选择自建公网,其他情况可选择基础网络。
基础网络
数据格式
数据格式是针对支持二进制传递的数据源端推出的指定内容格式的编码能力。支持多种数据格式编码,如无特殊编码诉求可将格式设置为Json。
Json(默认Json格式编码,二进制数据按照utf-8 编码为Json格式放入Payload。)
Text(文本格式编码,二进制数据按照utf-8编码为字符串放入Payload。)
Binary(二进制格式编码,二进制数据按照Base64编码为字符串放入Payload。)
Json
批量推送条数
调用函数发送的最大批量消息条数,当积压的消息数量到达设定值时才会发送请求,取值范围为 [1,10000]。
2000
批量推送间隔(单位:秒)
调用函数的间隔时间,系统每到间隔时间点会将消息聚合后发给函数计算,取值范围为[0,15],单位为秒。0秒表示无等待时间,直接投递。
3
在Filtering(过滤)配置向导,定义数据模式过滤发送的请求。更多信息,请参见事件模式。
在Transform(转换)配置向导,设置数据清洗,实现分割、映射、富化及动态路由等繁杂数据加工能力。更多信息,请参见使用函数计算实现消息数据清洗。
在Sink(目标)配置向导,选择服务类型为大数据计算服务 acs.maxcompute,配置以下参数。
参数
说明
示例
账号AccessKey ID
阿里云账号的AccessKey ID,用于访问MaxCompute服务。
LTAI5tHPVCZywsoEVvFu****
账号AccessKey Secret
阿里云账号的AccessKey Secret。
4RAKUQpZtUntDgvoKu0tvrkrOM****
MaxCompute项目名称
选择已创建的MaxCompute项目。
test_compute
MaxCompute表名称
选择已创建的MaxCompute表。
kafka_to_maxcompute
MaxCompute表入参
选择MaxCompute表后,此处会展示表的列名和类型信息,配置数据提取规则即可。下面是一条消息示例,本示例中Topic列对应的值从Topic字段提取,则定义数据提取规则为
$.topic
。{ 'data': { 'topic': 't_test', 'partition': 2, 'offset': 1, 'timestamp': 1717048990499, 'headers': { 'headers': [], 'isReadOnly': False }, 'key': 'MaxCompute-K1', 'value': 'MaxCompute-V1' }, 'id': '9b05fc19-9838-4990-bb49-ddb942307d3f-2-1', 'source': 'acs:alikafka', 'specversion': '1.0', 'type': 'alikafka:Topic:Message', 'datacontenttype': 'application/json; charset=utf-8', 'time': '2024-05-30T06:03:10.499Z', 'aliyunaccountid': '1413397765616316' }
topic:
$.data.topic
valuename:
$.data.value
valueage:
$.data.offset
是否开启分区能力
开启:开启分区能力。
关闭:不开启分区能力。
开启
分区配置
仅当是否开启分区能力参数设置为开启时需配置此参数。
支持{yyyy}、{MM}、{dd}、{HH}、{mm}时间变量参数,分别对应年、月、日、时、分。时间变量大小写敏感。
支持填写常量。
{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
网络配置
专有网络:通过专有网络VPC将Kafka消息投递到MaxCompute。
公网:通过公网将Kafka消息投递到MaxCompute。
公网
VPC
选择VPC ID。仅当网络配置为专有网络时需要配置此参数。
vpc-bp17fapfdj0dwzjkd****
交换机
选择vSwitch ID。仅当网络配置为专有网络时需要配置此参数。
vsw-bp1gbjhj53hdjdkg****
安全组
选择安全组。仅当网络配置为专有网络时需要配置此参数。
test_group
任务属性
配置事件推送失败时的重试策略及错误发生时的处理方式。更多信息,请参见重试和死信。
完成上述配置后,单击保存。在任务列表页面,找到刚创建的MaxCompute Sink Connector任务,此时状态栏为启动中,当状态变为运行中时,Connector创建成功。
步骤三:测试MaxCompute Sink Connector
在任务列表页面,在MaxCompute Sink Connector任务的事件源列单击源Topic。
在Topic详情页面,单击体验发送消息。
在快速体验消息收发面板,按照下图配置消息内容,然后单击确定。
进入MaxCompute控制台,执行以下SQL语句查看分区信息。
show PARTITIONS kafka_to_maxcompute;
查询结果如下所示:
根据分区信息,执行以下语句,查看分区内数据。
SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";
查询结果如下所示: