すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:注文メッセージ

最終更新日:Aug 16, 2024

注文メッセージは、ApsaraMQ for RocketMQの特集メッセージの一種です。 このトピックでは、順序付けられたメッセージのシナリオ、動作メカニズム、制限、および使用状況について説明します。

シナリオ

異種システムは、順序付きイベント処理、トランザクションマッチメイキング、リアルタイムの増分データ同期などのシナリオで、ステータス同期を使用して強力な一貫性を維持します。 上記のシナリオでは、イベント変更が発生したときに、アップストリームアプリケーションからダウンストリームアプリケーションへのメッセージの順序付き配信が必要です。 ApsaraMQ for RocketMQは、順序付きメッセージを提供し、順序付きメッセージ送信の実装に役立ちます。

シナリオ1: 取引のマッチメイキング

交易撮合

例えば、証券および株式取引のシナリオでは、複数の入札者が入札注文に対して同じ入札価格を提示する場合、最初に入札価格を提示した入札者が入札に勝つ。 したがって、下流注文処理システムは、価格が提示された順序で注文を処理するように設計されなければならない。

シナリオ2: リアルタイムの増分データ同期

図 1. 通常のメッセージ 普通消息

図2. 注文メッセージ 顺序消息

たとえば、データベースの変更に関連するデータの増分同期を実行します。 ApsaraMQ for RocketMQの順序付きメッセージを使用して、上流のソースデータベースから下流のクエリシステムにメッセージを送信できます。 メッセージは、追加、削除、および変更操作のバイナリログとすることができる。 ダウンストリームシステムは、メッセージが送信された順序でメッセージを取得し、同じ順序でデータベースのステータスを更新します。 順序付けられたメッセージは、上流システムの操作と下流システムのステータスデータの一貫性を確保するのに役立ちます。 このシナリオで通常のメッセージを使用すると、ステータスの不整合が発生する可能性があります。

働くメカニズム

注文されたメッセージとは何ですか?

注文メッセージは、ApsaraMQ for RocketMQの特集メッセージの一種です。 順序付けられたメッセージは、メッセージが送信された順序でコンシューマに配信されます。 このタイプのメッセージを使用すると、ビジネスシナリオで順序付き処理を実装できます。

他のタイプのメッセージと比較して、順序付けられたメッセージは、メッセージの送信、格納、および配信の順序によって特徴付けられる。

ApsaraMQ for RocketMQは、メッセージグループを使用して順序付けされたメッセージの順序を識別します。 順序付きメッセージを送信する前に、各メッセージが属するメッセージグループを指定する必要があります。

重要

同じメッセージグループのメッセージのみが、先入れ先出し (FIFO) の順序で処理される。 異なるメッセージグループに属するメッセージ、またはどのメッセージグループにも属さないメッセージの順序は保証されません。

ApsaraMQ for RocketMQは、メッセージグループ内のメッセージの処理順序を保証します。 メッセージ消費ロジックに基づいて、メッセージを異なるグループに分割できます。 これにより、特定のワークロードのメッセージ処理順序を確保しながら、システムの同時実行性とスループットを向上させます。

メッセージの順序を維持する方法

