全部產品
Search
文件中心

ApsaraMQ for Kafka:使用Kafka Connect將SQL Server資料同步至雲訊息佇列 Kafka 版

更新時間:Sep 02, 2025

本教程介紹如何使用Kafka Connect的Source Connector將SQL Server的資料同步至雲訊息佇列 Kafka 版

前提條件

在開始本教程前,請確保您已完成以下操作:

  • 已下載SQL Server Source Connector。具體資訊,請參見SQL Server Source Connector
  • 已下載Kafka Connect。具體資訊,請參見Kafka Connect
    說明 SQL Server Source Connector目前只支援2.1.0及以上版本的Kafka Connect。
  • 已下載Docker。具體資訊,請參見Docker

步驟一:配置Kafka Connect

  1. 將下載完成的SQL Server Connector解壓到指定目錄。
  2. 在Kafka Connect的設定檔connect-distributed.properties中配置外掛程式安裝位置。
    ## 指定外掛程式解壓後的路徑。
    plugin.path=/kafka/connect/plugins
    重要

    Kafka Connect的早期版本不支援配置plugin.path,您需要在CLASSPATH中指定外掛程式位置。

    export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*

步驟二:啟動Kafka Connect

配置好connect-distributed.properties後,執行以下命令啟動Kafka Connect。

  1. 如果是公網接入,需先設定java.security.auth.login.config,如果是VPC接入,可以跳過這一步。
    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
  2. 啟動Kafka Connect。
    bin/connect-distributed.sh config/connect-distributed.properties

步驟三:安裝SQL Server

重要 SQL Server 2016 SP1以上版本支援CDC,因此您的SQL Server版本必須高於該版本。
  1. 下載docker-compose-sqlserver.yaml
  2. 執行以下命令安裝SQL Server。
    docker-compose -f docker-compose-sqlserver.yaml up

步驟四:配置SQL Server

  1. 下載inventory.sql
  2. 執行以下命令初始化SQL Server中的測試資料。
    cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
  3. 可選:如果您需要監聽SQL Server中已有的資料表,請完成以下配置:
    1. 執行以下命令開啟CDC配置。
      ## 開啟CDC模板資料庫。
      USE testDB
      GO
      EXEC sys.sp_cdc_enable_db
      GO
    2. 執行以下命令開啟指定Table的CDC配置。
      ## 開啟指定Table的CDC配置。
      USE testDB
      GO
      
      EXEC sys.sp_cdc_enable_table
      @source_schema = N'dbo',
      @source_name   = N'MyTable',
      @role_name     = N'MyRole',
      @filegroup_name = N'MyDB_CT',
      @supports_net_changes = 1
      GO
    3. 執行以下命令確認是否有許可權訪問CDC Table。
      EXEC sys.sp_cdc_help_change_data_capture
      GO
      說明 如果返回結果為空白,您需要確認是否有許可權訪問該表。
    4. 執行以下命令確認SQL Server Agent已開啟。
      EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
      說明 如果返回結果為Running,則說明SQL Server Agent已開啟。

步驟五:啟動SQL Server Connector

  1. 下載register-sqlserver.json
  2. 編輯register-sqlserver.json
    • VPC接入
      ## 雲訊息佇列 Kafka 版執行個體的預設存取點,您可以在雲訊息佇列 Kafka 版控制台擷取。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 您需要提前在雲訊息佇列 Kafka 版控制台建立同名Topic,在本例中建立topic:server1。
      ## 所有table的變更資料,會記錄在server1.$DATABASE.$TABLE的topic中,例如server1.testDB.products。
      ## 因此您需要提前在雲訊息佇列 Kafka 版控制台中建立所有相關Topic。
      "database.server.name": "server1",
      ## 記錄schema變化資訊將記錄在該Topic中。
      ## 您需要提前在雲訊息佇列 Kafka 版控制台建立該Topic。
      "database.history.kafka.topic": "schema-changes-inventory"
    • 公網接入
      ## 雲訊息佇列 Kafka 版執行個體的SSL存取點,您可以在雲訊息佇列 Kafka 版控制台擷取。
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## 您需要提前在雲訊息佇列 Kafka 版控制台建立同名Topic,在本例中建立topic:server1。
      ## 所有table的變更資料,會記錄在server1.$DATABASE.$TABLE的Topic中,例如server1.testDB.products。
      ## 因此您需要提前在雲訊息佇列 Kafka 版控制台中建立所有相關Topic。
      "database.server.name": "server1",
      ## 記錄schema變化資訊將記錄在該Topic中。
      ## 您需要提前在雲訊息佇列 Kafka 版控制台建立該Topic。
      "database.history.kafka.topic": "schema-changes-inventory",
      ## 通過SSL存取點訪問,還需要修改以下配置。
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. 完成register-sqlserver.json配置後,您需要根據配置在控制台建立相應的Topic,相關操作步驟請參見步驟一:建立Topic
    按照本教程中的方式安裝的SQL Server,您可以看到SQL Server中已經提前建立db name:testDB。其中有四張表:
    • customers
    • orders
    • products
    • products_on_hand
    根據以上register-sqlserver.json的配置,您需要使用OpenAPI建立Topic:
    • server1
    • server1.testDB.customers
    • server1.testDB.orders
    • server1.testDB.products
    • server1.testDB.products_on_hand

    register-sqlserver.json中,配置了將schema變化資訊記錄在schema-changes-testDB,因此您還需要使用OpenAPI建立Topic:schema-changes-inventory,相關操作請參見CreateTopic

  4. 執行以下命令啟動SQL Server。
    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json

結果驗證

確認雲訊息佇列 Kafka 版能否接收到SQL Server的變更資料:

  1. 變更監聽SQL Server中的資料。
  2. 在控制台的訊息查詢頁面,查詢變更訊息。具體操作步驟,請參見訊息查詢