All Products
Search
Document Center

Function Compute:DTS triggers

Last Updated:Apr 30, 2024

Data Transmission Service (DTS) can be integrated with EventBridge as an event source. After DTS is integrated with Function Compute, DTS triggers can be used to trigger the execution of functions in Function Compute. You can use a function to process the real-time incremental database data obtained from DTS change tracking tasks. This topic describes how to create a DTS trigger in the Function Compute console, configure the input parameters, and write and test code.

Overview

After you submit a request to create a trigger in the Function Compute console, Function Compute creates event stream resources on the EventBridge side based on the trigger configurations.

After the trigger is created, you can view information about the trigger in the Function Compute console. You can also view information about the automatically created resources in the EventBridge console. After the DTS change tracking task captures incremental data of the database, function execution is triggered and one or more message events are pushed to the function in batches for processing based on batch configurations.

Usage notes

  • The DTS change tracking task that is used as the trigger source must be in the same region as the function in Function Compute.
  • When the number of created event streams reaches the upper limit, the DTS trigger cannot be created. For more information about the limit of event streams, see Limits.

Before you begin

Step 1: Create a DTS trigger

  1. Log on to the Function Compute console. In the left-side navigation pane, click Services & Functions.

  2. In the top navigation bar, select a region. On the Services page, click the desired service.

  3. On the Functions page, click the function that you want to manage.
  4. On the function details page, click the Triggers tab, select the version or alias from the Version or Alias drop-down list, and then click Create Trigger.
  5. In the Create Trigger panel, specify related parameters. After you specify the parameters, click OK.
    The following table describes the basic parameters.
    ParameterDescriptionExample
    Trigger TypeThe type of the trigger. For more information about the supported trigger types, see Trigger overview. DTS
    NameThe name of the trigger. dts-trigger
    Version or AliasThe default value is LATEST. If you want to create a trigger for another version or alias, select a version or alias in the upper-right corner of the function details page. For more information about versions and aliases of a service, see Manage versions and Manage aliases. LATEST
    Change Tracking TaskThe name of the change tracking task. dtsqntc2***
    Consumer GroupThe name of the consumer group that you created to consume the data of the change tracking task.
    Important Make sure that the consumer group runs on only one client. Otherwise, the specified consumer offset may become invalid.
    test
    AccountThe account name specified when the consumer group was created. test
    PasswordThe account password specified when the consumer group was created. ******
    Consumer OffsetThe timestamp of the first data entry to be consumed. The data entry specified by the consumer offset must be within the data range of the change tracking task.
    Note The consumer offset takes effect only the first time a new consumer group runs. If a restart is performed in subsequent tasks, consumption continues based on the last consumer offset.
    2022-06-21 00:00:00
    Invocation MethodSelect a method to invoke the function.
    Valid values:
    • Sync Invocation: This mode is suitable for sequential invocations. When an event or a batch of events trigger the function, Function Compute runs the function and waits for a response before it processes the next event or batch of events. The upper limit of the payload for a synchronous invocation request is 32 MB. For more information, see Synchronous invocations.
    • Async Invocation: This mode allows you to quickly consume events. When a single event or a batch of events trigger the function, Function Compute immediately returns a response and continue to process the next event or batch of events During this process, the function is run in asynchronous mode. The upper limit of the payload for an asynchronous invocation request is 128 KB. For more information, see Overview.
    Sync Invocation
    Trigger StateSpecify whether to enable the trigger after it is created. By default, Enable Trigger is selected and the trigger is enabled after it is created. N/A

    For more information about advanced configurations such as message pushing, retry, and dead letter configurations, see Advanced features of triggers.

    After the trigger is created, it is displayed on the Triggers tab. To modify or delete an existing trigger, see Manage triggers.

Step 2: Configure the input parameters of the function

