Ordered messages are a type of message provided by ApsaraMQ for RocketMQ. Ordered messages are published and consumed in strict first in, first out (FIFO) order. This topic provides the sample code for sending and receiving ordered messages by using Apache RocketMQ TCP client SDK for Java.
Background information
Ordered messages are classified into the following types:
Globally ordered messages: All messages of a specified topic are published and consumed in first-in-first-out (FIFO) order.
Partitionally ordered messages: All messages of a specified topic are distributed to different partitions by using sharding keys. The messages in each partition are published and consumed in FIFO order. A sharding key is a key field that is used for ordered messages to identify different partitions. A sharding key is different from the key of a normal message.
For more information, see Ordered messages.
Prerequisites
Before you start, make sure that the following operations are performed:
Apache RocketMQ SDK for Java 4.5.2 or later is downloaded. For more information, visit the Download page of RocketMQ.
The environment is prepared. For more information, see Prepare the environment.
An AccessKey pair is created on your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send ordered messages
An ApsaraMQ for RocketMQ broker determines the order in which messages are generated based on the order in which the sender uses a single producer or thread to send messages. If the sender uses multiple producers or threads to concurrently send messages, the message order is determined by the order in which the messages are received by the ApsaraMQ for RocketMQ broker. This order may be different from the sending order on the business side.
The following sample code provides an example on how to send ordered messages by using Apache RocketMQ TCP client SDK for Java:
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQOrderProducer {
private static RPCHook getAclRPCHook()
{
/**
* The AccessKey ID and AccessKey secret of your Alibaba Cloud account.
* Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured.
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* Create a producer and enable the message trace feature.
* If you do not want to enable the message trace feature, you can use the following method to create a producer:
* DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR ORDER GROUP ID", getAclRPCHook(), true, null);
/**
* Specify Alibaba Cloud as the access channel. Before you use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty.
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80.
*/
producer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("YOUR ORDER TOPIC",
"YOUR MESSAGE TAG",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
/**
* Note: This parameter is required. Ordered messages can be evenly distributed to each queue only after this parameter is configured.
* If the version of the SDK is 5.x or later, you can use the following method to specify the message order:
* msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
*/
msg.putUserProperty("__SHARDINGKEY", orderId + "");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// Select a partition selection algorithm that meets your business requirements. The algorithm can be used to ensure that the results of the same parameter are the same.
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
Subscribe to ordered messages
The following sample code provides an example on how to subscribe to ordered messages by using Apache RocketMQ TCP client SDK for Java:
import java.util.List;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
public class RocketMQOrderConsumer {
private static RPCHook getAclRPCHook()
{
/**
* The AccessKey ID and AccessKey secret of your Alibaba Cloud account.
* Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured.
*/
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
* Create a consumer and enable the message trace feature.
* If you do not want to enable the message trace feature, you can use the following method to create a producer:
* DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), null);
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("YOUR ORDER GROUP ID", getAclRPCHook(), new AllocateMessageQueueAveragely(), true, null);
/**
* Specify Alibaba Cloud as the access channel. Before you use the message trace feature on the cloud, configure this parameter. If you do not want to enable the message trace feature, leave this parameter empty.
*/
consumer.setAccessChannel(AccessChannel.CLOUD);
/**
// The endpoint that you obtained in the ApsaraMQ for RocketMQ console. The value is in the format of http://MQ_INST_XXXX.aliyuncs.com:80.
*/
consumer.setNamesrvAddr("http://xxxx.mq-internet.aliyuncs.com:80");
consumer.subscribe("YOUR ORDER TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;// If a message fails to be consumed, the request is suspended and retried and the following result is returned: ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}