Apache RocketMQ作为事件源通过事件总线EventBridge与函数计算集成后,通过Apache RocketMQ触发器能够触发关联函数执行,通过函数可以对发布到Apache RocketMQ的消息进行自定义处理。本文介绍如何在函数计算控制台创建Apache RocketMQ触发器、配置入口参数以及编写代码并测试。
背景信息
您在函数计算的控制台提交触发器创建请求后,函数计算会根据触发器的配置信息,自动在事件总线EventBridge侧创建事件流资源。
创建完成后,您可以在函数计算控制台查看触发器信息,同时也可以在事件总线EventBridge控制台查看自动创建的资源信息。当Apache RocketMQ中有消息入队时,将会触发函数计算执行,触发时会根据您的攒批配置将一个或多个消息事件以批的形式推送到函数中进行处理。
前提条件
事件总线EventBridge
函数计算
Apache RocketMQ
Apache RocketMQ 5.0:Quick Start。
Apache RocketMQ 4.x:Quick Start。
自建Apache RocketMQ集群
创建Topic
创建ConsumerGroup
您可以通过以下文档快速部署Apache RocketMQ集群,并完成消息的收发。
使用限制
作为触发源的Apache RocketMQ必须支持公网可访问或者阿里云VPC内可访问。
当Apache RocketMQ支持阿里云VPC内可访问时,VPC实例必须和函数计算的函数在相同的地域。
创建的事件流数量超过上限后,将无法再创建Apache RocketMQ触发器。关于事件流数量的限制,请参见使用限制。
步骤一:创建Apache RocketMQ触发器
登录函数计算控制台,在左侧导航栏,单击函数。
在顶部菜单栏,选择地域,然后在函数页面,单击目标函数。
在函数详情页面,选择配置页签,在左侧导航栏,单击触发器,然后单击创建触发器。
在创建触发器面板,填写相关信息,然后单击确定。
配置项
操作
本文示例
触发器类型
选择自建 Apache RocketMQ。
自建 Apache RocketMQ
名称
填写自定义的触发器名称。
apache-rocketmq-trigger
版本或别名
默认值为LATEST,如果您需要创建其他版本或别名的触发器,需要在函数详情页的右上角切换到该版本或别名。关于版本和别名的介绍,请参见版本管理和别名管理。
LATEST
接入点
填写集群NameServer地址。
192.168.X.X:9876
Topic
选择已创建的Apache RocketMQ实例的Topic。
testTopic
Group ID
选择已创建的Apache RocketMQ实例的Consumer Group ID。
testGroup
FilterType
选择消息过滤类型。取值说明如下:
Tag:通过Tag标签进行消息过滤。
SQL:通过SQL语句进行消息过滤,可匹配消息的属性标识以及属性值。
Tag
Filter
选择FilterType后,需要配置对应的过滤类型下的过滤语句。
TagA
认证模式
选择认证模式,支持ACL模式。
ACL
用户名
当认证模式选择ACL后,需要配置Apache RocketMQ用户名用于身份验证。
admin
密码
当认证模式选择ACL后,需要配置Apache RocketMQ密码用于身份验证。
******
消费位点
选择消息的消费位点,即Apache RocketMQ从事件总线开始拉取消息的位置。取值说明如下。
最新位点:从最新位点开始消费。
最早位点:从最早位点开始消费。
指定时间戳:从指定时间戳开始消费。
最新位点
网络配置
选择路由消息的网络类型。取值说明如下。
公网:通过公网访问Apache RocketMQ集群。
专有网络:通过阿里云专有网络访问Apache RocketMQ集群。您需要选择对应的专有网络VPC、交换机、安全组。
公网
调用方式
选择函数调用方式。
取值说明如下:
同步调用
触发器启用状态
创建触发器后是否立即启用。默认勾选启用触发器,即创建触发器后立即启用触发器。
启用触发器
关于推送配置、重试和死信等高级配置项说明,请参见触发器高级功能。
创建完成后,在触发器管理页签下会显示已创建的触发器。如果需要对创建的触发器进行修改或删除,请参见触发器管理。
步骤二:配置函数入口参数
自建Apache RocketMQ事件源会以event的形式作为输入参数传递给函数,您可以手动将event传给函数模拟触发事件。
在函数详情页面的代码页签,单击测试函数右侧的图标,从下拉列表中,选择配置测试参数。
在配置测试参数面板,选择创建新测试事件或编辑已有测试事件,填写事件名称和事件内容,然后单击确定。
event格式如下所示。
[ { "msgId": "7F0000010BDD2A84AEE70DA49B57****", "topic": "testTopic", "systemProperties": { "UNIQ_KEY": "7F0000010BDD2A84AEE70DA49B57****", "CLUSTER": "DefaultCluster", "MIN_OFFSET": "0", "TAGS": "TagA", "MAX_OFFSET": "128" }, "userProperties": {}, "body": "Hello RocketMQ" } ]
event字段包含的参数解释如下表所示。
参数
类型
示例值
描述
msgId
String
7F0000010BDD2A84AEE70DA49B57****
Apache RocketMQ消息 ID。
topic
String
testTopic
Topic名称。
systemProperties
Map
系统属性。
UNIQ_KEY
String
7F0000010BDD2A84AEE70DA49B57****
消息唯一键。
CLUSTER
String
DefaultCluster
Apache RocketMQ集群名称。
MIN_OFFSET
Int
0
最低位点。
MAX_OFFSET
Int
128
最高位点。
TAGS
String
TagA
过滤属性。
userProperties
Map
无
用户属性。
body
String
Hello RocketMQ
消息内容。
步骤三:编写函数代码并测试
触发器创建完成后,您可以开始编写并测试函数代码,以验证代码的正确性。在实际操作过程中,当Apache RocketMQ接收到消息后,触发器会自动触发函数执行。
在函数详情页面的代码页签,在代码编辑器中编写代码,然后单击部署代码。
本文以Node.js函数代码为例,示例代码如下。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // 解析event参数,对event进行处理。 callback(null, 'return result'); }
单击测试函数。
更多信息
除了函数计算控制台,您还可以通过以下方式配置触发器:
通过Serverless Devs工具配置触发器。更多操作,请参见Serverless Devs常用命令。
通过SDK配置触发器。更多操作,请参见SDK列表。
如需对创建的触发器进行修改或删除,具体操作,请参见触发器管理。