This tutorial shows you how to build a change data capture (CDC) pipeline that streams row-level changes from a SQL Server database to ApsaraMQ for Kafka. The pipeline uses Kafka Connect in distributed mode with the Debezium SQL Server source connector.
How it works
The Debezium SQL Server source connector reads the SQL Server transaction log through CDC and converts each insert, update, or delete into a Kafka message. Kafka Connect runs the connector as a distributed worker process, pushes change events to ApsaraMQ for Kafka topics, and tracks schema history in a dedicated topic.
Data flow:
SQL Server (CDC enabled) → Debezium source connector → Kafka Connect → ApsaraMQ for Kafka topics

Each monitored table maps to a separate topic named <server-name>.<database>.<table> -- for example, server1.testDB.products.
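As a quick sanity check of the naming convention above, here is a minimal sketch; the helper name `topic_for` is illustrative, not part of Debezium:

```python
def topic_for(server_name: str, database: str, table: str) -> str:
    """Build the topic name Debezium uses for a monitored table:
    <server-name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"

# The products table in testDB, captured by the connector named server1:
print(topic_for("server1", "testDB", "products"))  # server1.testDB.products
```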
Prerequisites
Before you begin, prepare the following components:
Debezium SQL Server source connector -- Download from the Maven repository. Choose a version compatible with your Kafka Connect version
Kafka Connect 2.1.0 or later -- Download from Apache Kafka downloads
Note: The Debezium SQL Server source connector requires Kafka Connect 2.1.0 or later. Earlier versions are not supported.
Docker -- Download from Docker Desktop
Step 1: Configure Kafka Connect
Extract the Debezium SQL Server source connector package to a local directory.
Open the Kafka Connect configuration file connect-distributed.properties and set plugin.path to the directory that contains the extracted connector:

```properties
# Path to the directory that contains the extracted connector JARs
plugin.path=/kafka/connect/plugins
```

Important: In earlier versions of Kafka Connect, the plugin.path property is not supported. Set the CLASSPATH environment variable instead:

```shell
export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
```
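If the worker silently fails to load the connector, a misread properties file is a common cause. A small sketch of how you might verify the plugin.path value before starting the worker (the helper `plugin_path_from` is hypothetical, not part of Kafka):

```python
def plugin_path_from(properties_text: str) -> "str | None":
    """Return the plugin.path value from connect-distributed.properties
    content, or None if the property is not set. Comments and blank
    lines are skipped."""
    for line in properties_text.splitlines():
        line = line.strip()
        if line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        if key.strip() == "plugin.path":
            return value.strip()
    return None

config = """\
# worker config
bootstrap.servers=localhost:9092
plugin.path=/kafka/connect/plugins
"""
print(plugin_path_from(config))  # /kafka/connect/plugins
```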
Step 2: Start Kafka Connect
Internet access only -- If you access ApsaraMQ for Kafka over the Internet, configure the JAAS authentication file first. Skip this step for virtual private cloud (VPC) access.
```shell
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
```

Start Kafka Connect in distributed mode:

```shell
bin/connect-distributed.sh config/connect-distributed.properties
```
Step 3: Set up SQL Server with Docker
CDC requires SQL Server 2016 SP1 or later. For details, see SQL Server 2016 SP1 release notes and About change data capture.
Download docker-compose-sqlserver.yaml.
Start the SQL Server container:
```shell
docker-compose -f docker-compose-sqlserver.yaml up
```

Download inventory.sql and load the test data into the database:

```shell
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
```
Step 4: Enable CDC for existing tables (optional)
If you want to capture changes from tables that already exist in the database, enable CDC at the database level and the table level.
Enable CDC at the database level
```sql
USE testDB
GO
EXEC sys.sp_cdc_enable_db
GO
```

Enable CDC for a specific table
```sql
USE testDB
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'MyTable',
    @role_name = N'MyRole',
    @filegroup_name = N'MyDB_CT',
    @supports_net_changes = 1
GO
```

The parameters are:
| Parameter | Description |
|---|---|
| @source_schema | Schema of the source table, such as dbo |
| @source_name | Name of the table to monitor |
| @role_name | Database role that controls access to the change data |
| @filegroup_name | Filegroup used to store the change tables |
| @supports_net_changes | Set to 1 to enable net change queries |
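If you need to enable CDC for many tables, the call above can be templated. A minimal sketch, where the helper name `enable_cdc_sql` is illustrative:

```python
def enable_cdc_sql(schema: str, table: str, role: str,
                   filegroup: str, net_changes: bool = True) -> str:
    """Render the T-SQL batch that enables CDC for one table."""
    return (
        "EXEC sys.sp_cdc_enable_table\n"
        f"    @source_schema = N'{schema}',\n"
        f"    @source_name = N'{table}',\n"
        f"    @role_name = N'{role}',\n"
        f"    @filegroup_name = N'{filegroup}',\n"
        f"    @supports_net_changes = {1 if net_changes else 0}\n"
        "GO"
    )

print(enable_cdc_sql("dbo", "MyTable", "MyRole", "MyDB_CT"))
```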
Verify CDC status
Run the following command to confirm that CDC is active and that your account has the required permissions:
```sql
EXEC sys.sp_cdc_help_change_data_capture
GO
```

If the result is empty, your account does not have access to the CDC-enabled table. Check the role assignment.
Verify that SQL Server Agent is running
CDC depends on SQL Server Agent. Run the following command to check its status:
```sql
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE', N'SQLSERVERAGENT'
```

If the output shows Running, SQL Server Agent is active.
Step 5: Configure and start the source connector
Create topics in ApsaraMQ for Kafka
Before you start the connector, create the required topics in the ApsaraMQ for Kafka console. The connector writes change events to topics named <server-name>.<database>.<table>.
For the test database in this tutorial (testDB with four tables), create the following topics:
| Topic | Purpose |
|---|---|
| server1 | Server-level topic for the connector |
| server1.testDB.customers | Change events from the customers table |
| server1.testDB.orders | Change events from the orders table |
| server1.testDB.products | Change events from the products table |
| server1.testDB.products_on_hand | Change events from the products_on_hand table |
| schema-changes-inventory | Schema change history for the connector |
For instructions on creating topics, see Create a topic. Alternatively, call the CreateTopic API.
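Given a server name, database, and table list, the full set of topics above can be derived. A small sketch, assuming the default Debezium naming and treating the schema-history topic name as a separate input:

```python
def required_topics(server: str, database: str, tables: "list[str]",
                    history_topic: str = "schema-changes-inventory") -> "list[str]":
    """List every topic the connector needs: the server-level topic,
    one change-event topic per table, and the schema-history topic."""
    return ([server]
            + [f"{server}.{database}.{t}" for t in tables]
            + [history_topic])

topics = required_topics("server1", "testDB",
                         ["customers", "orders", "products", "products_on_hand"])
print(topics)
```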
Configure the connector
Download register-sqlserver.json.
Open register-sqlserver.json and update the following properties based on your access method.

The key properties are:

| Property | Description |
|---|---|
| database.history.kafka.bootstrap.servers | Endpoint of your ApsaraMQ for Kafka instance. Use the default endpoint for VPC access or the SSL endpoint for Internet access. You can find the endpoint in the ApsaraMQ for Kafka console. |
| database.server.name | Logical server name used as the prefix for all change event topics. Setting this to server1 produces topics such as server1.testDB.products. |
| database.history.kafka.topic | Topic where the connector stores schema change history. Create this topic in the console before you start the connector. |

VPC access:

```json
"database.history.kafka.bootstrap.servers" : "<your-default-endpoint>",
"database.server.name": "server1",
"database.history.kafka.topic": "schema-changes-inventory"
```

Internet access:

```json
"database.history.kafka.bootstrap.servers" : "<your-ssl-endpoint>",
"database.server.name": "server1",
"database.history.kafka.topic": "schema-changes-inventory",
"database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
"database.history.producer.ssl.truststore.password": "KafkaOnsClient",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
"database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN"
```

For Internet access, the additional SSL and SASL properties enable encrypted, authenticated connections to ApsaraMQ for Kafka.
Start the connector
Register the connector with Kafka Connect by sending a POST request:
```shell
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ \
  -d @register-sqlserver.json
```

Verify the results
Insert, update, or delete rows in the monitored SQL Server tables.
In the ApsaraMQ for Kafka console, go to the Message Query page and query the corresponding topics. If change events appear as messages, the pipeline is working. For more information, see Query messages.
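Each message you query is a Debezium change-event envelope whose payload carries before, after, and an op code (c = create, u = update, d = delete, r = snapshot read). A minimal sketch of decoding one, using a hand-written sample event rather than real console output:

```python
import json

# Hand-written sample resembling a Debezium update event on server1.testDB.products.
raw = json.dumps({
    "payload": {
        "op": "u",
        "before": {"id": 101, "name": "scooter", "weight": 3.14},
        "after": {"id": 101, "name": "scooter", "weight": 5.18},
        "source": {"table": "products"},
    }
})

OPS = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}

payload = json.loads(raw)["payload"]
print(f"{OPS[payload['op']]} on {payload['source']['table']}: "
      f"{payload['before']} -> {payload['after']}")
```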
What's next
To monitor more tables, enable CDC for each table and create the corresponding topics in ApsaraMQ for Kafka.
For the full list of connector configuration options, see the Debezium SQL Server connector documentation.