当您希望对云消息队列 Kafka 版实例中的数据进行清洗、转换并转存,可以通过执行数据处理任务实现。本文介绍如何使用 云消息队列 Kafka 版数据处理任务将源Topic中的数据处理后发送到目标Topic。
前提条件
在使用前,请确保您已完成以下操作:
为云消息队列 Kafka 版实例创建数据源Topic和目标Topic。具体操作,请参见步骤一:创建Topic。
说明如果需要手动创建数据处理任务所依赖的Topic,也需创建存储数据处理任务相关信息的Topic。关于创建Topic时参数配置说明,请参见配置数据源和目标。
开通函数计算服务。更多信息,请参见 开通函数计算服务 。
如果登录账号是RAM账号,需给RAM账号授权。具体操作,请参见 RAM账号授权 。
对应权限策略脚本示例如下:
{ "Version": "1", "Statement": [ { "Action": [ "kafka:CreateETLTask", "kafka:ListETLTask", "kafka:DeleteETLTask" ], "Resource": "*", "Effect": "Allow" } ] }
背景信息
数据处理是将数据从来源端经过抽取、转换、加载至目的端的过程。您可以编写一个函数, 云消息队列 Kafka 版将会使用这个函数将源Topic中的数据进行处理后,发送到目标Topic中。
函数处理过程中函数计算会自动创建对应的服务和函数,服务的名称自动在数据处理任务名称加前缀
_FC-kafka
。仅支持在同地域内,将数据从 云消息队列 Kafka 版实例的数据源Topic经过函数计算转化后发送至目标Topic。
函数计算的函数调用支持日志查询,以便您迅速排查问题。具体操作步骤,请参见 配置日志。
云消息队列 Kafka 版的数据处理任务处于公测阶段,且独立于 云消息队列 Kafka 版实例,因此不会在 云消息队列 Kafka 版侧产生费用。如在使用数据处理任务时依赖其他产品,费用说明请以对应产品为准。
开启数据处理
新建数据处理任务入口已变更至Connector 生态集成,Connector 生态集成提供数据过滤、转换能力。更多信息,请参见Connector概述。
首次使用云消息队列 Kafka 版数据处理任务功能前,需先进行服务授权。授权后,系统会自动创建服务关联角色AliyunServiceRoleForAlikafkaETL。通过该角色, 云消息队列 Kafka 版可以获取与数据处理相关的产品的访问权限。更多信息,请参见服务关联角色。
登录云消息队列 Kafka 版控制台,在概览页面的资源分布区域,选择地域。
在左侧导航栏,选择 。
在数据处理任务列表页面,单击创建任务。
在弹出的服务授权对话框中,单击确认。
创建数据处理任务
创建并部署数据处理任务用于将数据从源Topic经过处理后发送至目标Topic。
在数据处理任务列表页面,单击 创建任务。
在配置基本信息页面,输入数据处理任务名称,单击 下一步。
在配置数据源和目标页面,配置数据源、目标Topic和消费信息。配置完成后单击 下一步。
参数
说明
示例值
实例
数据源Topic和目标Topic所属的实例。
alikafka_pre-cn-7pp2bz47****
alikafka_post-cn-2r42bvbm****
Topic
数据源Topic和目标Topic。
说明数据源Topic和目标Topic不能相同。
topic****
test****
消费初始位置
数据消费的开始时间位点。单击 高级选项显示该参数。默认取值说明如下:
最早位点:从最初位点开始消费。
最近位点:从最新位点开始消费。
最近位点
失败处理
消息发送失败后,是否继续发送后续数据。单击 高级选项显示该参数。取值说明如下:
继续订阅:消息发送失败后,继续发送后续数据。
停止订阅:消息发送失败后,停止发送后续数据。
继续订阅
创建资源方式
选择创建数据处理任务所依赖的Topic的方式。单击 高级选项显示该参数。
自动创建
手动创建
自动创建
数据处理任务消费组
数据处理任务使用的 Group。如果选择 创建资源方式为 手动创建,显示该参数。该 Group的名称建议以etl-cluster开头。
etl-cluster-kafka
任务位点 Topic
用于存储消费位点的Topic。如果选择 创建资源方式为 手动创建,显示该参数。
Topic名称:建议以etl-offset开头。
分区数:Topic的分区数量必须大于1。
存储引擎:Topic的存储引擎必须为Local存储。
说明当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
cleanup.policy:Topic的日志清理策略必须为compact。
etl-offset-kafka
任务配置 Topic
用于存储任务配置的Topic。如果选择 创建资源方式为 手动创建,显示该参数。
Topic名称:建议以etl-config开头。
分区数:Topic的分区数量必须为1。
存储引擎:Topic的存储引擎必须为Local存储。
说明当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
cleanup.policy:Topic的日志清理策略必须为compact。
etl-config-kafka
任务状态 Topic
用于存储任务状态的Topic。如果选择 创建资源方式为 手动创建,显示该参数。
Topic:建议以etl-status开头。
分区数:Topic的分区数量建议为6。
存储引擎:Topic的存储引擎必须为Local存储。
说明当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
cleanup.policy:Topic的日志清理策略必须为compact。
etl-status-kafka
死信队列 Topic
用于存储数据处理框架的异常数据的Topic。如果选择 创建资源方式为 手动创建,显示该参数。该Topic可以和 异常数据Topic 为同一个Topic,以节省Topic资源。
Topic:建议以etl-error开头。
分区数:Topic的分区数量建议为6。
存储引擎:Topic的存储引擎可以为Local存储或云存储。
说明当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
etl-error-kafka
异常数据 Topic
用于存储Sink的异常数据的Topic。如果选择 创建资源方式为 手动创建,显示该参数。该Topic可以和 死信队列Topic 为同一个Topic,以节省Topic资源。
Topic:建议以etl-error开头。
分区数:Topic的分区数量建议为6。
存储引擎:Topic的存储引擎可以为Local存储或云存储。
说明当前仅专业版实例支持在创建Topic时选择存储类型为Local存储,标准版暂不支持。
etl-error-kafka
在配置处理函数页面,配置函数信息,并单击 创建。
单击创建前,您可以单击 测试,测试编写的处理函数是否符合预期。
参数
说明
示例
函数语言
处理函数的语言。仅支持Python3。
Python3
函数模版
系统提供的处理函数模板。选择一种模板,默认提供对应的函数代码。
添加前/后缀
函数代码
处理消息的代码。提供数据清洗和转换的模板,并提供对应的函数代码,您可以根据需要修改和编辑代码内容。
说明您可以根据需要import python部分模块。
代码中message为字典格式,您修改key和value即可,其他不需要修改。
将处理完的message返回。如果要是对消息的过滤,返回None。
def deal_message(message): for keyItem in message.keys(): if (keyItem == 'key'): message[keyItem] = message[keyItem] + "KeySurfix" continue if (keyItem == 'value'): message[keyItem] = message[keyItem] + "ValueSurfix" continue return message
消息 Key
源Topic的消息处理Key值。单击 测试代码显示该参数。
demo
消息内容
源Topic的消息处理Value值。
{"key": "test"}
创建完成后,在数据处理任务列表页面,查看创建的数据处理任务。创建成功后,系统自动部署。
发送测试消息
部署数据处理任务后,您可以向云消息队列 Kafka 版的数据源Topic发送消息,测试数据能否根据函数代码处理后发送至目标Topic。
在数据处理任务列表页面,目标数据处理任务所在行操作列,单击测试。
在发送消息面板,填写如下信息,并单击确定,发送测试消息。
在消息 Key文本框中输入消息Key值,例如demo。
在消息内容文本框中输入测试的消息内容,例如{"key": "test"}。
设置 发送到指定分区,选择是否指定分区。
单击是,在 分区 ID文本框中输入分区的ID,例如0。如果您需要查询分区的ID,请参见 查看分区状态。
单击否,不指定分区。
查看函数日志
从云消息队列 Kafka 版源Topic获取数据,经过函数处理发送至目标Topic后,查看函数日志,验证是否收到消息。更多信息,请参见 配置日志。
查看数据处理任务详情
成功创建数据处理任务后,您可以在云消息队列 Kafka 版控制台查看详细信息。
在数据处理任务列表页面,目标数据处理任务所在行操作列,单击详情。
在任务详情页面,查看任务详细信息。
删除数据处理任务
如果您不再需要某个数据处理任务,或者某个数据处理任务不再使用,您可以在云消息队列 Kafka 版控制台删除该数据处理任务。
在数据处理任务列表页面,目标数据处理任务所在行操作列,单击 删除。
在提示对话框中,单击确认。