ApsaraMQ for RocketMQのメッセージの順序は、生産段階と消費段階で維持されます。

  • 制作順序: ApsaraMQ for RocketMQは、プロデューサーとブローカーの間に確立されたプロトコルを使用して、メッセージがプロデューサーからブローカーにシリアルに送信され、メッセージが送信と同じ順序で保存および永続化されるようにします。

    メッセージの生成順序を確保するには、次の条件が満たされていることを確認します。

    • 同じメッセージグループ

      プロダクションオーダーは、同じメッセージグループに属するメッセージに対してのみ有効です。 異なるメッセージグループに属するメッセージ、またはどのメッセージグループにも属さないメッセージの作成順序は保証されません。 順序付きメッセージを送信する前に、メッセージごとにメッセージグループを設定できます。

    • 単一プロデューサー

      作成順序は、単一のプロデューサーによって作成されたメッセージに対してのみ有効です。 ApsaraMQ for RocketMQは、メッセージに同じメッセージグループが指定されているかどうかにかかわらず、異なるシステムの異なるプロデューサからのメッセージの順序を判断できません。

    • シリアル伝送

      ApsaraMQ for RocketMQのプロデューサーは、複数のスレッドからのセキュアなアクセスをサポートしています。 プロデューサーが複数のスレッドを使用して同時にメッセージを送信する場合、システムは異なるスレッドからのメッセージの順序を判断できません。

    上記の条件を満たすプロデューサーがApsaraMQ for RocketMQにメッセージを送信した後、システムは、メッセージの送信順序に基づいて、同じメッセージグループのメッセージが同じキューに格納されるようにします。 次の図は、ApsaraMQ for RocketMQブローカーに順序付きメッセージがどのように格納されるかを示しています。顺序存储逻辑

    • ApsaraMQ for RocketMQは、メッセージの送信順序に基づいて、同じメッセージグループのメッセージを同じキューに格納します。

    • ApsaraMQ for RocketMQは、異なるメッセージグループからのメッセージを同じキューに格納できますが、必ずしもメッセージの送信順に格納する必要はありません。

    上の図では、MessageGroup1とMessageGroup4からのメッセージがMessageQueue 1に格納されています。 ApsaraMQ for RocketMQは、MessageGroup 1からG1-M1、G1-M2、G1-M3されたメッセージが、送信された順序と同じ順序でキューに格納されるようにします。 MessageGroup4からG4-M1およびG4-M2されたメッセージも、メッセージが送信された順序で格納されます。 ただし、MessageGroup1およびMessageGroup4からのメッセージは、特定の順序で格納されません。

  • 消費順序: ApsaraMQ for RocketMQは、コンシューマーとブローカーの間に確立されたプロトコルを使用して、メッセージが保存されている順序と同じ順序でメッセージが消費されるようにします。

    メッセージの消費順序を確認するには、次の条件が満たされていることを確認します。

    • 配送注文

      ApsaraMQ for RocketMQは、クライアントSDKとブローカーの通信プロトコルを使用して、メッセージが保存された順序と同じ順序で配信されるようにします。 コンシューマアプリケーションがメッセージを消費するとき、アプリケーションは、非同期消費のためにメッセージが故障するのを防ぐために、受信 − プロセス − 確認プロセスに従わなければならない。

      重要

      コンシューマがプッシュタイプの場合、ApsaraMQ for RocketMQは、メッセージが格納されている順序で1つずつコンシューマに配信されるようにします。 消費者が単純なタイプである場合、複数のメッセージが消費者によって一度にプルされ得る。 この場合、メッセージの消費順序は、ビジネスアプリケーションによって実装される。 コンシューマータイプについては、「コンシューマータイプ」をご参照ください。

    • 限られた再試行

      ApsaraMQ for RocketMQは、失敗した順序付きメッセージが実行できる再試行の数を制限します。 再試行の最大数に達した後にメッセージの配信に失敗した場合、ApsaraMQ for RocketMQはメッセージをスキップし、後続のメッセージの消費を継続します。 これにより、失敗したメッセージがメッセージ処理を常にブロックします。

      消費順序が重要なシナリオでは、順序付けされていないメッセージ処理を防ぐために、適切な再試行回数を指定することを推奨します。

生産オーダーと消費オーダーの組み合わせ

メッセージをFIFO順序で処理する場合は、メッセージの作成順序と消費順序を確保する必要があります。 ほとんどのビジネスシナリオでは、プロデューサは複数のコンシューマにマッピングすることができるが、すべてのコンシューマが順序付けられたメッセージ消費を要求するわけではない。 さまざまなビジネスシナリオで、生産注文と消費注文の柔軟な組み合わせを使用できます。 たとえば、順序付きメッセージを送信しながら同時にメッセージを消費してスループットを向上させることができます。 次の表に、製造オーダーと消費オーダーのさまざまな組み合わせを示します。

