全部產品
Search
文件中心

ApsaraMQ for Kafka:使用Spring Cloud架構收發訊息

更新時間:Dec 27, 2024

Spring Cloud是用於構建訊息驅動的微服務應用程式的架構,提供服務發現、組態管理、訊息傳遞、負載平衡等微服務相關的解決方案,可以更容易地構建分布式系統和進行服務間通訊。本文介紹如何使用Spring Cloud架構接入雲訊息佇列 Kafka 版並收發訊息。

前提條件

公網環境(訊息傳輸需鑒權與加密)

公網環境,訊息採用SASL_SSL協議進行鑒權並加密。用戶端通過SSL存取點訪問雲訊息佇列 Kafka 版。存取點的詳細資料,請參見存取點對比

本樣本將Demo包上傳在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路徑。

  1. 登入Linux系統,執行以下命令,進入Demo包所在路徑/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo

    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. 執行以下命令,進入設定檔路徑。

    cd sasl-ssl/src/main/resources/
  3. 執行以下命令,編輯application.properties檔案,並根據參數列表配置執行個體資訊。

    vi application.properties
    ##以下參數,您需配置為實際使用的執行個體資訊。
    kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
    kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks
    
    ### 配置Binding參數可以把訊息佇列Kafka版和Spring Cloud Stream的Binder綁定在一起,以下參數保持預設即可。
    spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name}
    spring.cloud.stream.bindings.MyOutput.contentType=text/plain
    spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group}
    spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name}
    spring.cloud.stream.bindings.MyInput.contentType=text/plain
    
    ### Binder是Spring Cloud對訊息中介軟體的封裝模組,以下參數保持預設即可。
    spring.cloud.stream.kafka.binder.autoCreateTopics=false
    spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers}
    spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
    spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location}
    spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient
    ### 如果Demo中沒有以下參數,請手動增加。該參數表示是否需要進行伺服器主機名稱驗證。因訊息傳輸使用SASL身份校正,可設定為空白字串關閉伺服器主機名稱驗證。
    ### 伺服器主機名稱驗證是驗證SSL認證的主機名稱與伺服器的主機名稱是否匹配,預設為HTTPS。
    spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
    表 1. 參數列表

    參數

    描述

    kafka.bootstrap-servers

    雲訊息佇列 Kafka 版執行個體存取點。您可在雲訊息佇列 Kafka 版控制台实例详情頁面的接入点信息地區擷取。

    kafka.consumer.group

    訂閱訊息的Group。您可以在雲訊息佇列 Kafka 版控制台Group 管理頁面建立。具體操作,請參見步驟三:建立資源

    kafka.output.topic.name

    訊息的Topic。控制台程式通過此Topic每隔一段時間發送訊息,內容是固定的。您可以在雲訊息佇列 Kafka 版控制台Topic 管理頁面建立。具體操作,請參見步驟三:建立資源

    kafka.input.topic.name

    訊息的Topic。您可以通過此Topic在控制台發送訊息,Demo程式會消費訊息,並將訊息列印在日誌中。

    kafka.ssl.truststore.location

    SSL根憑證kafka.client.truststore.jks的存放路徑。

  4. 執行以下命令,開啟kafka_client_jaas.conf檔案,配置執行個體的使用者名稱與密碼。

    vi kafka_client_jaas.conf
    說明
    • 如果執行個體未開啟ACL,您可以在雲訊息佇列 Kafka 版控制台的執行個體詳情頁面擷取預設使用者的使用者名稱和密碼。

    • 如果執行個體已開啟ACL,請確保要使用的SASL使用者為PLAIN類型且已授權收發訊息的許可權。具體資訊,請參見SASL使用者授權

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="XXX"
      password="XXX";
    };
  5. 進入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl路徑,執行以下命令,運行Demo。

    sh run_demo.sh

    程式列印如下資訊,說明接收到控制台程式通過kafka.output.topic.name配置的Topic所發送的訊息。

    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  6. 登入雲訊息佇列 Kafka 版控制台驗證訊息收發是否成功。

    • 查詢kafka.output.topic.name配置的Topic是否接收到控制台程式發送的訊息。具體操作,請參見訊息查詢

    • kafka.input.topic.name配置的Topic發送訊息,查看Demo程式日誌中是否會列印訊息。具體操作,請參見發送訊息

VPC環境(訊息傳輸不鑒權不加密)

VPC環境,訊息可以採用PLAINTEXT協議不鑒權不加密傳輸。用戶端通過預設存取點訪問雲訊息佇列 Kafka 版。存取點的詳細資料,請參見存取點對比

本樣本將Demo包上傳在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路徑。

  1. 登入Linux系統,執行以下命令,進入Demo包所在路徑/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo

    cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
  2. 執行以下命令,進入設定檔路徑。

    cd vpc/src/main/resources/
  3. 執行以下命令,編輯application.properties檔案,並根據參數列表配置執行個體資訊。

    vi application.properties
    ###以下參數請修改為實際使用的執行個體的資訊。
    kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
    kafka.consumer.group=test-spring
    kafka.output.topic.name=test-output
    kafka.input.topic.name=test-input
  4. 進入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc路徑,執行以下命令,運行Demo。

    sh run_demo.sh

    程式列印如下資訊。

    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
    Send: hello world !!
  5. 登入雲訊息佇列 Kafka 版控制台驗證訊息收發是否成功。

    • 查詢kafka.output.topic.name配置的Topic是否接收到控制台程式發送的訊息。具體操作,請參見訊息查詢

    • kafka.input.topic.name配置的Topic發送訊息,查看Demo程式日誌中是否會列印訊息。具體操作,請參見發送訊息

相關文檔

關於Spring Cloud架構的更多資訊,請參見Spring Cloud Stream