Scenarios
In most cases, normal messages are used in microservice decoupling, data integration, and event-driven scenarios. Most of these scenarios require highly reliable transmission but have no strict requirements on the timing or order of message processing.
Scenario 1: Asynchronous decoupling of microservices
The preceding figure shows an online e-commerce transaction scenario. In this scenario, the upstream order system encapsulates order placement and payment as an independent normal message and sends the message to the ApsaraMQ for RocketMQ broker. Then, downstream systems subscribe to the message from the broker on demand and process tasks based on the local consumption logic. Messages are independent of each other and do not need to be associated.
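The decoupling described above can be illustrated with a small in-memory model. This is a minimal sketch for illustration only, not how the broker is implemented: the class InMemoryTopic and the consumer group names are hypothetical, and the model is single-threaded and keeps nothing on disk. It shows only that the upstream system publishes once and returns, while each subscribed downstream group later reads its own copy of the message at its own pace.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a topic; not part of the RocketMQ SDK. */
final class InMemoryTopic {
    // One queue per consumer group, so groups consume independently of each other.
    private final Map<String, Deque<String>> queuesByGroup = new LinkedHashMap<>();

    /** Registers a downstream consumer group. */
    void subscribe(String consumerGroup) {
        queuesByGroup.putIfAbsent(consumerGroup, new ArrayDeque<>());
    }

    /** The upstream system publishes once and returns without waiting for consumers. */
    void publish(String body) {
        for (Deque<String> queue : queuesByGroup.values()) {
            queue.addLast(body);
        }
    }

    /** A downstream group fetches its next message, or null if none is pending. */
    String poll(String consumerGroup) {
        Deque<String> queue = queuesByGroup.get(consumerGroup);
        return queue == null ? null : queue.pollFirst();
    }

    public static void main(String[] args) {
        InMemoryTopic topic = new InMemoryTopic();
        topic.subscribe("logistics-group");
        topic.subscribe("points-group");
        topic.publish("order-1 paid");
        System.out.println(topic.poll("logistics-group"));
        System.out.println(topic.poll("points-group"));
    }
}
```

The order system never references the downstream groups, so a new downstream system can be added by subscribing another group without changing the producer.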
Scenario 2: Data integration and transmission
The preceding figure uses offline log collection as an example. An instrumentation component collects operation logs from frontend applications and forwards the logs to ApsaraMQ for RocketMQ. Each message is a piece of log data that requires no processing by ApsaraMQ for RocketMQ. ApsaraMQ for RocketMQ only needs to deliver the log data to the downstream storage system. Subsequent tasks are processed by backend applications.
Working mechanism
What is a normal message?
Normal messages provide the basic messaging capability of ApsaraMQ for RocketMQ: asynchronous, decoupled communication between producers and consumers.
Lifecycle of a normal message
Initialization
The message is built and initialized by the producer and is ready to be sent to the broker.
Pending consumption
The message is sent to the broker and is visible and available to the consumer.
Being consumed
The message is obtained by the consumer and processed based on the local business logic of the consumer.
During this stage, the broker waits for the consumer to return the consumption result. If the broker receives no response from the consumer within a specific period of time, ApsaraMQ for RocketMQ retries the message. For more information, see Consumption retry.
Consumption commitment
The consumer completes the consumption and commits the consumption result to the broker. The broker then records whether the message was consumed successfully.
By default, ApsaraMQ for RocketMQ retains all messages. When the consumption result is committed, the message is marked as consumed instead of being immediately deleted. Messages are deleted only if the retention period expires or the system has insufficient storage space. Before a message is deleted, the consumer can re-consume the message.
Message deletion
If the retention period of a message expires or the storage space is insufficient, ApsaraMQ for RocketMQ deletes the earliest stored messages from the physical file in a rolling manner. For more information, see Message storage and cleanup.
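The lifecycle stages above can be read as a small state machine. The following sketch is an illustrative model only and does not reflect how the broker is implemented. The class NormalMessageLifecycle, its State enum, and its method names are hypothetical and are not part of the ApsaraMQ for RocketMQ SDK. It assumes that a missing consumption result returns the message to the pending state for a retry, and that a committed message can be consumed again until it is deleted.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the normal message lifecycle; not part of the RocketMQ SDK. */
final class NormalMessageLifecycle {
    enum State { INITIALIZATION, PENDING_CONSUMPTION, BEING_CONSUMED, CONSUMPTION_COMMITTED, DELETED }

    private State state = State.INITIALIZATION;

    State state() {
        return state;
    }

    /** The producer sends the message; it becomes visible to consumers. */
    void send() {
        transition(EnumSet.of(State.INITIALIZATION), State.PENDING_CONSUMPTION);
    }

    /** A consumer obtains the message. A committed message can be re-consumed before deletion. */
    void receive() {
        transition(EnumSet.of(State.PENDING_CONSUMPTION, State.CONSUMPTION_COMMITTED), State.BEING_CONSUMED);
    }

    /** No consumption result arrived in time, so the message is made available again for a retry. */
    void timeout() {
        transition(EnumSet.of(State.BEING_CONSUMED), State.PENDING_CONSUMPTION);
    }

    /** The consumer commits the result; the message is marked as consumed, not deleted. */
    void commit() {
        transition(EnumSet.of(State.BEING_CONSUMED), State.CONSUMPTION_COMMITTED);
    }

    /** The retention period expired or storage ran short: any stored message can be deleted. */
    void delete() {
        transition(EnumSet.of(State.PENDING_CONSUMPTION, State.BEING_CONSUMED, State.CONSUMPTION_COMMITTED),
            State.DELETED);
    }

    private void transition(Set<State> allowedFrom, State to) {
        if (!allowedFrom.contains(state)) {
            throw new IllegalStateException("Cannot move from " + state + " to " + to);
        }
        state = to;
    }

    public static void main(String[] args) {
        NormalMessageLifecycle message = new NormalMessageLifecycle();
        message.send();
        message.receive();
        message.timeout(); // no result in time, so the message is retried
        message.receive();
        message.commit();
        message.delete();
        System.out.println(message.state());
    }
}
```

Invalid moves, such as committing a message that was never received, throw an IllegalStateException in this model, and a deleted message can no longer be received.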
Limits
Normal messages support only topics whose MessageType is set to Normal.
Examples
You can specify message keys and tags to filter or search for normal messages. The following code provides an example of how to send and receive normal messages in Java.
For the complete sample code, see Apache RocketMQ 5.x SDKs (recommended).
Sample code
Send normal messages
package doc;

import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint.
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you send messages to a topic, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned.
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        /**
         * When you initialize a producer, you can specify the topics that you want to use. The topic settings are then validated at startup so that the producer does not start with invalid topics.
         * You do not need to specify the topics for non-transactional messages. The broker dynamically checks whether the topics are valid.
         * Note: To prevent the API operation that is called to query transactional messages from failing, you must specify topics for transactional messages in advance.
         */
        Producer producer = provider.newProducerBuilder()
            .setTopics(topic)
            .setClientConfiguration(configuration)
            .build();
        // Build a normal message.
        Message message = provider.newMessageBuilder()
            .setTopic(topic)
            // The message key. You can use the key to accurately find the message.
            .setKeys("messageKey")
            // The message tag. The consumer can use the tag to filter messages.
            .setTag("messageTag")
            // The message body. Specify the charset explicitly so that the bytes do not depend on the platform default.
            .setBody("messageBody".getBytes(StandardCharsets.UTF_8))
            .build();
        try {
            // Send the message. Check the send result and catch exceptions such as send failures.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
        // If you no longer require the producer, close it to release resources.
        // producer.close();
    }
}
Consume normal messages in push mode
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. You must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. You must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration clientConfiguration = builder.build();
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Specify the consumer group.
            .setConsumerGroup(consumerGroup)
            // Specify the subscription.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            // Specify the message listener.
            .setMessageListener(messageView -> {
                // Consume the message and return the consumption result.
                // LOGGER.info("Consume message={}", messageView);
                System.out.println("Consume Message: " + messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Keep the main thread alive so that the listener continues to receive messages.
        Thread.sleep(Long.MAX_VALUE);
        // If you no longer require the push consumer, close it to release resources.
        //pushConsumer.close();
    }
}
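The push consumer example above subscribes with the tag expression *, which matches every message in the topic. The following sketch illustrates how a tag expression selects messages, assuming the common tag syntax in which * matches all messages and || separates alternative tags. The class TagFilterSketch and its matches method are hypothetical helpers written for illustration. They are not part of the SDK and do not reflect how the broker evaluates filters.

```java
import java.util.Arrays;

/** Hypothetical illustration of tag expression matching; not part of the RocketMQ SDK. */
final class TagFilterSketch {
    /**
     * Returns true if a message with the given tag would be selected by the expression.
     * "*" selects every message; "TagA||TagB" selects messages tagged TagA or TagB.
     */
    static boolean matches(String expression, String messageTag) {
        String trimmed = expression.trim();
        if (trimmed.equals("*")) {
            return true;
        }
        if (messageTag == null) {
            // In this sketch, an untagged message is selected only by "*".
            return false;
        }
        return Arrays.stream(trimmed.split("\\|\\|"))
            .map(String::trim)
            .anyMatch(messageTag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "messageTag"));
        System.out.println(matches("TagA||messageTag", "messageTag"));
        System.out.println(matches("TagA", "messageTag"));
    }
}
```

To receive only some of the messages in a topic, pass an expression such as "TagA||TagB" to FilterExpression in place of "*".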
Consume normal messages in simple mode
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. You must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. You must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration clientConfiguration = builder.build();
        Duration awaitDuration = Duration.ofSeconds(10);
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Specify the consumer group.
            .setConsumerGroup(consumerGroup)
            // Specify the timeout period for long polling requests.
            .setAwaitDuration(awaitDuration)
            // Specify the subscription.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .build();
        // Specify the maximum number of messages to be pulled.
        int maxMessageNum = 16;
        // Specify the invisible time of the messages.
        Duration invisibleDuration = Duration.ofSeconds(10);
        // If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop.
        // To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages.
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // After consumption is complete, the consumer must call the ACK method to commit the consumption result to the broker.
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // If you no longer require the simple consumer, close it to release resources.
        // consumer.close();
    }
}
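The simple consumer example above recommends pulling with multiple threads for real-time consumption. The following is a minimal sketch of that pattern that uses only the Java standard library. The class ConcurrentPullSketch, its methods, and their parameters are hypothetical: the Supplier stands in for a call such as consumer.receive(maxMessageNum, invisibleDuration), and the Consumer stands in for the processing logic followed by consumer.ack(message). Each worker runs a fixed number of rounds so that the sketch terminates, whereas a real worker typically loops until shutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch of concurrent pulling; not part of the RocketMQ SDK. */
final class ConcurrentPullSketch {
    /** Runs several pull loops in parallel and returns the number of messages handled. */
    static int pullConcurrently(int threads, int roundsPerThread,
                                Supplier<List<String>> receive,
                                Consumer<String> handleAndAck) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                for (int round = 0; round < roundsPerThread; round++) {
                    for (String message : receive.get()) {
                        try {
                            handleAndAck.accept(message);
                            handled.incrementAndGet();
                        } catch (RuntimeException e) {
                            // The message is not acknowledged here. With a real consumer,
                            // it would be delivered again after the invisible time elapses.
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return handled.get();
    }

    /** Test double for a receive call: returns up to maxMessageNum messages from a shared queue. */
    static Supplier<List<String>> batchesFrom(Queue<String> queue, int maxMessageNum) {
        return () -> {
            List<String> batch = new ArrayList<>();
            String message;
            while (batch.size() < maxMessageNum && (message = queue.poll()) != null) {
                batch.add(message);
            }
            return batch;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 100; i++) {
            queue.add("message-" + i);
        }
        int handled = pullConcurrently(4, 10, batchesFrom(queue, 16), message -> { });
        System.out.println("Handled messages: " + handled);
    }
}
```

Because the workers run in parallel, messages are not processed in the order in which they were sent, which is acceptable for normal messages because they are independent of each other.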
Usage notes
Set a globally unique key for each message to facilitate troubleshooting
ApsaraMQ for RocketMQ allows you to specify custom message keys. You can use the key of a message to efficiently query the message and its trace.
Therefore, when you send a message, we recommend that you use a unique business identifier, such as the order ID or user ID, as the key. This helps you quickly locate the message in subsequent queries.
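One way to follow this recommendation is to derive the key from the business identifier in a single place. The following is a minimal sketch: the class MessageKeys, the method orderKey, and the ORDER_ prefix are hypothetical conventions chosen for illustration, not requirements of ApsaraMQ for RocketMQ.

```java
/** Hypothetical helper that builds message keys from business IDs; not part of the RocketMQ SDK. */
final class MessageKeys {
    private MessageKeys() {
    }

    /** Builds a message key from an order ID and rejects blank IDs so that keys stay searchable. */
    static String orderKey(String orderId) {
        if (orderId == null || orderId.trim().isEmpty()) {
            throw new IllegalArgumentException("orderId must not be blank");
        }
        // The prefix is an assumed convention that separates order keys from other kinds of keys.
        return "ORDER_" + orderId.trim();
    }

    public static void main(String[] args) {
        System.out.println(orderKey("20240001"));
    }
}
```

In the producer example, the result could be passed to the message builder, for example .setKeys(MessageKeys.orderKey(orderId)), where orderId is a value taken from your own business data.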