全部產品
Search
文件中心

Function Compute:DTS觸發器

更新時間:Jul 06, 2024

DTS(Data Transmission Service)作為事件來源通過事件匯流排EventBridge與Function Compute整合後,通過DTS觸發器能夠觸發關聯函數執行。通過函數可以對DTS資料訂閱擷取的資料庫即時增量資料進行自訂處理。本文介紹如何在Function Compute控制台建立DTS觸發器、配置入口參數以及編寫代碼並測試代碼。

功能簡介

您在Function Compute的控制台提交觸發器建立請求之後,Function Compute會根據觸發器的配置資訊,自動在事件匯流排EventBridge側建立事件流資源。

建立完成後,您可以在Function Compute控制台查看觸發器資訊,同時也可以在事件匯流排EventBridge控制台查看自動建立的資源資訊。當DTS資料訂閱捕捉到資料庫的增量資料後,將會觸發函數執行,觸發時會根據您的攢批配置將一個或多個訊息事件以批的形式推送到函數中進行處理。

注意事項

  • 作為觸發源的DTS資料訂閱任務必須和Function Compute的函數在相同的地區。

  • 建立的事件流數量超過上限後,將無法再建立DTS觸發器。關於事件流的數量限制,請參見使用限制

前提條件

步驟一:建立DTS觸發器

  1. 登入Function Compute控制台,在左側導覽列,單擊函數

  2. 在頂部功能表列,選擇地區,然後在函數頁面,單擊目標函數。

  3. 在函數詳情頁面,選擇配置頁簽,在左側導覽列,單擊觸發器,然後單擊建立觸發器

  4. 在建立觸發程序面板,填寫相關資訊,然後單擊確定

    基礎配置項說明如下所示。

    配置項

    取值說明

    本文樣本

    觸發器類型

    觸發器類型。關於支援的觸發器類型,請參見觸發器簡介

    DTS

    名稱

    自訂的觸發器名稱。

    dts-trigger

    版本或別名

    預設值為LATEST,如果您需要建立其他版本或別名的觸發器,請先在函數詳情頁的右上方切換到該版本或別名。關於版本和別名的簡介,請參見版本管理別名管理

    LATEST

    資料訂閱任務

    已建立的資料訂閱任務名稱。

    dtsqntc2***

    消費組

    已建立的用於消費訂閱任務資料的消費組名稱。

    重要

    請確保該消費組沒有在其他用戶端的執行個體上運行,否則可能導致傳入的消費位點失效。

    test

    帳號

    建立消費組時設定的帳號。

    test

    密碼

    建立消費組時設定的密碼。

    ******

    消費位點

    期望消費第一條資料的時間戳記。消費位點必須在訂閱執行個體的資料範圍之內。

    說明

    消費位點僅在新消費組第一次運行時生效。若後續任務重啟,會基於上次消費位點繼續消費。

    2022-06-21 00:00:00

    調用方式

    選擇函數調用方式。

    取值說明如下。

    • 同步調用:適用於順序調用情境。單個(批)事件觸發函數調用,等待函數執行完成返回結果後,再由下一個(批)事件繼續觸發函數調用。同步調用請求本文承載最大為32 MB。更多資訊,請參見同步調用

    • 非同步呼叫:可以快速消費事件。單個(批)事件觸發函數調用,Function Compute會立刻返迴響應,再由下一個(批)事件繼續觸發函數調用。該過程中函數會非同步執行。非同步呼叫請求本文承載最大為128 KB。更多資訊,請參見功能概覽

    同步調用

    觸發器啟用狀態

    建立觸發器後是否立即啟用。預設勾選啟用觸發器,即建立觸發器後立即啟用觸發器。

    不涉及

    關於推送配置、重試和死信等進階配置項說明,請參見觸發器進階功能

    建立完成後,在觸發器名稱列表中顯示已建立的觸發器。如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理

步驟二:配置函數入口參數