制作オーダー

消費注文

効果

注文済み

注文済み

同じメッセージグループ内のメッセージの順序が保証されます。

同じメッセージグループ内のメッセージは、同じ順序で送信および使用されます。

注文済み

同時

メッセージは同時に時間的に消費される。

無言

注文済み

同じキューに格納されているメッセージの順序が保証されます。

ApsaraMQ for RocketMQは、メッセージがキューに格納された順序と同じ順序で使用されることのみを保証します。

無言

同時

メッセージは同時に時間的に消費される。

順序付けられたメッセージのライフサイクル

生命周期

  • 初期化

    メッセージは作成者によって作成され初期化され、ブローカーに送信する準備が整います。

  • 保留中の消費

    メッセージはブローカーに送信され、消費者に表示され、利用可能です。

  • 消費されている

    メッセージは、消費者によって取得され、消費者のローカルビジネスロジックに基づいて処理される。

    このプロセスでは、ブローカーは、消費者が消費結果を返すのを待つ。 特定の期間内にコンシューマーから応答が受信されない場合、ApsaraMQ for RocketMQはメッセージに対して再試行を実行します。 詳細は、「消費の再試行」をご参照ください。

  • 消費コミットメント

    消費者は消費を完了し、消費結果をブローカーにコミットする。 ブローカーは、現在のメッセージが消費されるかどうかをマークします。

    デフォルトでは、ApsaraMQ for RocketMQはすべてのメッセージを保持します。 消費結果がコミットされると、メッセージはすぐに削除されるのではなく、消費済みとしてマークされます。 メッセージは、保存期間が終了した場合、またはシステムのストレージ容量が不足している場合にのみ削除されます。 メッセージが削除される前に、コンシューマはメッセージを再消費することができる。

  • メッセージ削除

    メッセージの保存期間が満了した場合、またはストレージ容量が不足している場合、ApsaraMQ for RocketMQは、物理ファイルから最も早く保存されたメッセージをローリング方式で削除します。 詳細については、「メッセージの保存とクリーンアップ」をご参照ください。

重要
  • メッセージ消費の失敗またはタイムアウトは、ブローカーの再試行ロジックをトリガーします。 メッセージに対して消費の再試行がトリガーされた場合、メッセージはそのライフサイクルの終わりに達します。 メッセージは、新しいメッセージIDを有する新しいメッセージとして再試行される。

  • 順序付けられたメッセージに対して消費の再試行がトリガーされた場合、後続のメッセージは、順序付けられたメッセージが処理された後にのみ処理できます。

制限事項

MessageTypeFIFOに設定されているトピックのみが、順序付きメッセージの送受信に使用できます。

通常のメッセージとは異なり、順序付けられたメッセージにはメッセージグループを指定する必要があります。 ワークロードの分離と同時実行スケーリングを可能にするために、ビジネス要件に基づいてきめ細かいレベルでメッセージグループを構成することをお勧めします。

次のサンプルコードは、Javaで順序付けられたメッセージを送受信する方法の例を示しています。

完全なサンプルコードの詳細については、「Apache RocketMQ 5.x SDK (推奨) 」をご参照ください。

サンプルコード

順序付けられたメッセージを送信する

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        // builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
         // Send an ordered message. 
        Message message = provider.newMessageBuilder()
                .setTopic("topic")
                // The message key. The system can use the key to locate the message. 
                .setKeys("messageKey")
                // The message tag. The consumer can use the tag to filter the message. 
                .setTag("messageTag")
                // The message group. We recommend that you do not include a large number of messages in the group. 
                .setMessageGroup("fifoGroup001")
                // The message body. 
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the message. You must take note of the sending result and capture exceptions such as failures.
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

