DTS(Data Transmission Service)作為事件來源通過事件匯流排EventBridge與Function Compute整合後,通過DTS觸發器能夠觸發關聯函數執行。通過函數可以對DTS資料訂閱擷取的資料庫即時增量資料進行自訂處理。本文介紹如何在Function Compute控制台建立DTS觸發器、配置入口參數以及編寫代碼並測試代碼。
功能簡介
您在Function Compute的控制台提交觸發器建立請求之後,Function Compute會根據觸發器的配置資訊,自動在事件匯流排EventBridge側建立事件流資源。
建立完成後,您可以在Function Compute控制台查看觸發器資訊,同時也可以在事件匯流排EventBridge控制台查看自動建立的資源資訊。當DTS資料訂閱捕捉到資料庫的增量資料後,將會觸發函數執行,觸發時會根據您的攢批配置將一個或多個訊息事件以批的形式推送到函數中進行處理。
注意事項
- 作為觸發源的DTS資料訂閱任務必須和Function Compute的函數在相同的地區。
- 建立的事件流數量超過上限後,將無法再建立DTS觸發器。關於事件流的數量限制,請參見使用限制。
前提條件
- 事件匯流排EventBridge
- Function Compute
- Data Transmission Service
步驟一:建立DTS觸發器
- 登入Function Compute控制台,在左側導覽列,單擊服務及函數。
- 在頂部功能表列,選擇地區,然後在服務列表頁面,單擊目標服務。
在函數管理頁面,單擊目標函數名稱。
在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉式清單選擇要建立觸發器的版本或別名,然後單擊建立觸發器。
- 在建立觸發程序面板,填寫相關資訊。然後單擊確定。基礎配置項說明如下所示。
配置項 取值說明 本文樣本 觸發器類型 觸發器類型。關於支援的觸發器類型,請參見觸發器簡介。 DTS 名稱 自訂的觸發器名稱。 dts-trigger 版本或別名 預設值為LATEST,如果您需要建立其他版本或別名的觸發器,請先在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的簡介,請參見管理版本和管理別名。 LATEST 資料訂閱任務 已建立的資料訂閱任務名稱。 dtsqntc2*** 消費組 已建立的用於消費訂閱任務資料的消費組名稱。 重要 請確保該消費組沒有在其他用戶端的執行個體上運行,否則可能導致傳入的消費位點失效。test 帳號 建立消費組時設定的帳號。 test 密碼 建立消費組時設定的密碼。 ****** 消費位點 期望消費第一條資料的時間戳記。消費位點必須在訂閱執行個體的資料範圍之內。 說明 消費位點僅在新消費組第一次運行時生效。若後續任務重啟,會基於上次消費位點繼續消費。2022-06-21 00:00:00 調用方式 選擇函數調用方式。 同步調用 觸發器啟用狀態 建立觸發器後是否立即啟用。預設勾選啟用觸發器,即建立觸發器後立即啟用觸發器。 不涉及 關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能。
建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟二:配置函數入口參數
DTS事件來源會以event
的形式作為輸入參數傳遞給函數,您可以手動將event
傳給函數類比觸發事件。
在函數詳情頁面,單擊函數代碼頁簽,然後單擊測試函數右側表徵圖,從下拉式清單中,選擇配置測試參數。
- 在配置測試參數面板,選擇建立新測試事件或編輯已有測試事件頁簽,填寫事件名稱和事件內容。然後單擊確定。
event
格式如下所示:[ { "data": { "id": 321****, "topicPartition": { "hash": 0, "partition": 0, "topic": "cn_hangzhou_rm_1234****_test_version2" }, "offset": 3218099, "sourceTimestamp": 1654847757, "operationType": "UPDATE", "schema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou--test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "beforeImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 104, 101, 108, 108, 111 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 9, "capacity": 9, "address": 0 }, "charset": "utf8mb4" } ], "size": 45 }, "afterImage": { "recordSchema": { "recordFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } ], "nameIndex": { "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }, "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 } }, "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)", "databaseName": "hangzhou-test-db", "tableName": "message_info", "primaryIndexInfo": { "indexType": "PrimaryKey", "indexFields": [ { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 } ], "cardinality": 0, "nullable": true, "isFirstUniqueIndex": false }, "uniqueIndexInfo": [], "foreignIndexInfo": [], "normalIndexInfo": [], "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" }, "totalRows": 0 }, "values": [ { "data": 115 }, { "data": { "hb": [ 98, 121, 101 ], "offset": 0, "isReadOnly": false, "bigEndian": true, "nativeByteOrder": false, "mark": -1, "position": 0, "limit": 11, "capacity": 11, "address": 0 }, "charset": "utf8mb4" } ], "size": 47 } }, "id": "12f701a43741d404fa9a7be89d9acae0-321****", "source": "DTSstreamDemo", "specversion": "1.0", "type": "dts:ConsumeMessage", "datacontenttype": "application/json; charset=utf-8", "time": "2022-06-10T07:55:57Z", "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro" } ]
CloudEvents規範中定義的參數解釋,請參見事件概述。
data欄位包含的參數解釋如下表所示。
參數
類型
說明
id
String
DTS資料ID。
topicPartition
Array
Topic的分區資訊。
hash
String
DTS底層儲存參數。
partition
String
Topic的分區。
topic
String
Topic的名稱。
offset
Int
DTS資料對應的訊息儲存位點。
sourceTimestamp
Int
DTS資料產生時間戳記。
operationType
String
DTS資料的操作類型。
schema
Array
資料庫表結構資訊。
recordFields
Array
欄位詳情記錄。
fieldName
String
欄位名稱。
rawDataTypeNum
Int
欄位類型映射值。
該值對應從資料訂閱通道中擷取的增量資料還原序列化後的dataTypeNumber欄位值,詳情請參見使用Kafka用戶端消費訂閱資料。
isPrimaryKey
Boolean
欄位是否是主鍵。
isUniqueKey
Boolean
欄位是否是唯一值。
fieldPosition
String
欄位位置。
nameIndex
Array
命名索引。
schemaId
String
資料庫表結構資訊的ID。
databaseName
String
資料庫名稱。
tableName
String
資料表名稱。
primaryIndexInfo
String
主鍵索引。
indexType
String
主鍵索引類型。
indexFields
Array
主鍵索引欄位內容。
cardinality
String
主鍵基數。
nullable
Boolean
主鍵是否可為null。
isFirstUniqueIndex
Boolean
是否是第一個唯一索引。
uniqueIndexInfo
String
唯一索引。
foreignIndexInfo
String
外鍵索引。
normalIndexInfo
String
普通索引。
databaseInfo
Array
資料庫資訊。
databaseType
String
資料庫類型。
version
String
資料庫版本。
totalRows
Int
資料表的總行數。
beforeImage
String
操作前記錄欄位內容鏡像。
values
String
記錄欄位的值。
size
Int
記錄欄位大小。
afterImage
String
操作後記錄欄位內容鏡像。
步驟三:編寫函數代碼並測試
觸發器建立完成後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當DTS資料訂閱捕捉到資料庫的增量資料後,觸發器會自動觸發函數的執行。
- 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼。本文以Node.js函數代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event參數,對event進行處理。 callback(null, 'return result'); }
單擊函數代碼頁簽的測試函數。
執行完成後,您可以在函數字碼頁簽的上方查看執行結果。
更多資訊
除了Function Compute控制台,您還可通過以下方式配置觸發器:
通過Serverless Devs工具配置觸發器。更多操作,請參見Serverless Devs。
通過SDK配置觸發器。更多操作,請參見SDK列表。
如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理。