After Data Transmission Service (DTS) is integrated with Function Compute by using EventBridge, DTS triggers can be used to invoke functions in Function Compute. You can use a function to process the real-time incremental data obtained from DTS change tracking tasks. This topic describes how to create a DTS trigger, configure the input parameters of a function, and write and test function code in the Function Compute console.
Overview
After you submit a request to create a trigger in the Function Compute console, Function Compute automatically creates event streams in EventBridge based on the configurations of the trigger.
After the trigger is created, you can view the information about the trigger in the Function Compute console. You can also view the information about the created resources in the EventBridge console. After your DTS change tracking task captures the incremental data of a database, the associated function is invoked. One or more message events are pushed to the function in batches for processing based on your batch configurations.
Usage notes
The DTS change tracking task that serves as the trigger source must reside in the same region as the function to be invoked in Function Compute.
If the number of created event streams reaches the upper limit, no more DTS triggers can be created. For more information about the limits on the number of event streams, see Limits.
Prerequisites
EventBridge
EventBridge is activated, and the required permissions are granted to a Resource Access Management (RAM) user. For more information, see Activate EventBridge and grant permissions to a RAM user.
Function Compute
A function is created. For more information, see the Create a function section of the "Manage functions" topic.
DTS
A change tracking task is created. For more information, see Manage a change tracking task.
A consumer group is created. For more information, see Create consumer groups.
Step 1: Create a DTS trigger
Log on to the Function Compute console. In the left-side navigation pane, click Functions.
In the top navigation bar, select a region. On the Functions page, click the function that you want to manage.
On the function details page, click the Configurations tab. In the left-side navigation pane, click Triggers. Then, click Create Trigger.
In the Create Trigger panel, configure the parameters and click OK.
The following table describes the basic parameters.
| Parameter | Description | Example |
| --- | --- | --- |
| Trigger Type | The type of the trigger. For more information about the supported trigger types, see Trigger overview. | DTS |
| Name | The name of the trigger. | dts-trigger |
| Version or Alias | The version or alias of the trigger. Default value: LATEST. If you want to create a trigger for another version or alias, select a version or alias in the upper-right corner of the function details page. For more information about versions and aliases, see Manage versions and Manage aliases. | LATEST |
| Change Tracking Task | The name of the existing change tracking task. | dtsqntc2*** |
| Consumer Group | The name of the consumer group that you created to consume the data of the change tracking task. Important: Make sure that the consumer group runs on only one client. Otherwise, the specified consumption checkpoint may become invalid. | test |
| Account | The account that you specified when you created the consumer group. | test |
| Password | The password of the account that you specified when you created the consumer group. | ****** |
| Consumer Offset | The time when the first data entry is to be consumed. The specified consumption checkpoint must be within the data range of the change tracking task. Note: The consumption checkpoint takes effect only when a new consumer group runs for the first time. If subsequent tasks are restarted, consumption continues based on the previous consumption checkpoint. | 2022-06-21 00:00:00 |
| Invocation Method | The mode in which the function is invoked. Valid values: Sync Invocation: This mode is suitable for sequential invocations. When an event or a batch of events invokes the function, Function Compute executes the function and waits for a response before it processes the next event or batch of events. The upper limit of the payload for a synchronous invocation request is 32 MB. For more information, see Synchronous invocations. Async Invocation: This mode allows you to quickly consume events. When an event or a batch of events invokes the function, Function Compute immediately returns a response and continues to process the next event or batch of events. During this process, the function is executed in asynchronous mode. The upper limit of the payload for an asynchronous invocation request is 128 KB. For more information, see Overview. | Sync Invocation |
| Trigger State | Specifies whether to enable the trigger immediately after it is created. By default, Enable Trigger is selected. The trigger is immediately enabled after it is created. | N/A |
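The payload limits stated for Invocation Method can be checked against a test batch before you choose a mode. The following Node.js sketch is illustrative only: `PAYLOAD_LIMIT_BYTES` and `fitsPayloadLimit` are hypothetical names, and the sketch assumes that 1 MB equals 1,024 × 1,024 bytes and that the limit applies to the JSON-serialized batch.

```javascript
'use strict';

// Payload limits quoted for the Invocation Method parameter.
// Assumption: 1 MB = 1024 * 1024 bytes and 1 KB = 1024 bytes.
const PAYLOAD_LIMIT_BYTES = {
  sync: 32 * 1024 * 1024,
  async: 128 * 1024,
};

// Hypothetical helper: measures the JSON-serialized size of a batch and
// compares it with the limit of the given invocation mode.
function fitsPayloadLimit(events, mode) {
  const limit = PAYLOAD_LIMIT_BYTES[mode];
  if (limit === undefined) {
    throw new Error(`Unknown invocation mode: ${mode}`);
  }
  const size = Buffer.byteLength(JSON.stringify(events), 'utf8');
  return { size, limit, fits: size <= limit };
}

// Usage: a small batch of placeholder records.
const exampleBatch = [{ data: { operationType: 'UPDATE' } }];
console.log(fitsPayloadLimit(exampleBatch, 'async'));
```

If a batch that you expect in production is close to the asynchronous limit, a smaller batch size in the push settings or the synchronous mode may be the safer choice.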
For more information about advanced configurations such as push settings, retry policies, and dead-letter queues, see Advanced features of triggers.
After the trigger is created, it is displayed on the Triggers tab. To modify or delete a trigger, see Manage triggers.
Step 2: Configure the input parameters of the function
A DTS event is used to invoke a function in Function Compute. The parameters of the event are used as the input parameters of the function. You can manually pass the parameters of the event to invoke the function as a test.
On the Code tab of the function details page, click the icon next to Test Function and select Configure Test Parameters from the drop-down list.
In the Configure Test Parameters panel, click the Create New Test Event or Modify Existing Test Event tab, enter the event name and event content, and then click OK.
The following sample code provides an example of the format of the event content:

[
  {
    "data": {
      "id": 321****,
      "topicPartition": {
        "hash": 0,
        "partition": 0,
        "topic": "cn_hangzhou_rm_1234****_test_version2"
      },
      "offset": 3218099,
      "sourceTimestamp": 1654847757,
      "operationType": "UPDATE",
      "schema": {
        "recordFields": [
          { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
          { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
        ],
        "nameIndex": {
          "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
          "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
        "totalRows": 0
      },
      "beforeImage": {
        "recordSchema": {
          "recordFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          ],
          "nameIndex": {
            "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          },
          "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
          "databaseName": "hangzhou-test-db",
          "tableName": "message_info",
          "primaryIndexInfo": {
            "indexType": "PrimaryKey",
            "indexFields": [
              { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
            ],
            "cardinality": 0,
            "nullable": true,
            "isFirstUniqueIndex": false
          },
          "uniqueIndexInfo": [],
          "foreignIndexInfo": [],
          "normalIndexInfo": [],
          "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
          "totalRows": 0
        },
        "values": [
          { "data": 115 },
          {
            "data": {
              "hb": [ 104, 101, 108, 108, 111 ],
              "offset": 0,
              "isReadOnly": false,
              "bigEndian": true,
              "nativeByteOrder": false,
              "mark": -1,
              "position": 0,
              "limit": 9,
              "capacity": 9,
              "address": 0
            },
            "charset": "utf8mb4"
          }
        ],
        "size": 45
      },
      "afterImage": {
        "recordSchema": {
          "recordFields": [
            { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          ],
          "nameIndex": {
            "id": { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 },
            "topic": { "fieldName": "topic", "rawDataTypeNum": 253, "isPrimaryKey": false, "isUniqueKey": false, "fieldPosition": 1 }
          },
          "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
          "databaseName": "hangzhou-test-db",
          "tableName": "message_info",
          "primaryIndexInfo": {
            "indexType": "PrimaryKey",
            "indexFields": [
              { "fieldName": "id", "rawDataTypeNum": 8, "isPrimaryKey": true, "isUniqueKey": false, "fieldPosition": 0 }
            ],
            "cardinality": 0,
            "nullable": true,
            "isFirstUniqueIndex": false
          },
          "uniqueIndexInfo": [],
          "foreignIndexInfo": [],
          "normalIndexInfo": [],
          "databaseInfo": { "databaseType": "MySQL", "version": "5.7.35-log" },
          "totalRows": 0
        },
        "values": [
          { "data": 115 },
          {
            "data": {
              "hb": [ 98, 121, 101 ],
              "offset": 0,
              "isReadOnly": false,
              "bigEndian": true,
              "nativeByteOrder": false,
              "mark": -1,
              "position": 0,
              "limit": 11,
              "capacity": 11,
              "address": 0
            },
            "charset": "utf8mb4"
          }
        ],
        "size": 47
      }
    },
    "id": "12f701a43741d404fa9a7be89d9acae0-321****",
    "source": "DTSstreamDemo",
    "specversion": "1.0",
    "type": "dts:ConsumeMessage",
    "datacontenttype": "application/json; charset=utf-8",
    "time": "2022-06-10T07:55:57Z",
    "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
  }
]
For information about the parameters defined in the CloudEvents specification, see Overview.
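As an illustration of how a function might read this structure, the following Node.js sketch parses a batch and extracts a few envelope and data fields from each record. `parseDtsEvents` and `summarizeRecord` are hypothetical helper names, not part of any SDK. The sketch assumes that the function receives the batch as a Buffer or a JSON string, and that `sourceTimestamp` is expressed in seconds, as the sample value suggests when compared with `time`.

```javascript
'use strict';

// Hypothetical helper: normalizes the handler input into an array of records.
// Assumption: the input is a Buffer, a JSON string, or an already-parsed array.
function parseDtsEvents(rawEvent) {
  if (Array.isArray(rawEvent)) {
    return rawEvent;
  }
  const text = Buffer.isBuffer(rawEvent) ? rawEvent.toString('utf8') : String(rawEvent);
  const parsed = JSON.parse(text);
  // A batch is expected to be an array; wrap a single record defensively.
  return Array.isArray(parsed) ? parsed : [parsed];
}

// Hypothetical helper: picks envelope attributes and a few data fields.
function summarizeRecord(record) {
  const data = record.data || {};
  const schema = data.schema || {};
  return {
    eventId: record.id,
    type: record.type,
    time: record.time,
    operationType: data.operationType,
    table: `${schema.databaseName}.${schema.tableName}`,
    offset: data.offset,
    // Assumption: sourceTimestamp is in seconds since the Unix epoch.
    sourceTime: new Date(data.sourceTimestamp * 1000).toISOString(),
  };
}

// Usage with a trimmed record. The ID is a placeholder because the sample
// values above are masked.
const trimmedBatch = [{
  id: 'example-event-id',
  type: 'dts:ConsumeMessage',
  time: '2022-06-10T07:55:57Z',
  data: {
    offset: 3218099,
    sourceTimestamp: 1654847757,
    operationType: 'UPDATE',
    schema: { databaseName: 'hangzhou-test-db', tableName: 'message_info' },
  },
}];
const raw = Buffer.from(JSON.stringify(trimmedBatch));
console.log(parseDtsEvents(raw).map(summarizeRecord));
```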
The following table describes the parameters contained in data.
| Parameter | Type | Description |
| --- | --- | --- |
| id | String | The ID of the DTS data entry. |
| topicPartition | Object | The partition information about the topic to which the event is pushed. |
| hash | Int | The underlying storage parameter of DTS. |
| partition | Int | The partition. |
| topic | String | The topic name. |
| offset | Int | The offset of the DTS data entry. |
| sourceTimestamp | Int | The timestamp that indicates when the DTS data entry was generated. |
| operationType | String | The type of operation involved in the DTS data entry. |
| schema | Object | The schema information about the database. |
| recordFields | Array | The details of fields. |
| fieldName | String | The field name. |
| rawDataTypeNum | Int | The mapped value of the field type. |
| isPrimaryKey | Boolean | Indicates whether the field is a primary key field. |
| isUniqueKey | Boolean | Indicates whether the field has a unique key. |
| fieldPosition | Int | The field position. |
| nameIndex | Object | The indexing information about the fields based on field names. |
| schemaId | String | The ID of the database schema. |
| databaseName | String | The database name. |
| tableName | String | The table name. |
| primaryIndexInfo | Object | The primary key indexes. |
| indexType | String | The index type. |
| indexFields | Array | The fields on which the indexes are created. |
| cardinality | Int | The cardinality of the primary keys. |
| nullable | Boolean | Indicates whether the primary keys can be null. |
| isFirstUniqueIndex | Boolean | Indicates whether the index is the first unique index. |
| uniqueIndexInfo | Array | The unique indexes. |
| foreignIndexInfo | Array | The indexes of foreign keys. |
| normalIndexInfo | Array | The regular indexes. |
| databaseInfo | Object | The information about the database. |
| databaseType | String | The database engine. |
| version | String | The database engine version. |
| totalRows | Int | The total number of rows in the table. |
| beforeImage | Object | The image that records field values before the operation is performed. |
| values | Array | The field values recorded. |
| size | Int | The size of the fields recorded. |
| afterImage | Object | The image that records field values after the operation is performed. |
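The beforeImage and afterImage parameters can be turned into plain rows by pairing each entry of values with the field at the same position in recordFields. The following Node.js sketch shows one way to do this. `decodeValue` and `imageToRow` are hypothetical helpers, and the sketch rests on two assumptions drawn only from the sample event: `values` is ordered by `fieldPosition`, and an `hb` array holds the UTF-8 bytes of a string column.

```javascript
'use strict';

// Hypothetical helper: decodes one entry of `values`.
function decodeValue(entry) {
  if (entry === null || entry === undefined) {
    return null;
  }
  const data = entry.data;
  if (data && typeof data === 'object' && Array.isArray(data.hb)) {
    // Assumption: `hb` holds the raw bytes of a string column. Buffer.from
    // keeps only the low 8 bits of each element, so signed bytes also work.
    return Buffer.from(data.hb).toString('utf8');
  }
  return data;
}

// Hypothetical helper: maps field names to decoded values for one image.
// Assumption: `values` is ordered by `fieldPosition`.
function imageToRow(image) {
  if (!image) {
    return null;
  }
  const fields = (image.recordSchema && image.recordSchema.recordFields) || [];
  const values = image.values || [];
  const row = {};
  for (const field of fields) {
    row[field.fieldName] = decodeValue(values[field.fieldPosition]);
  }
  return row;
}

// Usage with the images of the sample UPDATE event, trimmed to the
// fields that the helpers read.
const recordSchema = {
  recordFields: [
    { fieldName: 'id', fieldPosition: 0 },
    { fieldName: 'topic', fieldPosition: 1 },
  ],
};
const beforeImage = {
  recordSchema,
  values: [{ data: 115 }, { data: { hb: [104, 101, 108, 108, 111] }, charset: 'utf8mb4' }],
};
const afterImage = {
  recordSchema,
  values: [{ data: 115 }, { data: { hb: [98, 121, 101] }, charset: 'utf8mb4' }],
};
console.log(imageToRow(beforeImage)); // { id: 115, topic: 'hello' }
console.log(imageToRow(afterImage));  // { id: 115, topic: 'bye' }
```

Comparing the two rows field by field is then enough to find which columns an UPDATE operation changed.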
Step 3: Write and test function code
After you create the trigger, you can write function code and test the function code to verify whether the code is valid. When the DTS change tracking task captures the incremental data of the database, the trigger automatically invokes the function.
On the function details page, click the Code tab, enter function code in the code editor, and then click Deploy.
In this example, the function code is written in Node.js.
'use strict';
/*
To enable the initializer feature
please implement the initializer function as below:
exports.initializer = (context, callback) => {
  console.log('initializing');
  callback(null, '');
};
*/
exports.handler = (event, context, callback) => {
  console.log("event: %s", event);
  // Parse the event parameters and process the event.
  callback(null, 'return result');
}
Click Test Function.
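Before you deploy, the handler logic can also be exercised locally. The following Node.js sketch is one possible shape for such a check and is not taken from the console workflow: the handler counts the records of a batch by operationType, and the hypothetical `invokeLocally` helper calls it the way a test invocation might. It assumes that the runtime passes the event to the handler as a Buffer that contains the JSON batch.

```javascript
'use strict';

// Sketch of a handler that counts the records of a batch by operation type.
// Assumption: `event` is a Buffer that contains a JSON array of records.
function handler(event, context, callback) {
  try {
    const records = JSON.parse(event.toString());
    const counts = {};
    for (const record of records) {
      const op = (record.data && record.data.operationType) || 'UNKNOWN';
      counts[op] = (counts[op] || 0) + 1;
    }
    console.log('processed %d record(s): %j', records.length, counts);
    callback(null, JSON.stringify(counts));
  } catch (err) {
    // Pass the error to the callback so that the failure is reported.
    callback(err);
  }
}
exports.handler = handler;

// Hypothetical local harness: wraps the callback-style handler in a Promise.
function invokeLocally(fn, payload) {
  return new Promise((resolve, reject) => {
    const event = Buffer.from(JSON.stringify(payload));
    fn(event, {}, (err, result) => (err ? reject(err) : resolve(result)));
  });
}

// Usage: three placeholder records, two updates and one insert.
invokeLocally(handler, [
  { data: { operationType: 'UPDATE' } },
  { data: { operationType: 'INSERT' } },
  { data: { operationType: 'UPDATE' } },
]).then((result) => console.log('result:', result));
```

Wrapping the parsing in try/catch keeps a malformed batch from crashing the process; whether a failed batch is retried depends on the retry policy configured for the trigger.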
More information
In addition to the Function Compute console, you can use SDKs to configure triggers. For more information, see SDKs.
To modify or delete an existing trigger, see Manage triggers.