本文说明如何创建MaxCompute Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至MaxCompute的表。
前提条件
在创建MaxCompute Sink Connector前,请确保您已完成以下操作:
云消息队列 Kafka 版
为云消息队列 Kafka 版实例开启Connector。更多信息,请参见开启Connector。
为云消息队列 Kafka 版实例创建数据源Topic。更多信息,请参见步骤一:创建Topic。
本文以名称为maxcompute-test-input的Topic为例。
大数据计算服务(MaxCompute)
通过MaxCompute客户端创建表。更多信息,请参见创建表。
本文以名称为connector_test的项目下名称为test_kafka的表为例。该表的建表语句如下:
CREATE TABLE IF NOT EXISTS test_kafka(topic STRING,partition BIGINT,offset BIGINT,key STRING,value STRING) PARTITIONED by (pt STRING);
可选:事件总线EventBridge
说明仅在您创建的Connector任务所属实例的地域为华东1(杭州)或西南1(成都)时,需要完成该操作。
注意事项
仅支持在同地域内,将数据从云消息队列 Kafka 版实例的数据源Topic导出至MaxCompute。Connector的限制说明,请参见使用限制。
如果Connector所属实例的地域为华东1(杭州)或西南1(成都),该功能会部署至事件总线EventBridge。
事件总线EventBridge目前免费供您使用。更多信息,请参见计费说明。
创建Connector时,事件总线EventBridge会为您自动创建服务关联角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC。
如果未创建服务关联角色,事件总线EventBridge会为您自动创建对应的服务关联角色,以便允许事件总线EventBridge使用此角色访问云消息队列 Kafka 版和专有网络VPC。
如果已创建服务关联角色,事件总线EventBridge不会重复创建。
关于服务关联角色的更多信息,请参见服务关联角色。
部署到事件总线EventBridge的任务暂时不支持查看任务运行日志。Connector任务执行完成后,您可以在订阅数据源Topic的Group中,通过消费情况查看任务进度。具体操作,请参见查看消费状态。
操作流程
使用MaxCompute Sink Connector将数据从云消息队列 Kafka 版实例的数据源Topic导出至MaxCompute的表操作流程如下:
授予云消息队列 Kafka 版访问MaxCompute的权限。
可选:创建MaxCompute Sink Connector依赖的Topic和Group
如果您不需要自定义Topic和Group,您可以直接跳过该步骤,在下一步骤选择自动创建。
重要部分MaxCompute Sink Connector依赖的Topic的存储引擎必须为Local存储,大版本为0.10.2的云消息队列 Kafka 版实例不支持手动创建Local存储的Topic,只支持自动创建。
结果验证
创建RAM角色
由于RAM角色不支持直接选择云消息队列 Kafka 版作为受信服务,您在创建RAM角色时,需要选择任意支持的服务作为受信服务。RAM角色创建后,手动修改信任策略。
登录访问控制控制台。
在左侧导航栏,选择 。
在角色页面,单击创建角色。
在创建角色面板,执行以下操作。
选择可信实体类型为阿里云服务,然后单击下一步。
在角色类型区域,选择普通服务角色,在角色名称文本框,输入AliyunKafkaMaxComputeUser1,从选择受信服务列表,选择大数据计算服务,然后单击完成。
在角色页面,找到AliyunKafkaMaxComputeUser1,单击AliyunKafkaMaxComputeUser1。
在AliyunKafkaMaxComputeUser1页面,单击信任策略管理页签,单击修改信任策略。
在修改信任策略面板,将脚本中odps替换为alikafka,单击确定。
替换后的策略如下所示。
添加权限
为使Connector将消息同步到MaxCompute表,您需要为创建的RAM角色至少授予以下权限:
客体 | 操作 | 描述 |
Project | CreateInstance | 在项目中创建实例。 |
Table | Describe | 读取表的元信息。 |
Table | Alter | 修改表的元信息或添加删除分区。 |
Table | Update | 覆盖或添加表的数据。 |
关于以上权限的详细说明以及授权操作,请参见MaxCompute权限。
为本文创建的AliyunKafkaMaxComputeUser1添加权限的示例步骤如下:
登录MaxCompute客户端。
执行以下命令添加RAM角色为用户。
add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
说明将<accountid>替换为您自己的阿里云账号ID。
为RAM用户授予访问MaxCompute所需的最小权限。
执行以下命令为RAM用户授予项目相关权限。
grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
说明将<accountid>替换为您自己的阿里云账号ID。
执行以下命令为RAM用户授予表相关权限。
grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
说明将<accountid>替换为您自己的阿里云账号ID。
创建MaxCompute Sink Connector依赖的Topic
您可以在云消息队列 Kafka 版控制台手动创建MaxCompute Sink Connector依赖的5个Topic,包括:任务位点Topic、任务配置Topic、任务状态Topic、死信队列Topic以及异常数据Topic。每个Topic所需要满足的分区数与存储引擎会有差异,具体信息,请参见配置源服务参数列表。
在概览页面的资源分布区域,选择地域。
重要Topic需要在应用程序所在的地域(即所部署的ECS的所在地域)进行创建。Topic不能跨地域使用。例如Topic创建在华北2(北京)这个地域,那么消息生产端和消费端也必须运行在华北2(北京)的ECS。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Topic 管理。
在Topic 管理页面,单击创建 Topic。
在创建 Topic面板,设置Topic属性,然后单击确定。
参数
说明
示例
名称
Topic名称。
demo
描述
Topic的简单描述。
demo test
分区数
Topic的分区数量。
12
存储引擎
说明当前仅专业版实例支持选择存储引擎类型,标准版暂不支持,默认选择为云存储类型。
Topic消息的存储引擎。
云消息队列 Kafka 版支持以下两种存储引擎。
云存储:底层接入阿里云云盘,具有低时延、高性能、持久性、高可靠等特点,采用分布式3副本机制。实例的规格类型为标准版(高写版)时,存储引擎只能为云存储。
Local 存储:使用原生Kafka的ISR复制算法,采用分布式3副本机制。
云存储
消息类型
Topic消息的类型。
普通消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,可能会造成消息乱序。当存储引擎选择云存储时,默认选择普通消息。
分区顺序消息:默认情况下,保证相同Key的消息分布在同一个分区中,且分区内消息按照发送顺序存储。集群中出现机器宕机时,仍然保证分区内按照发送顺序存储。但是会出现部分分区发送消息失败,等到分区恢复后即可恢复正常。当存储引擎选择Local 存储时,默认选择分区顺序消息。
普通消息
日志清理策略
Topic日志的清理策略。
当存储引擎选择Local 存储(当前仅专业版实例支持选择存储引擎类型为Local存储,标准版暂不支持)时,需要配置日志清理策略。
云消息队列 Kafka 版支持以下两种日志清理策略。
Delete:默认的消息清理策略。在磁盘容量充足的情况下,保留在最长保留时间范围内的消息;在磁盘容量不足时(一般磁盘使用率超过85%视为不足),将提前删除旧消息,以保证服务可用性。
Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保证相同Key的消息,最新的value值一定会被保留。主要适用于系统宕机后恢复状态,系统重启后重新加载缓存等场景。例如,在使用Kafka Connect或Confluent Schema Registry时,需要使用Kafka Compact Topic存储系统状态信息或配置信息。
重要Compact Topic一般只用在某些生态组件中,例如Kafka Connect或Confluent Schema Registry,其他情况的消息收发请勿为Topic设置该属性。具体信息,请参见云消息队列 Kafka 版Demo库。
Compact
标签
Topic的标签。
demo
创建完成后,在Topic 管理页面的列表中显示已创建的Topic。
创建MaxCompute Sink Connector依赖的Group
您可以在云消息队列 Kafka 版控制台手动创建MaxCompute Sink Connector数据同步任务使用的Group。该Group的名称必须为connect-任务名称,具体信息,请参见配置源服务参数列表。
在概览页面的资源分布区域,选择地域。
在实例列表页面,单击目标实例名称。
在左侧导航栏,单击Group 管理。
在Group 管理页面,单击创建 Group。
在创建 Group面板的Group ID文本框输入Group的名称,在描述文本框简要描述Group,并给Group添加标签,单击确定。
创建完成后,在Group 管理页面的列表中显示已创建的Group。
创建并部署MaxCompute Sink Connector
创建并部署用于将数据从云消息队列 Kafka 版同步至MaxCompute的MaxCompute Sink Connector。
在概览页面的资源分布区域,选择地域。
在左侧导航栏,单击Connector 任务列表。
在Connector 任务列表页面,从选择实例的下拉列表选择Connector所属的实例,然后单击创建 Connector。
在创建 Connector配置向导面页面,完成以下操作。
在配置基本信息页签,按需配置以下参数,然后单击下一步。
参数
描述
示例值
名称
Connector的名称。命名规则:
可以包含数字、小写英文字母和短划线(-),但不能以短划线(-)开头,长度限制为48个字符。
同一个云消息队列 Kafka 版实例内保持唯一。
Connector的数据同步任务必须使用名称为connect-任务名称的Group。如果您未手动创建该Group,系统将为您自动创建。
kafka-maxcompute-sink
实例
默认配置为实例的名称与实例ID。
demo alikafka_post-cn-st21p8vj****
在配置源服务页签,选择数据源为消息队列Kafka版,并配置以下参数,然后单击下一步。
说明如果您已创建好Topic和Group,那么请选择手动创建资源,并填写已创建的资源信息。否则,请选择自动创建资源。
表 1. 配置源服务参数列表
参数
描述
示例值
数据源 Topic
需要同步数据的Topic。
maxcompute-test-input
消费线程并发数
数据源Topic的消费线程并发数。默认值为6。取值说明如下:
1
2
3
6
12
6
消费初始位置
开始消费的位置。取值说明如下:
最早位点:从最初位点开始消费。
最近位点:从最新位点开始消费。
最早位点
VPC ID
数据同步任务所在的VPC。单击配置运行环境显示该参数。默认为云消息队列 Kafka 版实例所在的VPC,您无需填写。
vpc-bp1xpdnd3l***
vSwitch ID
数据同步任务所在的交换机。单击配置运行环境显示该参数。该交换机必须与云消息队列 Kafka 版实例处于同一VPC。默认为部署云消息队列 Kafka 版实例时填写的交换机。
vsw-bp1d2jgg81***
失败处理
消息发送失败后,是否继续订阅出现错误的Topic的分区。单击配置运行环境显示该参数。取值说明如下。
继续订阅:继续订阅出现错误的Topic的分区,并打印错误日志。
停止订阅:停止订阅出现错误的Topic的分区,并打印错误日志
说明如何查看日志,请参见Connector相关操作。
如何根据错误码查找解决方案,请参见错误码。
继续订阅
创建资源方式
选择创建Connector所依赖的Topic与Group的方式。单击配置运行环境显示该参数。
自动创建
手动创建
自动创建
Connector 消费组
Connector的数据同步任务使用的Group。单击配置运行环境显示该参数。该Groupp的名称必须为connect-任务名称。
connect-kafka-maxcompute-sink
任务位点 Topic
用于存储消费位点的Topic。单击配置运行环境显示该参数。
Topic名称:建议以connect-offset开头。
分区数:Topic的分区数量必须大于1。
存储引擎:Topic的存储引擎必须为Local存储。
cleanup.policy:Topic的日志清理策略必须为compact。
connect-offset-kafka-maxcompute-sink
任务配置 Topic
用于存储任务配置的Topic。单击配置运行环境显示该参数。
Topic名称:建议以connect-config开头。
分区数:Topic的分区数量必须为1。
存储引擎:Topic的存储引擎必须为Local存储。
cleanup.policy:Topic的日志清理策略必须为compact。
connect-config-kafka-maxcompute-sink
任务状态 Topic
用于存储任务状态的Topic。单击配置运行环境显示该参数。
Topic名称:建议以connect-status开头。
分区数:Topic的分区数量建议为6。
存储引擎:Topic的存储引擎必须为Local存储。
cleanup.policy:Topic的日志清理策略必须为compact。
connect-status-kafka-maxcompute-sink
死信队列 Topic
用于存储Connect框架的异常数据的Topic。单击配置运行环境显示该参数。该Topic可以和异常数据Topic为同一个Topic,以节省Topic资源。
Topic名称:建议以connect-error开头。
分区数:Topic的分区数量建议为6。
存储引擎:Topic的存储引擎可以为Local存储或云存储。
connect-error-kafka-maxcompute-sink
异常数据 Topic
用于存储Sink的异常数据的Topic。单击配置运行环境显示该参数。该Topic可以和死信队列Topic为同一个Topic,以节省Topic资源。
Topic名称:建议以connect-error开头。
分区数:Topic的分区数量建议为6。
存储引擎:Topic的存储引擎可以为Local存储或云存储。
connect-error-kafka-maxcompute-sink
在配置目标服务页签,选择目标服务为大数据计算服务,并配置以下参数,然后单击创建。
说明如果Connector所属实例的地域为华东1(杭州)或西南1(成都),选择目标服务为大数据计算服务时, 会分别弹出创建服务关联角色AliyunServiceRoleForEventBridgeSourceKafka和AliyunServiceRoleForEventBridgeConnectVPC的服务授权对话框,在弹出的服务授权对话框中单击确认,然后再配置以下参数并单击创建。如果服务关联角色已创建,则不再重复创建,即不会再弹出服务授权对话框。
参数
描述
示例值
连接地址
MaxCompute的服务接入点。更多信息,请参见Endpoint。
VPC网络Endpoint:低延迟,推荐。适用于云消息队列 Kafka 版实例和MaxCompute处于同一地域场景。
外网Endpoint:高延迟,不推荐。适用于云消息队列 Kafka 版实例和MaxCompute处于不同地域的场景。如需使用公网Endpoint,您需要为Connector开启公网访问。更多信息,请参见为Connector开启公网访问。
http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
工作空间
MaxCompute的工作空间。
connector_test
表
MaxCompute的表。
test_kafka
表地域
MaxCompute表所在地域。
华东1(杭州)
服务账号
MaxCompute的阿里云账号ID。
188***
授权角色名
云消息队列 Kafka 版的RAM角色的名称。更多信息,请参见创建RAM角色。
AliyunKafkaMaxComputeUser1
模式
消息同步到Connector的模式。默认为DEFAULT。取值说明如下:
KEY:只保留消息的Key,并将Key写入MaxCompute表的key列。
VALUE:只保留消息的Value,并将Value写入MaxCompute表的value列。
DEFAULT:同时保留消息的Key和Value,并将Key和Value分别写入MaxCompute表的key列和value列。
重要DEFAULT模式下,不支持选择CSV格式,只支持TEXT格式和BINARY格式。
DEFAULT
格式
消息同步到Connector的格式。默认为TEXT。取值说明如下:
TEXT:消息的格式为字符串。
BINARY:消息的格式为字节数组。
CSV:消息的格式为逗号(,)分隔的字符串。
重要CSV格式下,不支持DEFAULT模式,只支持KEY模式和VALUE模式:
KEY模式:只保留消息的Key,根据逗号(,)分隔Key字符串,并将分隔后的字符串按照索引顺序写入表。
VALUE模式:只保留消息的Value,根据逗号(,)分隔Value字符串,并将分隔后的字符串按照索引顺序写入表。
TEXT
分区
分区的粒度。默认为HOUR。取值说明如下:
DAY:每天将数据写入一个新分区。
HOUR:每小时将数据写入一个新分区。
MINUTE:每分钟将数据写入一个新分区。
HOUR
时区
向Connector的数据源Topic发送消息的云消息队列 Kafka 版生产者客户端所在时区。默认为GMT+08:00。
GMT 08:00
创建完成后,在Connector 任务列表页面,查看创建的Connector 。
创建完成后,在Connector 任务列表页面,找到创建的Connector ,单击其操作列的部署。
发送测试消息
部署MaxCompute Sink Connector后,您可以向云消息队列 Kafka 版的数据源Topic发送消息,测试数据能否被同步至MaxCompute。
在Connector 任务列表页面,找到目标Connector,在其右侧操作列,单击测试。
在发送消息面板,发送测试消息。
发送方式选择控制台。
在消息 Key文本框中输入消息的Key值,例如demo。
在消息内容文本框输入测试的消息内容,例如 {"key": "test"}。
设置发送到指定分区,选择是否指定分区。
单击是,在分区 ID文本框中输入分区的ID,例如0。如果您需查询分区的ID,请参见查看分区状态。
单击否,不指定分区。
发送方式选择Docker,执行运行 Docker 容器生产示例消息区域的Docker命令,发送消息。
发送方式选择SDK,根据您的业务需求,选择需要的语言或者框架的SDK以及接入方式,通过SDK发送消息。
查看表数据
向云消息队列 Kafka 版的数据源Topic发送消息后,在MaxCompute客户端查看表数据,验证是否收到消息。
查看本文写入的test_kafka的示例步骤如下:
登录MaxCompute客户端。
执行以下命令查看表的数据分区。
show partitions test_kafka;
返回结果示例如下:
pt=11-17-2020 15 OK
执行以下命令查看分区的数据。
select * from test_kafka where pt ="11-17-2020 14";
返回结果示例如下:
+----------------------+------------+------------+-----+-------+---------------+ | topic | partition | offset | key | value | pt | +----------------------+------------+------------+-----+-------+---------------+ | maxcompute-test-input| 0 | 0 | 1 | 1 | 11-17-2020 14 | +----------------------+------------+------------+-----+-------+---------------+