雲訊息佇列 Kafka 版觸發器(以下簡稱Kafka觸發器)是通過事件匯流排EventBridge將雲訊息佇列 Kafka 版作為事件來源與Function Compute進行整合。建立完成後,您可以在Function Compute控制台和事件匯流排EventBridge控制台查看建立的資訊。當有訊息入隊時,事件匯流排EventBridge會根據您的攢批配置將一個或多個訊息事件以批的形式推送到函數中進行處理。
注意事項
作為觸發源的雲訊息佇列 Kafka 版執行個體必須和Function Compute的函數在相同的地區。
建立的事件流數量超過上限後,將無法再建立Kafka觸發器。關於事件流數量的限制,請參見使用限制。
前提條件
事件匯流排EventBridge
Function Compute
雲訊息佇列 Kafka 版
步驟一:建立Kafka觸發器
- 登入Function Compute控制台,在左側導覽列,單擊服務及函數。
- 在頂部功能表列,選擇地區,然後在服務列表頁面,單擊目標服務。
在函數管理頁面,單擊目標函數名稱。
在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉式清單選擇要建立觸發器的版本或別名,然後單擊建立觸發器。
在建立觸發程序面板,填寫相關資訊。然後單擊確定。
基礎配置項說明如下所示。
配置項
操作
本文樣本
觸發器類型
選擇訊息佇列 Kafka 版。
訊息佇列 Kafka 版
名稱
填寫自訂的觸發器名稱。
kafka-trigger
版本或別名
預設值為LATEST,如果您需要建立其他版本或別名的觸發器,首先需要在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的簡介,請參見管理版本和管理別名。
LATEST
Kafka 執行個體
選擇已建立的雲訊息佇列 Kafka 版執行個體。
alikafka_pre-cn-i7m2t7t1****
Topic
選擇已建立的雲訊息佇列 Kafka 版執行個體的Topic。
topic1
Group ID
選擇已建立的雲訊息佇列 Kafka 版執行個體的Group ID。
說明請使用獨立的Group ID來建立觸發器,不要與已有的業務混用Group ID,否則會影響已有的訊息收發。
GID_group1
消費任務並發數
消費者的並發數量,取值範圍為[1,Topic的分區數]。
2
消費位點
選擇訊息的消費位點,即雲訊息佇列 Kafka 版從事件匯流排開始拉取訊息的位置。
取值說明如下。
最早位點:從最早位點開始消費。
最新位點:從最新位點開始消費。
最新位點
調用方式
選擇函數調用方式。
同步調用
投遞並發最大值
Kafka訊息投遞到Function Compute的並發最大值,取值範圍為1~300。該參數僅對同步調用生效。如果需要更高的並發,請進入EventBridge配額中心申請配額名稱為EventStreaming FC Sink 同步投遞最大並發數的配額。
1
觸發器啟用狀態
建立觸發器後是否立即啟用。預設勾選啟用觸發器,即建立觸發器後立即啟用觸發器。
不涉及
關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能。
建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟二:配置函數入口參數
雲訊息佇列 Kafka 版事件來源會以event
的形式作為輸入參數傳遞給函數,您可以手動將event
傳給函數類比觸發事件。
在函數詳情頁面,單擊函數代碼頁簽,然後單擊測試函數右側表徵圖,從下拉式清單中,選擇配置測試參數。
在配置測試參數面板,選擇建立新測試事件或編輯已有測試事件頁簽,填寫事件名稱和事件內容。然後單擊確定。
event
格式如下所示:[ { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } }, { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } } ]
CloudEvents規範中定義的參數解釋,請參見事件概述。
data欄位包含的參數解釋如下表所示。
參數
類型
樣本值
描述
topic
String
TopicName
Topic的名稱。
partition
Int
1
雲訊息佇列 Kafka 版的消費分區資訊。
offset
Int
0
雲訊息佇列 Kafka 版的訊息位點。
timestamp
String
1655952591589
開始消費時間戳記。
步驟三:編寫函數代碼並測試
觸發器建立完成後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當雲訊息佇列 Kafka 版事件發生時,觸發器會自動觸發函數的執行。
在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼。
本文以Node.js函數代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event參數,對event進行處理。 callback(null, 'return result'); }
單擊函數代碼頁簽的測試函數。
執行完成後,您可以在函數字碼頁簽的上方查看執行結果。