訊息佇列 Kafka 版作為事件來源通過事件匯流排EventBridge與Function Compute整合後,通過訊息佇列 Kafka 版觸發器(以下簡稱Kafka觸發器)能夠觸發關聯函數執行,通過函數可以對發布到訊息佇列 Kafka 版的訊息進行自訂處理。本文介紹如何在Function Compute控制台建立Kafka觸發器、配置入口參數以及編寫代碼並測試代碼。
功能簡介
您在Function Compute的控制台提交觸發器建立請求之後,Function Compute會根據觸發器的配置資訊,自動在事件匯流排EventBridge側建立事件流資源。
建立完成後,您可以在Function Compute控制台查看觸發器資訊,同時也可以在事件匯流排EventBridge控制台查看自動建立的資源資訊。當源訊息佇列 Kafka 版中有訊息入隊時,將會觸發Function Compute執行,觸發時會根據您的攢批配置將一個或多個訊息事件以批的形式推送到函數中進行處理。
注意事項
作為觸發源的訊息佇列 Kafka 版執行個體必須和Function Compute的函數在相同的地區。
建立的事件流數量超過上限後,將無法再建立Kafka觸發器。關於事件流數量的限制,請參見使用限制。
前提條件
事件匯流排EventBridge
Function Compute
訊息佇列 Kafka 版
步驟一:建立Kafka觸發器
當您已經建立好Kafka的執行個體,您需要登入Function Compute控制台,進入目標函數,選擇配置頁簽,建立觸發器,建立完成點擊確定如下圖所示。
基礎配置項說明如下所示。
配置項 | 操作 | 本文樣本 |
消費位點 | 選擇訊息的消費位點,即訊息佇列 Kafka 版從事件匯流排開始拉取訊息的位置。 取值說明如下。
| 最早位點 |
調用方式 | 選擇函數調用方式。 | 同步調用 |
投遞並發最大值 | Kafka訊息投遞到Function Compute的並發最大值,取值範圍為1~300。該參數僅對同步調用生效。如果需要更高的並發,請進入EventBridge配額中心申請配額名稱為EventStreaming FC Sink 同步投遞最大並發數的配額。 | 1 |
關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能。
步驟二:(可選)配置函數入口參數
訊息佇列 Kafka 版事件來源會以event
的形式作為輸入參數傳遞給函數,您可以使用代碼解析event參數,並對event進行處理。您可以手動將event
傳給函數類比觸發事件。
在函數詳情頁面的代碼頁簽,單擊測試函數右側的表徵圖,從下拉式清單中,選擇配置測試參數。
在配置測試參數面板,選擇建立新測試事件或編輯已有測試事件,填寫事件名稱和事件內容,然後單擊確定。
event
格式如下所示:[ { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } }, { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } } ]
CloudEvents規範中定義的參數解釋,請參見事件概述。
data欄位包含的參數解釋如下表所示。
參數
類型
樣本值
描述
topic
String
TopicName
Topic的名稱。
partition
Int
1
雲訊息佇列 Kafka 版的消費分區資訊。
offset
Int
0
雲訊息佇列 Kafka 版的訊息位點。
timestamp
String
1655952591589
開始消費時間戳記。
步驟三:編寫函數代碼並測試
觸發器建立完成後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當訊息佇列 Kafka 版事件發生時,觸發器會自動觸發函數的執行。
在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼。
本文以Node.js函數代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // 解析event參數,對event進行處理。 callback(null, 'return result'); }
測試函數。
方式一:如果您是配置函數入口參數
event
類比事件來源,單擊測試函數。方式二:登入雲訊息佇列 Kafka 版控制台,選擇您建立的目標Topic,點擊發送訊息,如下圖。
執行完成後,在即時日誌查看結果。
更多資訊
如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理。