Apache RocketMQ作為事件來源通過事件匯流排EventBridge與Function Compute整合後,通過Apache RocketMQ觸發器能夠觸發關聯函數執行,通過函數可以對發布到Apache RocketMQ的訊息進行自訂處理。本文介紹如何在Function Compute控制台建立Apache RocketMQ觸發器、配置入口參數以及編寫代碼並測試。
背景資訊
您在Function Compute的控制台提交觸發器建立請求後,Function Compute會根據觸發器的配置資訊,自動在事件匯流排EventBridge側建立事件流資源。
建立完成後,您可以在Function Compute控制台查看觸發器資訊,同時也可以在事件匯流排EventBridge控制台查看自動建立的資源資訊。當Apache RocketMQ中有訊息入隊時,將會觸發Function Compute執行,觸發時會根據您的攢批配置將一個或多個訊息事件以批的形式推送到函數中進行處理。
前提條件
事件匯流排EventBridge
Function Compute
Apache RocketMQ
Apache RocketMQ 5.0:Quick Start。
Apache RocketMQ 4.x:Quick Start。
自建Apache RocketMQ叢集
建立Topic
建立ConsumerGroup
您可以通過以下文檔快速部署Apache RocketMQ叢集,並完成訊息的收發。
使用限制
作為觸發源的Apache RocketMQ必須支援公網可訪問或者阿里雲VPC內可訪問。
當Apache RocketMQ支援阿里雲VPC內可訪問時,VPC執行個體必須和Function Compute的函數在相同的地區。
建立的事件流數量超過上限後,將無法再建立Apache RocketMQ觸發器。關於事件流數量的限制,請參見使用限制。
步驟一:建立Apache RocketMQ觸發器
登入Function Compute控制台,在左側導覽列,單擊函數。
在頂部功能表列,選擇地區,然後在函數頁面,單擊目標函數。
在函數詳情頁面,選擇配置頁簽,在左側導覽列,單擊觸發器,然後單擊建立觸發器。
在建立觸發程序面板,填寫相關資訊,然後單擊確定。
配置項
操作
本文樣本
觸發器類型
選擇自建 Apache RocketMQ。
自建 Apache RocketMQ
名稱
填寫自訂的觸發器名稱。
apache-rocketmq-trigger
版本或別名
預設值為LATEST,如果您需要建立其他版本或別名的觸發器,需要在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的介紹,請參見版本管理和別名管理。
LATEST
存取點
填寫叢集NameServer地址。
192.168.X.X:9876
Topic
選擇已建立的Apache RocketMQ執行個體的Topic。
testTopic
Group ID
選擇已建立的Apache RocketMQ執行個體的Consumer Group ID。
testGroup
FilterType
選擇訊息過濾類型。取值說明如下:
Tag:通過Tag標籤進行訊息過濾。
SQL:通過SQL語句進行訊息過濾,可匹配訊息的屬性標識以及屬性值。
Tag
Filter
選擇FilterType後,需要配置對應的過濾類型下的過濾語句。
TagA
認證模式
選擇認證模式,支援ACL模式。
ACL
使用者名稱
當認證模式選擇ACL後,需要配置Apache RocketMQ使用者名稱用於身分識別驗證。
admin
密碼
當認證模式選擇ACL後,需要配置Apache RocketMQ密碼用於身分識別驗證。
******
消費位點
選擇訊息的消費位點,即Apache RocketMQ從事件匯流排開始拉取訊息的位置。取值說明如下。
最新位點:從最新位點開始消費。
最早位點:從最早位點開始消費。
指定時間戳記:從指定時間戳記開始消費。
最新位點
網路設定
選擇路由訊息的網路類型。取值說明如下。
公網:通過公網訪問Apache RocketMQ叢集。
專用網路:通過阿里雲專用網路訪問Apache RocketMQ叢集。您需要選擇對應的Virtual Private Cloud、交換器、安全性群組。
公網
調用方式
選擇函數調用方式。
取值說明如下:
同步調用
觸發器啟用狀態
建立觸發器後是否立即啟用。預設勾選啟用觸發器,即建立觸發器後立即啟用觸發器。
啟用觸發器
關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能。
建立完成後,在觸發器管理頁簽下會顯示已建立的觸發器。如果需要對建立的觸發器進行修改或刪除,請參見觸發器管理。
步驟二:配置函數入口參數
自建Apache RocketMQ事件來源會以event的形式作為輸入參數傳遞給函數,您可以手動將event傳給函數類比觸發事件。
在函數詳情頁面的代碼頁簽,單擊測試函數右側的表徵圖,從下拉式清單中,選擇配置測試參數。
在配置測試參數面板,選擇建立新測試事件或編輯已有測試事件,填寫事件名稱和事件內容,然後單擊確定。
event格式如下所示。
[ { "msgId": "7F0000010BDD2A84AEE70DA49B57****", "topic": "testTopic", "systemProperties": { "UNIQ_KEY": "7F0000010BDD2A84AEE70DA49B57****", "CLUSTER": "DefaultCluster", "MIN_OFFSET": "0", "TAGS": "TagA", "MAX_OFFSET": "128" }, "userProperties": {}, "body": "Hello RocketMQ" } ]
event欄位包含的參數解釋如下表所示。
參數
類型
樣本值
描述
msgId
String
7F0000010BDD2A84AEE70DA49B57****
Apache RocketMQ訊息 ID。
topic
String
testTopic
Topic名稱。
systemProperties
Map
系統屬性。
UNIQ_KEY
String
7F0000010BDD2A84AEE70DA49B57****
訊息唯一鍵。
CLUSTER
String
DefaultCluster
Apache RocketMQ叢集名稱。
MIN_OFFSET
Int
0
最低位點。
MAX_OFFSET
Int
128
最高位點。
TAGS
String
TagA
過濾屬性。
userProperties
Map
無
使用者屬性。
body
String
Hello RocketMQ
訊息內容。
步驟三:編寫函數代碼並測試
觸發器建立完成後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當Apache RocketMQ接收到訊息後,觸發器會自動觸發函數執行。
在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼。
本文以Node.js函數代碼為例,範例程式碼如下。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // 解析event參數,對event進行處理。 callback(null, 'return result'); }
單擊測試函數。
更多資訊
除了Function Compute控制台,您還可以通過以下方式配置觸發器:
通過Serverless Devs工具配置觸發器。更多操作,請參見Serverless Devs常用命令。
通過SDK配置觸發器。更多操作,請參見SDK列表。
如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理。