
EventBridge:DTS

Last Updated: Apr 30, 2024

This topic describes how to create an event stream whose event provider is Data Transmission Service (DTS) in the EventBridge console.

Prerequisites

Supported regions

DTS can be used as the event provider of an event stream in the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), China (Guangzhou), China (Chengdu), and China (Hong Kong).

Procedure

Important

Event streams in EventBridge can transfer only data changes that are generated by INSERT, DELETE, UPDATE, and DDL statements in DTS.

  1. Log on to the EventBridge console. In the left-side navigation pane, click Event Streams.

  2. In the top navigation bar, select a region and click Create Event Stream.

  3. On the Create Event Stream page, configure the Task Name and Description parameters, follow the on-screen instructions to configure the other parameters, and then click Save. The following sections describe the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Data Transmission Service (DTS) and configure the other parameters. Then, click Next Step. The following list describes the parameters:

        • Data Subscription Task: The ID of the change tracking task that you created in the DTS console. Example: dts8jqe****.

        • Access Method: The access method of the database instance that serves as the source of the change tracking task. You cannot change the value of this parameter. Example: RDS.

        • Instance ID: The ID of the database instance that serves as the source of the change tracking task. You cannot change the value of this parameter. Example: rm-bp18mj3q2dzyb****.

        • Consumer Group: The name of the consumer group that you created to consume the data of the change tracking task. Example: test.

          Note: Make sure that the consumer group runs on only one client. Otherwise, the specified consumption checkpoint may become invalid.

        • Account: The account name that you specified when you created the consumer group. Example: test.

        • Password: The password of the account that you specified when you created the consumer group. Example: ******.

        • Consumer Offset: The point in time at which the first data entry is consumed. The consumer offset must be within the data range of the change tracking task. Example: 2022-06-21 00:00:00.

          Note: The consumer offset that you specify takes effect only the first time the consumer group consumes data. If the change tracking task is restarted, the consumer group resumes consumption from the last recorded consumer offset.

        • Batch Push: Specifies whether to aggregate multiple events into one push. A push is triggered as soon as the condition specified by the Messages parameter or the Interval (Unit: Seconds) parameter is met, whichever comes first. For example, if you set Messages to 100 and Interval (Unit: Seconds) to 15, the push is executed when the number of messages reaches 100 even if only 10 seconds have elapsed. Example: Enable.

        • Messages: The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. Example: 100.

        • Interval (Unit: Seconds): The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified interval. Valid values: 0 to 15. Unit: seconds. A value of 0 indicates that messages are sent immediately after aggregation. Example: 3.

      2. In the Filtering, Transformation, and Sink steps, configure the event filtering method, event transformation rule, and event target. For information about event transformation configurations, see Use Function Compute to perform message cleansing.

    • Task Property

      Configure the retry policy and dead-letter queue for the event stream. For more information, see Retry policies and dead-letter queues.

  4. Go back to the Event Streams page and find the event stream that you created. Then, click Enable in the Actions column.

    Enabling an event stream takes 30 to 60 seconds to complete. You can view the progress in the Status column on the Event Streams page.
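The Batch Push behavior described above (a push fires when either the Messages count or the Interval threshold is reached, whichever comes first) can be sketched as follows. The BatchPusher class is a hypothetical illustration of the trigger semantics, not part of any EventBridge SDK:

```python
import time

class BatchPusher:
    """Hypothetical sketch of the batch push trigger semantics:
    a batch is flushed when either the message-count threshold or
    the time-interval threshold is reached, whichever comes first."""

    def __init__(self, max_messages=100, max_interval_seconds=3):
        self.max_messages = max_messages
        self.max_interval = max_interval_seconds
        self.buffer = []
        self.first_arrival = None  # arrival time of the oldest buffered message

    def add(self, message, now=None):
        """Buffer a message; return the flushed batch if a threshold is hit."""
        now = time.monotonic() if now is None else now
        if not self.buffer:
            self.first_arrival = now
        self.buffer.append(message)
        count_hit = len(self.buffer) >= self.max_messages
        interval_hit = (now - self.first_arrival) >= self.max_interval
        if count_hit or interval_hit:
            return self.flush()
        return None

    def flush(self):
        """Empty the buffer and return its contents as one batch."""
        batch, self.buffer, self.first_arrival = self.buffer, [], None
        return batch
```

For example, with max_messages=100 and max_interval_seconds=15, a burst of 100 messages flushes immediately, while a slow trickle flushes at most 15 seconds after the first buffered message arrived.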

Sample event

The following code provides a sample event generated after a change tracking task is created for a MySQL database in DTS:

{
  "data": {
    "id": 321****,
    "topicPartition": {
      "hash": 0,
      "partition": 0,
      "topic": "cn_hangzhou_rm_1234****_test_version2"
    },
    "offset": 3218099,
    "sourceTimestamp": 1654847757,
    "operationType": "UPDATE",
    "schema": {
      "recordFields": [
        {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      ],
      "nameIndex": {
        "id": {
          "fieldName": "id",
          "rawDataTypeNum": 8,
          "isPrimaryKey": true,
          "isUniqueKey": false,
          "fieldPosition": 0
        },
        "topic": {
          "fieldName": "topic",
          "rawDataTypeNum": 253,
          "isPrimaryKey": false,
          "isUniqueKey": false,
          "fieldPosition": 1
        }
      },
      "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
      "databaseName": "hangzhou-test-db",
      "tableName": "message_info",
      "primaryIndexInfo": {
        "indexType": "PrimaryKey",
        "indexFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          }
        ],
        "cardinality": 0,
        "nullable": true,
        "isFirstUniqueIndex": false
      },
      "uniqueIndexInfo": [],
      "foreignIndexInfo": [],
      "normalIndexInfo": [],
      "databaseInfo": {
        "databaseType": "MySQL",
        "version": "5.7.35-log"
      },
      "totalRows": 0
    },
    "beforeImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              104,
              101,
              108,
              108,
              111
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 9,
            "capacity": 9,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 45
    },
    "afterImage": {
      "recordSchema": {
        "recordFields": [
          {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        ],
        "nameIndex": {
          "id": {
            "fieldName": "id",
            "rawDataTypeNum": 8,
            "isPrimaryKey": true,
            "isUniqueKey": false,
            "fieldPosition": 0
          },
          "topic": {
            "fieldName": "topic",
            "rawDataTypeNum": 253,
            "isPrimaryKey": false,
            "isUniqueKey": false,
            "fieldPosition": 1
          }
        },
        "schemaId": "(hangzhou-test-db,hangzhou-test-db,message_info)",
        "databaseName": "hangzhou-test-db",
        "tableName": "message_info",
        "primaryIndexInfo": {
          "indexType": "PrimaryKey",
          "indexFields": [
            {
              "fieldName": "id",
              "rawDataTypeNum": 8,
              "isPrimaryKey": true,
              "isUniqueKey": false,
              "fieldPosition": 0
            }
          ],
          "cardinality": 0,
          "nullable": true,
          "isFirstUniqueIndex": false
        },
        "uniqueIndexInfo": [],
        "foreignIndexInfo": [],
        "normalIndexInfo": [],
        "databaseInfo": {
          "databaseType": "MySQL",
          "version": "5.7.35-log"
        },
        "totalRows": 0
      },
      "values": [
        {
          "data": 115
        },
        {
          "data": {
            "hb": [
              98,
              121,
              101
            ],
            "offset": 0,
            "isReadOnly": false,
            "bigEndian": true,
            "nativeByteOrder": false,
            "mark": -1,
            "position": 0,
            "limit": 11,
            "capacity": 11,
            "address": 0
          },
          "charset": "utf8mb4"
        }
      ],
      "size": 47
    }
  },
  "id": "12f701a43741d404fa9a7be89d9acae0-321****",
  "source": "DTSstreamDemo",
  "specversion": "1.0",
  "type": "dts:ConsumeMessage",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2022-06-10T07:55:57Z",
  "subject": "acs:dts:cn-hangzhou:12345****:kk123abc60g782/dtsabcdet1ro"
}

For information about the parameters defined in the CloudEvents specification, see Overview.

The following list describes the parameters contained in the data field.

• id (String): The ID of the DTS data entry.

• topicPartition (Array): The partition information about the topic to which the event is pushed.

  • hash (String): An underlying storage parameter of DTS.

  • partition (String): The partition.

  • topic (String): The topic name.

• offset (Int): The offset of the DTS data entry.

• sourceTimestamp (Int): The timestamp that indicates when the DTS data entry was generated.

• operationType (String): The type of operation recorded in the DTS data entry.

• schema (Array): The schema information about the database.

  • recordFields (Array): The details of the fields.

    • fieldName (String): The field name.

    • rawDataTypeNum (Int): The numeric code of the field type. The value of this parameter corresponds to the value of the dataTypeNumber field in the deserialized incremental data from the change tracking instance. For more information, see Use a Kafka client to consume tracked data.

    • isPrimaryKey (Boolean): Indicates whether the field is a primary key field.

    • isUniqueKey (Boolean): Indicates whether the field is a unique key field.

    • fieldPosition (String): The position of the field.

  • nameIndex (Array): The field details indexed by field name.

  • schemaId (String): The ID of the database schema.

  • databaseName (String): The database name.

  • tableName (String): The table name.

  • primaryIndexInfo (String): The primary key index information.

    • indexType (String): The index type.

    • indexFields (Array): The fields on which the index is created.

    • cardinality (String): The cardinality of the primary key.

    • nullable (Boolean): Indicates whether the primary key can be null.

    • isFirstUniqueIndex (Boolean): Indicates whether the index is the first unique index.

  • uniqueIndexInfo (String): The unique indexes.

  • foreignIndexInfo (String): The foreign key indexes.

  • normalIndexInfo (String): The regular indexes.

  • databaseInfo (Array): The information about the database.

    • databaseType (String): The database engine.

    • version (String): The database engine version.

  • totalRows (Int): The total number of rows in the table.

• beforeImage (String): The image that records the field values before the operation is performed.

  • values (String): The recorded field values.

  • size (Int): The size of the recorded fields.

• afterImage (String): The image that records the field values after the operation is performed. It has the same structure as beforeImage.
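As the sample event shows, string fields in beforeImage.values and afterImage.values arrive as raw byte buffers (the hb array) together with a charset, while numeric fields arrive as plain numbers. The following sketch decodes such values; raw_event is a hypothetical excerpt of the sample event above, trimmed to the pieces the sketch uses:

```python
# Hypothetical excerpt of the "data" portion of the sample event above.
raw_event = {
    "operationType": "UPDATE",
    "schema": {"databaseName": "hangzhou-test-db", "tableName": "message_info"},
    "afterImage": {
        "values": [
            {"data": 115},
            {"data": {"hb": [98, 121, 101]}, "charset": "utf8mb4"},
        ]
    },
}

def decode_value(value):
    """Decode one entry from beforeImage/afterImage values: string fields
    carry a byte buffer ("hb") plus a charset; numeric fields are plain."""
    data = value["data"]
    if isinstance(data, dict) and "hb" in data:
        # MySQL's utf8mb4 is plain UTF-8 from Python's point of view.
        charset = value.get("charset", "utf8mb4").replace("utf8mb4", "utf-8")
        return bytes(data["hb"]).decode(charset)
    return data

table = "{0[databaseName]}.{0[tableName]}".format(raw_event["schema"])
row = [decode_value(v) for v in raw_event["afterImage"]["values"]]
print(raw_event["operationType"], table, row)
# → UPDATE hangzhou-test-db.message_info [115, 'bye']
```

In the full sample event, the same decoding applied to beforeImage.values yields the pre-update string (the bytes 104, 101, 108, 108, 111 spell "hello").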