本教程介紹如何使用Kafka Connect的Source Connector將SQL Server的資料同步至雲訊息佇列 Kafka 版。
前提條件
在開始本教程前,請確保您已完成以下操作:
- 已下載SQL Server Source Connector。具體資訊,請參見SQL Server Source Connector。
- 已下載Kafka Connect。具體資訊,請參見Kafka Connect。 說明 SQL Server Source Connector目前只支援2.1.0及以上版本的Kafka Connect。
- 已下載Docker。具體資訊,請參見Docker。
步驟一:配置Kafka Connect
- 將下載完成的SQL Server Connector解壓到指定目錄。
- 在Kafka Connect的設定檔connect-distributed.properties中配置外掛程式安裝位置。
## 指定外掛程式解壓後的路徑。 plugin.path=/kafka/connect/plugins重要Kafka Connect的早期版本不支援配置plugin.path,您需要在CLASSPATH中指定外掛程式位置。
export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
步驟二:啟動Kafka Connect
配置好connect-distributed.properties後,執行以下命令啟動Kafka Connect。
- 如果是公網接入,需先設定java.security.auth.login.config,如果是VPC接入,可以跳過這一步。
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf" - 啟動Kafka Connect。
bin/connect-distributed.sh config/connect-distributed.properties
步驟三:安裝SQL Server
重要 SQL Server 2016 SP1以上版本支援CDC,因此您的SQL Server版本必須高於該版本。
- 下載docker-compose-sqlserver.yaml。
- 執行以下命令安裝SQL Server。
docker-compose -f docker-compose-sqlserver.yaml up
步驟四:配置SQL Server
- 下載inventory.sql。
- 執行以下命令初始化SQL Server中的測試資料。
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD' - 可選:如果您需要監聽SQL Server中已有的資料表,請完成以下配置:
- 執行以下命令開啟CDC配置。
## 開啟CDC模板資料庫。 USE testDB GO EXEC sys.sp_cdc_enable_db GO - 執行以下命令開啟指定Table的CDC配置。
## 開啟指定Table的CDC配置。 USE testDB GO EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = N'MyRole', @filegroup_name = N'MyDB_CT', @supports_net_changes = 1 GO - 執行以下命令確認是否有許可權訪問CDC Table。
EXEC sys.sp_cdc_help_change_data_capture GO說明 如果返回結果為空白,您需要確認是否有許可權訪問該表。 - 執行以下命令確認SQL Server Agent已開啟。
EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'說明 如果返回結果為Running,則說明SQL Server Agent已開啟。
- 執行以下命令開啟CDC配置。
步驟五:啟動SQL Server Connector
- 下載register-sqlserver.json。
- 編輯register-sqlserver.json。
- VPC接入
## 雲訊息佇列 Kafka 版執行個體的預設存取點,您可以在雲訊息佇列 Kafka 版控制台擷取。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 您需要提前在雲訊息佇列 Kafka 版控制台建立同名Topic,在本例中建立topic:server1。 ## 所有table的變更資料,會記錄在server1.$DATABASE.$TABLE的topic中,例如server1.testDB.products。 ## 因此您需要提前在雲訊息佇列 Kafka 版控制台中建立所有相關Topic。 "database.server.name": "server1", ## 記錄schema變化資訊將記錄在該Topic中。 ## 您需要提前在雲訊息佇列 Kafka 版控制台建立該Topic。 "database.history.kafka.topic": "schema-changes-inventory" - 公網接入
## 雲訊息佇列 Kafka 版執行個體的SSL存取點,您可以在雲訊息佇列 Kafka 版控制台擷取。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 您需要提前在雲訊息佇列 Kafka 版控制台建立同名Topic,在本例中建立topic:server1。 ## 所有table的變更資料,會記錄在server1.$DATABASE.$TABLE的Topic中,例如server1.testDB.products。 ## 因此您需要提前在雲訊息佇列 Kafka 版控制台中建立所有相關Topic。 "database.server.name": "server1", ## 記錄schema變化資訊將記錄在該Topic中。 ## 您需要提前在雲訊息佇列 Kafka 版控制台建立該Topic。 "database.history.kafka.topic": "schema-changes-inventory", ## 通過SSL存取點訪問,還需要修改以下配置。 "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN",
- VPC接入
- 完成register-sqlserver.json配置後,您需要根據配置在控制台建立相應的Topic,相關操作步驟請參見步驟一:建立Topic。按照本教程中的方式安裝的SQL Server,您可以看到SQL Server中已經提前建立db name:testDB。其中有四張表:
- customers
- orders
- products
- products_on_hand
- server1
- server1.testDB.customers
- server1.testDB.orders
- server1.testDB.products
- server1.testDB.products_on_hand
在register-sqlserver.json中,配置了將schema變化資訊記錄在schema-changes-testDB,因此您還需要使用OpenAPI建立Topic:schema-changes-inventory,相關操作請參見CreateTopic。
- 執行以下命令啟動SQL Server。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
結果驗證
確認雲訊息佇列 Kafka 版能否接收到SQL Server的變更資料:
- 變更監聽SQL Server中的資料。
- 在控制台的訊息查詢頁面,查詢變更訊息。具體操作步驟,請參見訊息查詢。