Spring Cloud is a framework that is used to build message-driven microservice-oriented applications. Providing microservice-related solutions such as service discovery, configuration management, message transmission, and load balancing, the framework can be used to efficiently build distributed systems and implement communications between microservices. This topic describes how to use the Spring Cloud framework to connect to ApsaraMQ for Kafka to send and receive messages.
Prerequisites
JDK 1.8 or later is installed. For more information, see Java Downloads.
Maven 2.5 or later is installed. For more information, see Downloading Apache Maven.
The demo package is downloaded from kafka-spring-stream-demo and uploaded to the prepared Linux system.
An ApsaraMQ for Kafka instance of version 2.x or later is created. For information about how to upgrade the version of an instance, see Upgrade instance versions.
Topics and consumer groups are created on the instance. For more information, see Step 3: Create resources.
Internet environments (authentication and encryption required for message transmission)
If your client connects to your ApsaraMQ for Kafka instance over the Internet, the SASL_SSL protocol is used for authentication and encryption. In Internet environments, a client uses the Secure Sockets Layer (SSL) endpoint to access an ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.
In this example, the demo package is uploaded to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory.
Log on to the Linux system and run the following command to go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory of the demo package:
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
Run the following command to go to the directory of the configuration file:
cd sasl-ssl/src/main/resources/
Run the following command to open the application.properties configuration file and specify the instance settings in the configuration file based on Parameters:
vi application.properties
## Configure the following parameters based on your instance settings: kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093 kafka.consumer.group=test-spring kafka.output.topic.name=test-output kafka.input.topic.name=test-input kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks ### Configure binding parameters to bind the ApsaraMQ for Kafka instance to Spring Cloud Stream Binder. Retain the default settings for the following parameters: spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name} spring.cloud.stream.bindings.MyOutput.contentType=text/plain spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group} spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name} spring.cloud.stream.bindings.MyInput.contentType=text/plain ### Binder is the encapsulation module of Spring Cloud for messaging middleware. Retain the default settings for the following parameters: spring.cloud.stream.kafka.binder.autoCreateTopics=false spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers} spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location} spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient ### If the following parameter is not included in the demo, manually add the parameter. This parameter specifies whether to enable server hostname verification. You can set this parameter to an empty string to disable server hostname verification because Simple Authentication and Security Layer (SASL) is used for identity verification. ### Server hostname verification is to verify whether the hostname in the root SSL certificate matches the hostname of the server. The default value of this parameter is HTTPS. spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
Table 1. Parameters Parameter
Description
kafka.bootstrap-servers
The endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the Endpoint Information section of the Instance Details page in the ApsaraMQ for Kafka console.
kafka.consumer.group
The consumer group that subscribes to messages. You can create the consumer group on the Groups page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources.
kafka.output.topic.name
The topic for outbound messages. The console program uses this topic to send messages at regular intervals. The content of each message is fixed. You can create the topic on the Topics page in the ApsaraMQ for Kafka console. For more information, see Step 3: Create resources.
kafka.input.topic.name
The topic for inbound messages. You can use this topic to send messages in the console. The demo program consumes the messages and displays the messages in logs.
kafka.ssl.truststore.location
The storage path of the root SSL certificate kafka.client.truststore.jks.
Run the following command to open the kafka_client_jaas.conf file and specify the username and password of the SASL user for the instance:
vi kafka_client_jaas.conf
NoteIf the access control list (ACL) feature is disabled for the ApsaraMQ for Kafka instance, you can obtain the username and password of the default SASL user for the instance on the Instance Details page in the ApsaraMQ for Kafka console.
If the ACL feature is enabled for the ApsaraMQ for Kafka instance, make sure that the SASL user that you use is of the PLAIN type and that the user is granted permissions to send and receive messages. For more information, see Grant permissions to SASL users.
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX"; };
Go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl directory and run the following command to run the demo:
sh run_demo.sh
If the following information is returned, the demo program receives the messages sent from the console program by using the topic specified in kafka.output.topic.name.
Send: hello world !! Send: hello world !! Send: hello world !! Send: hello world !!
Log on to the ApsaraMQ for Kafka console to verify whether messages are sent and received.
Check whether the topic specified in kafka.output.topic.name received the messages sent from the console program. For more information, see Query messages.
Send a message by using the topic specified in kafka.input.topic.name and check whether the message is displayed in the log of the demo program. For more information, see Send a message.
VPC environments (authentication and encryption not required for message transmission)
If a client connects to an ApsaraMQ for Kafka instance in a virtual private cloud (VPC), the PLAINTEXT protocol is used to transmit messages, and authentication and encryption are not required. In VPC environments, a client uses the default endpoint to access an ApsaraMQ for Kafka instance. For information about endpoints, see Comparison among endpoints.
In this example, the demo package is uploaded to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory.
Log on to the Linux system and run the following command to go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo directory of the demo package:
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
Run the following command to go to the directory of the configuration file:
cd vpc/src/main/resources/
Run the following command to open the application.properties configuration file and specify the instance settings in the configuration file based on Parameters:
vi application.properties
### Configure the following parameters based on your instance settings: kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 kafka.consumer.group=test-spring kafka.output.topic.name=test-output kafka.input.topic.name=test-input
Go to the /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc directory and run the following command to run the demo:
sh run_demo.sh
The following information is returned:
Send: hello world !! Send: hello world !! Send: hello world !! Send: hello world !!
Log on to the ApsaraMQ for Kafka console to verify whether messages are sent and received.
Check whether the topic specified in kafka.output.topic.name received the messages sent from the console program. For more information, see Query messages.
Send a message by using the topic specified in kafka.input.topic.name and check whether the message is displayed in the log of the demo program. For more information, see Send a message.
References
For more information about the Spring Cloud framework, see Spring Cloud Stream Reference Documentation.