注文メッセージは、ApsaraMQ for RocketMQの特集メッセージの一種です。 このトピックでは、順序付けられたメッセージのシナリオ、動作メカニズム、制限、および使用状況について説明します。
シナリオ
異種システムは、順序付きイベント処理、トランザクションマッチメイキング、リアルタイムの増分データ同期などのシナリオで、ステータス同期を使用して強力な一貫性を維持します。 上記のシナリオでは、イベント変更が発生したときに、アップストリームアプリケーションからダウンストリームアプリケーションへのメッセージの順序付き配信が必要です。 ApsaraMQ for RocketMQは、順序付きメッセージを提供し、順序付きメッセージ送信の実装に役立ちます。
作成順序は、単一のプロデューサーによって作成されたメッセージに対してのみ有効です。 ApsaraMQ for RocketMQは、メッセージに同じメッセージグループが指定されているかどうかにかかわらず、異なるシステムの異なるプロデューサからのメッセージの順序を判断できません。
シリアル伝送
ApsaraMQ for RocketMQのプロデューサーは、複数のスレッドからのセキュアなアクセスをサポートしています。 プロデューサーが複数のスレッドを使用して同時にメッセージを送信する場合、システムは異なるスレッドからのメッセージの順序を判断できません。
上記の条件を満たすプロデューサーがApsaraMQ for RocketMQにメッセージを送信した後、システムは、メッセージの送信順序に基づいて、同じメッセージグループのメッセージが同じキューに格納されるようにします。 次の図は、ApsaraMQ for RocketMQブローカーに順序付きメッセージがどのように格納されるかを示しています。
ApsaraMQ for RocketMQは、メッセージの送信順序に基づいて、同じメッセージグループのメッセージを同じキューに格納します。
ApsaraMQ for RocketMQは、異なるメッセージグループからのメッセージを同じキューに格納できますが、必ずしもメッセージの送信順に格納する必要はありません。
上の図では、MessageGroup1とMessageGroup4からのメッセージがMessageQueue 1に格納されています。 ApsaraMQ for RocketMQは、MessageGroup 1からG1-M1、G1-M2、G1-M3されたメッセージが、送信された順序と同じ順序でキューに格納されるようにします。 MessageGroup4からG4-M1およびG4-M2されたメッセージも、メッセージが送信された順序で格納されます。 ただし、MessageGroup1およびMessageGroup4からのメッセージは、特定の順序で格納されません。
消費順序: ApsaraMQ for RocketMQは、コンシューマーとブローカーの間に確立されたプロトコルを使用して、メッセージが保存されている順序と同じ順序でメッセージが消費されるようにします。
メッセージの消費順序を確認するには、次の条件が満たされていることを確認します。
配送注文
ApsaraMQ for RocketMQは、クライアントSDKとブローカーの通信プロトコルを使用して、メッセージが保存された順序と同じ順序で配信されるようにします。 コンシューマアプリケーションがメッセージを消費するとき、アプリケーションは、非同期消費のためにメッセージが故障するのを防ぐために、受信 − プロセス − 確認プロセスに従わなければならない。
重要
コンシューマがプッシュタイプの場合、ApsaraMQ for RocketMQは、メッセージが格納されている順序で1つずつコンシューマに配信されるようにします。 消費者が単純なタイプである場合、複数のメッセージが消費者によって一度にプルされ得る。 この場合、メッセージの消費順序は、ビジネスアプリケーションによって実装される。 コンシューマータイプについては、「コンシューマータイプ」をご参照ください。
限られた再試行
ApsaraMQ for RocketMQは、失敗した順序付きメッセージが実行できる再試行の数を制限します。 再試行の最大数に達した後にメッセージの配信に失敗した場合、ApsaraMQ for RocketMQはメッセージをスキップし、後続のメッセージの消費を継続します。 これにより、失敗したメッセージがメッセージ処理を常にブロックします。
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned.
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
// builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// Send an ordered message.
Message message = provider.newMessageBuilder()
.setTopic("topic")
// The message key. The system can use the key to locate the message.
.setKeys("messageKey")
// The message tag. The consumer can use the tag to filter the message.
.setTag("messageTag")
// The message group. We recommend that you do not include a large number of messages in the group.
.setMessageGroup("fifoGroup001")
// The message body.
.setBody("messageBody".getBytes())
.build();
try {
// Send the message. You must take note of the sending result and capture exceptions such as failures.
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
プッシュモードでメッセージを消費する
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// The topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String topic = "Your Topic";
// The consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
// The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
// Make sure that ordered delivery is applied to the consumer group. Otherwise, messages are delivered concurrently and in no particular order.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// The consumer group.
.setConsumerGroup(consumerGroup)
// The subscription.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// The message listener.
.setMessageListener(messageView -> {
// Consume the messages and return the consumption result.
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// If you no longer require the push consumer, shut down the process.
//pushConsumer.close();
}
}
シンプルモードでメッセージを消費する
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint.
* If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance.
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// The topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String topic = "Your Topic";
// The consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned.
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console.
* If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information.
* If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password.
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
// The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to.
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer.
// Make sure that ordered delivery is applied to the consumer group. Otherwise, messages are delivered concurrently and in no particular order.
SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)
// The consumer group.
.setConsumerGroup(consumerGroup)
// The timeout period for long polling requests.
.setAwaitDuration(awaitDuration)
// The subscription.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
// The maximum number of messages to be pulled.
int maxMessageNum = 16;
// The invisible time of the messages.
Duration invisibleDuration = Duration.ofSeconds(10);
// If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop.
// To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages.
while (true) {
// Note: If the consumption of a message in a message group is not complete, the next message in the message group cannot be retrieved if you call the Receive function.
final List<MessageView> messageViewList = consumer.receive(maxMessageNum, invisibleDuration);
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, you must invoke ACK to commit the consumption result.
try {
consumer.ack(messageView);
} catch (ClientException e) {
// If a message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message.
e.printStackTrace();
}
});
}
// If you no longer require the simple consumer, shut down the process.
// consumer.close();
}
}