This topic describes how to use the EventBridge console to create an event stream that uses Data Transmission Service (DTS) as its event provider.
Prerequisites
A change tracking task is created in the DTS console and is in the Normal state. For more information, see Manage a change tracking task.
A consumer group is created in the change tracking task. For more information, see Create consumer groups.
EventBridge is activated and the required permissions are granted to a Resource Access Management (RAM) user. For more information, see Activate EventBridge and grant permissions to a RAM user.
Supported regions
DTS can be used as the event provider of an event stream in the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), China (Guangzhou), China (Chengdu), and China (Hong Kong).
Procedure
Event streams in EventBridge can transfer only the data changes that DTS tracks for INSERT, DELETE, UPDATE, and DDL statements.
Log on to the EventBridge console. In the left-side navigation pane, click Event Streams.
In the top navigation bar, select a region and click Create Event Stream.
On the Create Event Stream page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following section describes the parameters:
Task Creation
In the Source step, set the Data Provider parameter to Data Transmission Service (DTS) and configure other parameters. Then, click Next Step. The following table describes the parameters.
Parameter | Description | Example |
Data Subscription Task | The ID of the change tracking task that you created in the DTS console. | dts8jqe**** |
Access Method | The access method of the database instance that serves as the source of the change tracking task. You cannot change the value of this parameter. | RDS |
Instance ID | The ID of the database instance that serves as the source of the change tracking task. You cannot change the value of this parameter. | rm-bp18mj3q2dzyb**** |
Consumer Group | The name of the consumer group that you created to consume the data of the change tracking task. Note: Make sure that the consumer group runs on only one client. Otherwise, the specified consumption checkpoint may become invalid. | test |
Account | The account name that you specified when you created the consumer group. | test |
Password | The account password that you specified when you created the consumer group. | ****** |
Consumer Offset | The time when the first data entry is to be consumed. The data entry that is specified by the consumer offset must be within the data range of the change tracking task. Note: The consumer offset that you specify takes effect only when the consumer group consumes data for the first time. If the change tracking task is restarted, the consumer group consumes data from the last recorded consumer offset. | 2022-06-21 00:00:00 |
Batch Push | The batch push feature aggregates multiple events into a single push. A push is triggered when the condition that is specified by the Messages parameter or the Interval (Unit: Seconds) parameter is met. For example, if you set the Messages parameter to 100 and the Interval (Unit: Seconds) parameter to 15, the push is executed when the number of messages reaches 100 even if only 10 seconds have elapsed. | Enable |
Messages | The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. | 100 |
Interval (Unit: Seconds) | The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are immediately sent after aggregation. | 3 |
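The interaction between the Messages and Interval (Unit: Seconds) parameters can be pictured with a small model. The following Python sketch is only an illustration of the "whichever condition is met first" behavior described above; it is not how EventBridge implements batch push. The class name BatchPusher, its methods, and the explicit now argument are all hypothetical and exist only for this example.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class BatchPusher:
    """Hypothetical model: push a batch when either threshold is reached."""
    max_messages: int                    # stands in for the Messages parameter
    interval_seconds: float              # stands in for Interval (Unit: Seconds)
    push: Callable[[List[dict]], None]   # receives each aggregated batch
    _buffer: List[dict] = field(default_factory=list, init=False)
    _window_start: float = field(default=0.0, init=False)

    def add(self, event: dict, now: float) -> None:
        if not self._buffer:
            self._window_start = now     # the window opens with the first buffered event
        self._buffer.append(event)
        self._maybe_flush(now)

    def tick(self, now: float) -> None:
        """Check the time-based condition when no new event has arrived."""
        self._maybe_flush(now)

    def _maybe_flush(self, now: float) -> None:
        if not self._buffer:
            return
        count_reached = len(self._buffer) >= self.max_messages
        interval_reached = now - self._window_start >= self.interval_seconds
        if count_reached or interval_reached:
            batch, self._buffer = self._buffer, []
            self.push(batch)


batches: List[List[dict]] = []
pusher = BatchPusher(max_messages=100, interval_seconds=15, push=batches.append)

# 100 events arrive within about 10 seconds: the count condition fires first.
for i in range(100):
    pusher.add({"seq": i}, now=i * 0.1)

# A single late event is pushed only after the 15-second interval has passed.
pusher.add({"seq": 100}, now=20.0)
pusher.tick(now=34.0)   # 14 seconds elapsed, nothing is pushed
pusher.tick(now=35.0)   # 15 seconds elapsed, the batch is pushed

print([len(b) for b in batches])  # [100, 1]
```

With interval_seconds set to 0, the time condition in this model is always satisfied, so every event is pushed as soon as it is added, which mirrors the description of the value 0 above.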
In the Filtering, Transformation, and Sink steps, configure the event filtering method, event transformation rule, and event target. For information about event transformation configurations, see Use Function Compute to perform message cleansing.
Task Property
Configure the retry policy and dead-letter queue for the event stream. For more information, see Retry policies and dead-letter queues.
Go back to the Event Streams page and find the event stream that you created. Then, click Enable in the Actions column.
Enabling an event stream requires 30 to 60 seconds to complete. You can view the progress in the Status column of the event stream on the Event Streams page.
Sample event
The following code provides a sample event generated after a change tracking task is created for a MySQL database in DTS:
{
"data": {
"id": 321****,
"topicPartition": {
"hash": 0,
"partition": 0,
"topic": "cn_hangzhou_rm_1234****_test_version2"
},
"offset": 3218099,
"sourceTimestamp": 1654847757,
"operationType": "UPDATE",
"schema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"beforeImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
104,
101,
108,
108,
111
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 9,
"capacity": 9,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 45
},
"afterImage": {
"recordSchema": {
"recordFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
{
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
],
"nameIndex": {
"id": {
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
},
"topic": {
"fieldName": "topic",
"rawDataTypeNum": 253,
"isPrimaryKey": false,
"isUniqueKey": false,
"fieldPosition": 1
}
},
"schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
"databaseName": "hangzhou-test-db",
"tableName": "message_info",
"primaryIndexInfo": {
"indexType": "PrimaryKey",
"indexFields": [
{
"fieldName": "id",
"rawDataTypeNum": 8,
"isPrimaryKey": true,
"isUniqueKey": false,
"fieldPosition": 0
}
],
"cardinality": 0,
"nullable": true,
"isFirstUniqueIndex": false
},
"uniqueIndexInfo": [],
"foreignIndexInfo": [],
"normalIndexInfo": [],
"databaseInfo": {
"databaseType": "MySQL",
"version": "5.7.35-log"
},
"totalRows": 0
},
"values": [
{
"data": 115
},
{
"data": {
"hb": [
98,
121,
101
],
"offset": 0,
"isReadOnly": false,
"bigEndian": true,
"nativeByteOrder": false,
"mark": -1,
"position": 0,
"limit": 11,
"capacity": 11,
"address": 0
},
"charset": "utf8mb4"
}
],
"size": 47
}
},
"id": "12f701a43741d404fa9a7be89d9acae0-321****",
"source": "DTSstreamDemo",
"specversion": "1.0",
"type": "dts:ConsumeMessage",
"datacontenttype": "application/json; charset=utf-8",
"time": "2022-06-10T07:55:57Z",
"subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}
For information about the parameters defined in the CloudEvents specification, see Overview.
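As a rough illustration of how a consumer might read the CloudEvents attributes together with the DTS payload, the following Python sketch extracts a few values from an event shaped like the sample. The function name summarize_event and the trimmed sample dictionary are hypothetical. The sketch assumes that sourceTimestamp is expressed in Unix seconds, which is consistent with the sample (1654847757 corresponds to the time value 2022-06-10T07:55:57Z) but is not stated explicitly in this topic.

```python
from datetime import datetime, timezone


def summarize_event(event: dict) -> dict:
    """Collect a few envelope and payload values from a DTS event (illustrative)."""
    data = event["data"]
    schema = data["schema"]
    # Assumption: sourceTimestamp is in Unix seconds (UTC).
    source_time = datetime.fromtimestamp(data["sourceTimestamp"], tz=timezone.utc)
    # Replace the trailing "Z" so that datetime.fromisoformat accepts the value
    # on Python versions earlier than 3.11 as well.
    event_time = datetime.fromisoformat(event["time"].replace("Z", "+00:00"))
    return {
        "event_id": event["id"],
        "type": event["type"],
        "operation": data["operationType"],
        "table": f'{schema["databaseName"]}.{schema["tableName"]}',
        "source_time": source_time,
        "event_time": event_time,
    }


# A trimmed copy of the sample event that keeps only the fields read above.
sample = {
    "id": "12f701a43741d404fa9a7be89d9acae0-321****",
    "type": "dts:ConsumeMessage",
    "time": "2022-06-10T07:55:57Z",
    "data": {
        "sourceTimestamp": 1654847757,
        "operationType": "UPDATE",
        "schema": {"databaseName": "hangzhou-test-db", "tableName": "message_info"},
    },
}

summary = summarize_event(sample)
print(summary["operation"], summary["table"], summary["source_time"].isoformat())
# UPDATE hangzhou-test-db.message_info 2022-06-10T07:55:57+00:00
```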
The following table describes the parameters contained in the data field.
Parameter | Type | Description |
id | String | The ID of the DTS data entry. |
topicPartition | Object | The partition information about the topic to which the event is pushed. |
hash | Int | The underlying storage parameter of DTS. |
partition | Int | The partition. |
topic | String | The topic name. |
offset | Int | The offset of the DTS data entry. |
sourceTimestamp | Int | The timestamp that indicates when the DTS data entry was generated. |
operationType | String | The type of operation involved in the DTS data entry. |
schema | Object | The schema information about the database. |
recordFields | Array | The details of fields. |
fieldName | String | The field name. |
rawDataTypeNum | Int | The mapped value of the field type. The value of this parameter corresponds to the value of the dataTypeNumber field in the deserialized incremental data from the change tracking instance. For more information, see Use a Kafka client to consume tracked data. |
isPrimaryKey | Boolean | Indicates whether the field is a primary key field. |
isUniqueKey | Boolean | Indicates whether the field has a unique key. |
fieldPosition | Int | The field position. |
nameIndex | Object | The indexing information about the fields based on field names. |
schemaId | String | The ID of the database schema. |
databaseName | String | The database name. |
tableName | String | The table name. |
primaryIndexInfo | Object | The primary key indexes. |
indexType | String | The index type. |
indexFields | Array | The fields on which the indexes are created. |
cardinality | Int | The cardinality of the primary keys. |
nullable | Boolean | Indicates whether the primary keys can be null. |
isFirstUniqueIndex | Boolean | Indicates whether the index is the first unique index. |
uniqueIndexInfo | Array | The unique indexes. |
foreignIndexInfo | Array | The indexes for foreign keys. |
normalIndexInfo | Array | The regular indexes. |
databaseInfo | Object | The information about the database. |
databaseType | String | The database engine. |
version | String | The database engine version. |
totalRows | Int | The total number of rows in the table. |
beforeImage | Object | The image that records field values before the operation is performed. |
values | Array | The field values recorded. |
size | Int | The size of the fields recorded. |
afterImage | Object | The image that records field values after the operation is performed. |
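In the sample event, string columns appear in values as a byte array (hb) plus a charset, while numeric columns appear as plain numbers. The following Python sketch shows one possible way to turn beforeImage and afterImage into ordinary row dictionaries. The helper names decode_value and image_to_row are hypothetical. The sketch rests on several assumptions that this topic does not confirm: that the entries in values follow the order given by fieldPosition, that the whole hb array holds the column bytes (offset, position, and limit are ignored), and that bytes may be serialized as signed integers.

```python
from typing import Any, Dict, Optional

# Assumption: these MySQL charset names can be decoded with Python's UTF-8 codec.
_CHARSET_TO_CODEC = {"utf8mb4": "utf-8", "utf8": "utf-8"}


def decode_value(value: Dict[str, Any]) -> Any:
    """Return a plain Python value for one entry of the values array."""
    raw = value.get("data")
    if isinstance(raw, dict) and "hb" in raw:
        codec = _CHARSET_TO_CODEC.get(value.get("charset", "utf8mb4"), "utf-8")
        # Mask each byte in case it was serialized as a signed integer.
        return bytes(b & 0xFF for b in raw["hb"]).decode(codec)
    return raw


def image_to_row(image: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Map field names to decoded values for a beforeImage or afterImage."""
    if not image:
        return {}
    fields = sorted(image["recordSchema"]["recordFields"],
                    key=lambda f: f["fieldPosition"])
    # Assumption: values[i] belongs to the field whose fieldPosition is i.
    return {f["fieldName"]: decode_value(v) for f, v in zip(fields, image["values"])}


# Trimmed copies of the images from the sample event.
record_schema = {"recordFields": [
    {"fieldName": "id", "fieldPosition": 0},
    {"fieldName": "topic", "fieldPosition": 1},
]}
before = {"recordSchema": record_schema, "values": [
    {"data": 115},
    {"data": {"hb": [104, 101, 108, 108, 111]}, "charset": "utf8mb4"},
]}
after = {"recordSchema": record_schema, "values": [
    {"data": 115},
    {"data": {"hb": [98, 121, 101]}, "charset": "utf8mb4"},
]}

old_row, new_row = image_to_row(before), image_to_row(after)
changed = {k: (old_row.get(k), v) for k, v in new_row.items() if old_row.get(k) != v}
print(old_row)   # {'id': 115, 'topic': 'hello'}
print(new_row)   # {'id': 115, 'topic': 'bye'}
print(changed)   # {'topic': ('hello', 'bye')}
```

For the UPDATE in the sample, the two images differ only in the topic column, so the changed dictionary contains a single entry.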