プッシュモードでメッセージを消費する

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // The consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        // Make sure that ordered delivery is applied to the consumer group. Otherwise, messages are delivered concurrently and in no particular order. 
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // The consumer group. 
                .setConsumerGroup(consumerGroup)
                // The subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // The message listener. 
                .setMessageListener(messageView -> {
                    // Consume the messages and return the consumption result. 
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // If you no longer require the push consumer, shut down the process. 
        //pushConsumer.close();
    }
}  

シンプルモードでメッセージを消費する

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // The consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        // Make sure that ordered delivery is applied to the consumer group. Otherwise, messages are delivered concurrently and in no particular order. 
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)
                // The consumer group. 
                .setConsumerGroup(consumerGroup)
                // The timeout period for long polling requests. 
                .setAwaitDuration(awaitDuration)
                // The subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
        // The maximum number of messages to be pulled. 
        int maxMessageNum = 16;
        // The invisible time of the messages. 
        Duration invisibleDuration = Duration.ofSeconds(10);
        // If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop. 
        // To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages. 
        while (true) {
            // Note: If the consumption of a message in a message group is not complete, the next message in the message group cannot be retrieved if you call the Receive function. 
            final List<MessageView> messageViewList = consumer.receive(maxMessageNum, invisibleDuration);
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                // After consumption is complete, you must invoke ACK to commit the consumption result. 
                try {
                    consumer.ack(messageView);
                } catch (ClientException e) {
                    // If a message fails to be pulled due to throttling or other reasons, you must re-initiate the request to obtain the message. 
                    e.printStackTrace();
                }
            });
        }
        // If you no longer require the simple consumer, shut down the process. 
        // consumer.close();
    }
}                                           

順序付きメッセージの消費リトライログの取得

順序付けされたメッセージがプッシュモードで消費された場合、メッセージはコンシューマークライアントで再試行され、ブローカーは再試行ログの詳細を取得できません。 順序付けられたメッセージのトレースに表示された配信結果が、メッセージの配信に失敗したことを示す場合は、再試行の最大回数とコンシューマークライアントに関する情報をコンシューマークライアントログで確認する必要があります。

コンシューマークライアントのログパスについては、「ログ設定」をご参照ください。

次のキーワードを検索して、クライアントログの消費失敗に関する情報を照会できます。

Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times

が不足しました

使用上の注意

シリアル消費を使用して、メッセージ処理の順序外れを防ぐ

バッチ消費ではなく、シリアルメッセージ消費を使用することを推奨します。 同時に複数のメッセージを消費すると、メッセージ処理の順序が狂ってしまうことがある。

たとえば、メッセージ1、2、3、および4は1-2-3-4の順序で送信され、バッチ消費の順序は1-[2、3](バッチで処理されたが失敗した)-[2、3](再試行)-4です。 メッセージ3の処理に失敗した場合、システムはメッセージ2を繰り返し処理することができる。 その結果、メッセージは順不同で消費される。

メッセージグループに多数のメッセージを含めることを避ける

ApsaraMQ for RocketMQは、同じメッセージグループ内のメッセージが同じキューに格納されるようにします。 メッセージグループに多数のメッセージが含まれていると、対応するキューがオーバーロードされます。 これはメッセージングのパフォーマンスに影響し、スケーラビリティを妨げます。 メッセージグループを設定するときは、順序IDとユーザーIDをメッセージシーケンス条件として使用できます。 これにより、同じエンドユーザーのメッセージの順序が保証されます。

ビジネスアプリケーションのメッセージをメッセージグループごとに分割することを推奨します。 たとえば、異なるユーザーのメッセージの順序を保証する必要がないシナリオでは、順序IDとユーザーIDをメッセージグループのキーワードとして使用して、同じエンドユーザーのメッセージの順序付き処理を実装できます。