The DTS event source is passed to the function in the form of event, which acts as an input parameter. You can manually pass event to the function to trigger the function.

  1. On the function details page, click the Code tab and click the xialatubiao icon. From the drop-down list that appears, select Configure Test Parameters.
  2. In the Configure Test Parameters panel, click the Create New Test Event or Modify Existing Test Event tab, and specify Event Name and the event content. After you specify the parameters, click OK.
    Sample code of event:
    [
      {
        "data": {
          "id": 321****,
          "topicPartition": {
            "hash": 0,
            "partition": 0,
            "topic": "cn_hangzhou_rm_1234****_test_version2"
          },
          "offset": 3218099,
          "sourceTimestamp": 1654847757,
          "operationType": "UPDATE",
          "schema": {
            "recordFields": [
              {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            ],
            "nameIndex": {
              "id": {
                "fieldName": "id",
                "rawDataTypeNum": 8,
                "isPrimaryKey": true,
                "isUniqueKey": false,
                "fieldPosition": 0
              },
              "topic": {
                "fieldName": "topic",
                "rawDataTypeNum": 253,
                "isPrimaryKey": false,
                "isUniqueKey": false,
                "fieldPosition": 1
              }
            },
            "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
            "databaseName": "hangzhou--test-db",
            "tableName": "message_info",
            "primaryIndexInfo": {
              "indexType": "PrimaryKey",
              "indexFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                }
              ],
              "cardinality": 0,
              "nullable": true,
              "isFirstUniqueIndex": false
            },
            "uniqueIndexInfo": [],
            "foreignIndexInfo": [],
            "normalIndexInfo": [],
            "databaseInfo": {
              "databaseType": "MySQL",
              "version": "5.7.35-log"
            },
            "totalRows": 0
          },
          "beforeImage": {
            "recordSchema": {
              "recordFields": [
                {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              ],
              "nameIndex": {
                "id": {
                  "fieldName": "id",
                  "rawDataTypeNum": 8,
                  "isPrimaryKey": true,
                  "isUniqueKey": false,
                  "fieldPosition": 0
                },
                "topic": {
                  "fieldName": "topic",
                  "rawDataTypeNum": 253,
                  "isPrimaryKey": false,
                  "isUniqueKey": false,
                  "fieldPosition": 1
                }
              },
              "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
              "databaseName": "hangzhou-test-db",
              "tableName": "message_info",
              "primaryIndexInfo": {
                "indexType": "PrimaryKey",
                "indexFields": [
                  {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    104,
                    101,
                    108,
                    108,
                    111
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 9,
                    "capacity": 9,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 45
                  },
                    "afterImage": {
                    "recordSchema": {
                    "recordFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                    ],
                    "nameIndex": {
                    "id": {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  },
                    "topic": {
                    "fieldName": "topic",
                    "rawDataTypeNum": 253,
                    "isPrimaryKey": false,
                    "isUniqueKey": false,
                    "fieldPosition": 1
                  }
                  },
                    "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
                    "databaseName": "hangzhou-test-db",
                    "tableName": "message_info",
                    "primaryIndexInfo": {
                    "indexType": "PrimaryKey",
                    "indexFields": [
                    {
                    "fieldName": "id",
                    "rawDataTypeNum": 8,
                    "isPrimaryKey": true,
                    "isUniqueKey": false,
                    "fieldPosition": 0
                  }
                    ],
                    "cardinality": 0,
                    "nullable": true,
                    "isFirstUniqueIndex": false
                  },
                    "uniqueIndexInfo": [],
                    "foreignIndexInfo": [],
                    "normalIndexInfo": [],
                    "databaseInfo": {
                    "databaseType": "MySQL",
                    "version": "5.7.35-log"
                  },
                    "totalRows": 0
                  },
                    "values": [
                    {
                    "data": 115
                  },
                    {
                    "data": {
                    "hb": [
                    98,
                    121,
                    101
                    ],
                    "offset": 0,
                    "isReadOnly": false,
                    "bigEndian": true,
                    "nativeByteOrder": false,
                    "mark": -1,
                    "position": 0,
                    "limit": 11,
                    "capacity": 11,
                    "address": 0
                  },
                    "charset": "utf8mb4"
                  }
                    ],
                    "size": 47
                  }
                  },
        "id": "12f701a43741d404fa9a7be89d9acae0-321****",
        "source": "DTSstreamDemo",
        "specversion": "1.0",
        "type": "dts:ConsumeMessage",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-10T07:55:57Z",
        "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
      }
    ]

    For information about the parameters defined in the CloudEvents specification, see Overview.

    The following table describes the parameters contained in the data field.

    Parameter

    Type

    Description

    id

    String

    The ID of the DTS data entry.

    topicPartition

    Array

    The partition information about the topic to which the event is pushed.

    hash

    String

    The underlying storage parameter of DTS.

    partition

    String

    The partition.

    topic

    String

    The topic name.

    offset

    Int

    The offset of the DTS data entry.

    sourceTimestamp

    Int

    The timestamp that indicates when the DTS data entry was generated.

    operationType

    String

    The type of operation involved in the DTS data entry.

    schema

    Array

    The schema information about the database.

    recordFields

    Array

    The details of fields.

    fieldName

    String

    The field name.

    rawDataTypeNum

    Int

    The mapped value of the field type.

    The value of this parameter corresponds to the value of the dataTypeNumber field in the deserialized incremental data from the change tracking instance. For more information, see Use a Kafka client to consume tracked data.

    isPrimaryKey

    Boolean

    Indicates whether the field is a primary key field.

    isUniqueKey

    Boolean

    Indicates whether the field has a unique key.

    fieldPosition

    String

    The field position.

    nameIndex

    Array

    The indexing information about the fields based on field names.

    schemaId

    String

    The ID of the database schema.

    databaseName

    String

    The database name.

    tableName

    String

    The table name.

    primaryIndexInfo

    String

    The primary key indexes.

    indexType

    String

    The index type.

    indexFields

    Array

    The fields on which the indexes are created.

    cardinality

    String

    The cardinality of the primary keys.

    nullable

    Boolean

    Indicates whether the primary keys can be null.

    isFirstUniqueIndex

    Boolean

    Indicates whether the index is the first unique index.

    uniqueIndexInfo

    String

    The unique indexes.

    foreignIndexInfo

    String

    The indexes for foreign keys.

    normalIndexInfo

    String

    The regular indexes.

    databaseInfo

    Array

    The information about the database.

    databaseType

    String

    The database engine.

    version

    String

    The database engine version.

    totalRows

    Int

    The total number of rows in the table.

    beforeImage

    String

    The image that records field values before the operation is performed.

    values

    String

    The field values recorded.

    size

    Int

    The size of the fields recorded.

    afterImage

    String

    The image that records field values after the operation is performed.

Step 3: Write and test the function

After you create the trigger, you can write function code and test the function to verify whether the code is correct. In actual scenarios, when the DTS change tracking task captures incremental data of the database, the trigger automatically triggers the execution of the function.

  1. On the function details page, click the Code tab, edit the function code in the code editor, and then click Deploy.
    The following uses the Node.js function code as an example.
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    }
  2. Click the Code tab and click Test Function.
    After the function is executed, you can view the result on the Code tab.

References

In addition to the Function Compute console, you can configure triggers by using the following methods:
  • Use Serverless Devs to configure triggers. For more information, see Serverless Devs.
  • Use SDKs to configure triggers. For more information, see SDKs.

To modify or delete an existing trigger, see Manage triggers.