DTS事件來源會以event的形式作為輸入參數傳遞給函數,您可以手動將event傳給函數類比觸發事件。

  1. 在函數詳情頁面的代碼頁簽,單擊測試函數右側的image.png表徵圖,從下拉式清單中,選擇配置測試參數

  2. 配置測試參數面板,選擇建立新測試事件編輯已有測試事件,填寫事件名稱和事件內容,然後單擊確定。

    event格式如下所示:

    [
      {
        "data": {
          "id": 321****,
          "topicPartition": {
            "hash": 0,
            "partition": 0,
            "topic": "cn_hangzhou_rm_1234****_test_version2"
          },
          "offset": 3218099,
          "sourceTimestamp": 1654847757,
          "operationType": "UPDATE",
          "schema": {
            "recordFields": [
              {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            ],
            "nameIndex": {
              "id": {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              "topic": {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            },
            "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
            "databaseName": "hangzhou--test-db",
            "tableName": "message_info",
            "primaryIndexInfo": {
              "indexType": "PrimaryKey",
              "indexFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                }
              ],
              "cardinality": 0,
              "nullable": true,
              "isFirstUniqueIndex": false
            },
            "uniqueIndexInfo": [],
            "foreignIndexInfo": [],
            "normalIndexInfo": [],
            "databaseInfo": {
              "databaseType": "MySQL",
              "version": "5.7.35-log"
            },
            "totalRows": 0
          },
          "beforeImage": {
            "recordSchema": {
              "recordFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              ],
              "nameIndex": {
                "id": {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                "topic": {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              },
              "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
              "databaseName": "hangzhou-test-db",
              "tableName": "message_info",
              "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                  {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    104,
                    101,
                    108,
                    108,
                    111
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 9,
                    "capacity": 9,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 45
                  },
                    "afterImage": {
                    "recordSchema": {
                    "recordFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                    ],
                    "nameIndex": {
                    "id": {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    "topic": {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                  },
                    "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
                    "databaseName": "hangzhou-test-db",
                    "tableName": "message_info",
                    "primaryIndexInfo": {
                    "indexType": "PrimaryKey",
                    "indexFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    98,
                    121,
                    101
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 11,
                    "capacity": 11,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 47
                  }
                  },
        "id": "12f701a43741d404fa9a7be89d9acae0-321****",
        "source": "DTSstreamDemo",
        "specversion": "1.0",
        "type": "dts:ConsumeMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-10T07:55:57Z",
        "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
      }
    ]

    CloudEvents規範中定義的參數解釋,請參見事件概述

    data欄位包含的參數解釋如下表所示。

    參數

    類型

    說明

    id

    String

    DTS資料ID。

    topicPartition

    Array

    Topic的分區資訊。

    hash

    String

    DTS底層儲存參數。

    partition

    String

    Topic的分區。

    topic

    String

    Topic的名稱。

    offset

    Int

    DTS資料對應的訊息儲存位點。

    sourceTimestamp

    Int

    DTS資料產生時間戳記。

    operationType

    String

    DTS資料的操作類型。

    schema

    Array

    資料庫表結構資訊。

    recordFields

    Array

    欄位詳情記錄。

    fieldName

    String

    欄位名稱。

    rawDataTypeNum

    Int

    欄位類型映射值。

    該值對應從資料訂閱通道中擷取的增量資料還原序列化後的dataTypeNumber欄位值,詳情請參見使用Kafka用戶端消費訂閱資料

    isPrimaryKey

    Boolean

    欄位是否是主鍵。

    isUniqueKey

    Boolean

    欄位是否是唯一值。

    fieldPosition

    String

    欄位位置。

    nameIndex

    Array

    命名索引。

    schemaId

    String

    資料庫表結構資訊的ID。

    databaseName

    String

    資料庫名稱。

    tableName

    String

    資料表名稱。

    primaryIndexInfo

    String

    主鍵索引。

    indexType

    String

    主鍵索引類型。

    indexFields

    Array

    主鍵索引欄位內容。

    cardinality

    String

    主鍵基數。

    nullable

    Boolean

    主鍵是否可為null。

    isFirstUniqueIndex

    Boolean

    是否是第一個唯一索引。

    uniqueIndexInfo

    String

    唯一索引。

    foreignIndexInfo

    String

    外鍵索引。

    normalIndexInfo

    String

    普通索引。

    databaseInfo

    Array

    資料庫資訊。

    databaseType

    String

    資料庫類型。

    version

    String

    資料庫版本。

    totalRows

    Int

    資料表的總行數。

    beforeImage

    String

    操作前記錄欄位內容鏡像。

    values

    String

    記錄欄位的值。

    size

    Int

    記錄欄位大小。

    afterImage

    String

    操作後記錄欄位內容鏡像。

步驟三:編寫函數代碼並測試

觸發器建立完成後,您可以開始編寫並測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當DTS資料訂閱捕捉到資料庫的增量資料後,觸發器會自動觸發函數的執行。

  1. 在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然後單擊部署代碼

    本文以Node.js函數代碼為例。

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // 解析event參數,對event進行處理。
      callback(null, 'return result');
    }
  2. 單擊測試函數

更多資訊

除了Function Compute控制台,您還可以通過以下方式配置觸發器:

如需對建立的觸發器進行修改或刪除,具體操作,請參見觸發器管理