シナリオ 順序付きイベント処理、トランザクションマッチメイキング、リアルタイム増分データ同期などのシナリオでは、異種システムは状態同期を通じて強力な整合性を維持する必要があります。これらのシナリオでは、アップストリームシステムのイベント変更をダウンストリームシステムに送信して順次処理する必要があります。ApsaraMQ for RocketMQ の順序付きメッセージを使用して、順次データ送信を保証できます。
シナリオ 1: トランザクションマッチメイキング
証券および株式取引のマッチメイキングシナリオでは、複数の入札が同じ価格である場合、先入れ先出し (FIFO) の原則が適用されます。ダウンストリームの注文処理システムは、入札が行われた順序に厳密に従って注文を処理する必要があります。
シナリオ 2: リアルタイム増分データ同期 図 1. 通常メッセージ
図 2. 順序付きメッセージ
データベース変更の増分同期シナリオでは、アップストリームのソースデータベースが追加、削除、変更の操作を実行します。二項演算ログは、ApsaraMQ for RocketMQ を介してメッセージとしてダウンストリームの検索システムに送信されます。ダウンストリームシステムは、メッセージからデータを順次復元して状態をリフレッシュします。通常のメッセージを使用すると、状態が期待される操作結果と一致しなくなる可能性があります。順序付きメッセージは、ダウンストリームの状態がアップストリームの操作結果と一致することを保証します。
仕組み 順序付きメッセージとは 順序付きメッセージは、ApsaraMQ for RocketMQ の高度なメッセージタイプです。これにより、コンシューマーは送信された順序と同じ順序でメッセージを受信できます。これにより、ビジネスシナリオでの順次処理が可能になります。
他のメッセージタイプとは異なり、順序付きメッセージは、送信、ストレージ、および配信中に複数のメッセージ間の順序関係を保証します。
ApsaraMQ for RocketMQ では、メッセージの順序はメッセージグループによって定義されます。順序付きメッセージを送信する場合、各メッセージにメッセージグループを指定する必要があります。
重要 順序は、同じメッセージグループ内のメッセージに対してのみ保証されます。異なるメッセージグループ内のメッセージや、グループが指定されていないメッセージの順序は保証されません。
メッセージグループベースの順序付けロジックにより、ビジネスロジックに基づいて詳細な分割が可能になります。これにより、ローカルビジネスの順序を確保しながら、システムの並列性とスループットを向上させることができます。
メッセージの順序を保証する方法 ApsaraMQ for RocketMQ でのメッセージの順序付けには、送信順序と消費順序の 2 つの段階があります。
送信順序 : ApsaraMQ for RocketMQ は、プロデューサーとサーバー間のプロトコルを使用して、単一のプロデューサーによってシリアルに送信されたメッセージがその順序で保存および永続化されることを保証します。
メッセージの送信順序を保証するには、次の条件を満たす必要があります:
同じメッセージグループ
送信順序は、単一のメッセージグループにのみ適用されます。プロデューサーがメッセージを送信するとき、各メッセージにメッセージグループを設定できます。順序は、同じメッセージグループ内のメッセージに対してのみ保証されます。異なるメッセージグループ内のメッセージや、グループが指定されていないメッセージの順序は保証されません。
単一のプロデューサー
送信順序は、単一のプロデューサーに対してのみ保証されます。異なるプロデューサーが同じメッセージグループにメッセージを送信する場合、それらが異なるシステムに分散されていても、メッセージの順序は保証できません。
シリアル送信
ApsaraMQ for RocketMQ プロデューサークライアントは、マルチスレッドのセキュアなアクセスをサポートしています。ただし、プロデューサーが複数のスレッドを使用してメッセージを同時に送信する場合、異なるスレッドによって送信されたメッセージの順序は保証できません。
これらの条件を満たすプロデューサーが順序付きメッセージを ApsaraMQ for RocketMQ に送信すると、同じメッセージグループを持つメッセージは、送信された順序で同じキューに保存されます。サーバーのシーケンシャルストレージロジックは次のとおりです:
上の図に示すように、メッセージグループ 1 とメッセージグループ 4 のメッセージはキュー 1 に混在しています。ApsaraMQ for RocketMQ は、メッセージグループ 1 のメッセージ G1-M1、G1-M2、G1-M3 が送信された順序で保存されることを保証します。メッセージグループ 4 のメッセージ G4-M1 と G4-M2 も順序通りに保存されます。ただし、メッセージグループ 1 とメッセージグループ 4 のメッセージ間の順序は保証されません。
消費順序 : ApsaraMQ for RocketMQ は、コンシューマーとサーバー間のプロトコルを使用して、メッセージが保存された順序に厳密に従って消費されることを保証します。
メッセージの消費順序を保証するには、次の条件を満たす必要があります:
配信順序
ApsaraMQ for RocketMQ は、クライアント SDK とサーバー通信プロトコルを使用して、メッセージが保存された順序で配信されることを保証します。ただし、アプリケーションがメッセージを消費する場合、非同期処理によるメッセージの順序の乱れを防ぐために、受信-処理-確認のセマンティクスに厳密に従う必要があります。
重要 コンシューマータイプが PushConsumer の場合、ApsaraMQ for RocketMQ はメッセージが保存された順序で 1 つずつコンシューマーに配信されることを保証します。コンシューマータイプが SimpleConsumer の場合、コンシューマーは一度に複数のメッセージをプルすることがあります。この場合、アプリケーションは消費順序を保証する必要があります。コンシューマータイプの詳細については、「コンシューマータイプ 」をご参照ください。
制限付きリトライ
ApsaraMQ for RocketMQ での順序付きメッセージの配信は、特定のリトライ回数に制限されています。メッセージの消費が一貫して失敗し、最大リトライ回数を超えた場合、そのメッセージは再試行されなくなります。システムはこのメッセージをスキップして、後続のメッセージ処理をブロックしないようにします。
厳密な消費順序が必要なシナリオでは、不適切なパラメーター設定によるメッセージの順序の乱れを防ぐために、適切なリトライ回数を設定できます。
送信順序と消費順序の組み合わせ メッセージを厳密に FIFO 順で処理する必要がある場合は、送信順序と消費順序の両方を保証する必要があります。ただし、一般的なビジネスシナリオでは、単一のプロデューサーが複数のダウンストリームコンシューマーに接続することがあり、そのすべてが順序付き消費を必要とするわけではありません。さまざまなビジネスシナリオに対して、送信順序と消費順序の異なる組み合わせを使用できます。たとえば、順序付きメッセージを送信するが、同時消費を使用してスループットを向上させることができます。次の表に、いくつかの組み合わせを示します:
生産オーダー
消費順序
順序付け効果
順序付き送信 (メッセージグループが設定されている)。
順序付き消費
メッセージグループ レベルで厳密なメッセージ順序が保証されます。
消費順序は、同じメッセージグループ内のメッセージの送信順序と同じです。
順序付き送信 (メッセージグループが設定されている)。
同時消費
同時消費。メッセージは可能な限り時系列で処理されますが、順序は保証されません。
非順序送信 (メッセージグループが設定されていない)。
順序付き消費
キューレベルでの厳密な順序。
ApsaraMQ for RocketMQ のキュープロパティに基づき、消費順序はキュー内のストレージ順序と一致しますが、送信順序と一致することは保証されません。
非順序送信 (メッセージグループが設定されていない)。
同時消費
同時消費。メッセージは可能な限り時系列で処理されますが、順序は保証されません。
順序付きメッセージのライフサイクル
初期化
メッセージはプロデューサーによって構築および初期化され、ブローカーに送信される準備ができています。
消費保留中
メッセージはブローカーに送信され、コンシューマーに表示され、利用可能になります。
消費中
メッセージはコンシューマーによって取得され、コンシューマーのローカルビジネスロジックに基づいて処理されます。
このプロセスでは、ブローカーはコンシューマーが消費結果を返すのを待ちます。特定の期間内にコンシューマーから応答がない場合、ApsaraMQ for RocketMQ はメッセージに対してリトライを実行します。詳細については、「消費リトライ 」をご参照ください。
消費コミット
コンシューマーは消費を完了し、消費結果をブローカーにコミットします。ブローカーは、現在のメッセージが消費されたかどうかをマークします。
デフォルトでは、ApsaraMQ for RocketMQ はすべてのメッセージを保持します。消費結果がコミットされると、メッセージはすぐに削除されるのではなく、消費済みとしてマークされます。メッセージは、保持期間が過ぎた場合、またはシステムに十分なストレージ容量がない場合にのみ削除されます。メッセージが削除される前に、コンシューマーはメッセージを再消費できます。
メッセージの削除
メッセージの保持期間が過ぎた場合、またはストレージ容量が不足している場合、ApsaraMQ for RocketMQ は、物理ファイルから最も早く保存されたメッセージをローリング方式で削除します。詳細については、「メッセージの保存とクリーンアップ 」をご参照ください。
重要 メッセージの消費に失敗したか、消費がタイムアウトした場合、サーバーのリトライロジックがトリガーされます。リトライされたメッセージは新しいメッセージと見なされ、元のメッセージのライフサイクルは終了します。
メッセージの順序を保証するため、失敗した順序付きメッセージがリトライされると、同じメッセージグループ内の後続のメッセージはブロックされます。それらは、現在のメッセージが正常に消費された後にのみ消費できます。
制限 MessageType が FIFO に設定されている Topic にのみ順序付きメッセージを送信できます。メッセージタイプは Topic タイプと一致する必要があります。
コンシューマーの同時実行の最適化 クライアントは RocketMQ 5.x gRPC SDK を使用する必要があります。順序付きメッセージを消費する場合、この SDK シリーズの PushConsumer クライアントは、同じ MessageQueue からのメッセージを、その MessageGroup に基づいて異なるスレッドに割り当てて同時消費することができます。この機能は順序付きメッセージ消費の同時実行性を大幅に向上させます 。MessageGroup の値がより離散的である場合、改善はより顕著になります。サポートされている SDK バージョンは次のとおりです:
Java SDK: 5.0.8 以降。
C++ SDK: 5.0.3 以降。
その他の SDK: サポートされていません。
例 通常のメッセージの送信とは異なり、順序付きメッセージを送信するときはメッセージグループを設定する必要があります。ビジネスシナリオに基づいて、可能な限り細かい粒度でメッセージグループを設計できます。これにより、ビジネスロジックの分割が容易になり、同時スケーリングが向上します。
次のサンプルコードは、Java で順序付きメッセージを送受信する方法を示しています:
メッセージの送受信に関する完全なサンプルコードについては、「RocketMQ 5.x gRPC SDK 」をご参照ください。
サンプルコード 順序付きメッセージを送信する import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* インスタンスのエンドポイント。コンソールの [インスタンス詳細] ページの [エンドポイント] タブからエンドポイントを取得できます。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合は、VPC エンドポイントを使用することをお勧めします。
* ローカルコンピューターまたはデータセンターからインターネット経由でインスタンスにアクセスする場合は、パブリックエンドポイントを使用できます。パブリックエンドポイントを使用するには、インスタンスのパブリックネットワークアクセスを有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// メッセージの宛先 Topic の名前。事前にコンソールで Topic を作成する必要があります。存在しない Topic を使用するとエラーが返されます。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* パブリックエンドポイントを使用する場合は、インスタンスのユーザー名とパスワードも設定する必要があります。ユーザー名とパスワードは、コンソールのインスタンスの [アクセス制御] ページの [インテリジェント認証] タブから取得できます。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、これを設定する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
* インスタンスがサーバーレスインスタンスの場合、パブリックネットワークアクセスのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由の認証不要アクセスが有効になっている場合、内部ネットワークアクセスのユーザー名とパスワードを設定する必要はありません。
*/
// builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 順序付きメッセージを送信します。
Message message = provider.newMessageBuilder()
.setTopic("topic")
// メッセージキーを設定します。キーを使用して特定のメッセージを正確に見つけることができます。
.setKeys("messageKey")
// メッセージタグを設定します。コンシューマーはタグを使用してメッセージをフィルターできます。
.setTag("messageTag")
// 順序付きメッセージのメッセージグループを設定します。ホットスポットメッセージグループを避けるために、このグループをできるだけ離散させてください。
.setMessageGroup("fifoGroup001")
// メッセージ本文。
.setBody("messageBody".getBytes())
.build();
try {
// メッセージを送信します。送信結果に注意し、失敗などの例外をキャッチします。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}PushConsumer でメッセージを消費する import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* インスタンスのエンドポイント。コンソールの [インスタンス詳細] ページの [エンドポイント] タブからエンドポイントを取得できます。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合は、VPC エンドポイントを使用することをお勧めします。
* ローカルコンピューターまたはデータセンターからインターネット経由でインスタンスにアクセスする場合は、パブリックエンドポイントを使用できます。パブリックエンドポイントを使用するには、インスタンスのパブリックネットワークアクセスを有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// サブスクライブする宛先 Topic。事前にコンソールで Topic を作成する必要があります。存在しない Topic を使用するとエラーが返されます。
String topic = "Your Topic";
// コンシューマーが属するコンシューマーグループ。事前にコンソールでグループを作成する必要があります。存在しないグループを使用するとエラーが返されます。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* パブリックエンドポイントを使用する場合は、インスタンスのユーザー名とパスワードも設定する必要があります。ユーザー名とパスワードは、コンソールのインスタンスの [アクセス制御] ページの [インテリジェント認証] タブから取得できます。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、これを設定する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
* インスタンスがサーバーレスインスタンスの場合、パブリックネットワークアクセスのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由の認証不要アクセスが有効になっている場合、内部ネットワークアクセスのユーザー名とパスワードを設定する必要はありません。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
// メッセージサブスクリプションのフィルター式。これは、任意のタグを持つすべてのメッセージがサブスクライブされることを示します。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// PushConsumer を初期化します。コンシューマーグループ、通信パラメーター、およびサブスクリプション関係をアタッチする必要があります。
// 順序付きメッセージを消費する場合、現在のコンシューマーグループが順序付き配信モードであることを確認してください。そうでない場合、メッセージは依然として同時に、順不同で配信されます。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// コンシューマーグループを設定します。
.setConsumerGroup(consumerGroup)
// 事前にアタッチされたサブスクリプション関係を設定します。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// メッセージリスナーを設定します。
.setMessageListener(messageView -> {
// メッセージを処理し、消費結果を返します。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// PushConsumer が不要になった場合は、プロセスをシャットダウンできます。
//pushConsumer.close();
}
} SimpleConsumer でメッセージを消費する import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* インスタンスのエンドポイント。コンソールの [インスタンス詳細] ページの [エンドポイント] タブからエンドポイントを取得できます。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合は、VPC エンドポイントを使用することをお勧めします。
* ローカルコンピューターまたはデータセンターからインターネット経由でインスタンスにアクセスする場合は、パブリックエンドポイントを使用できます。パブリックエンドポイントを使用するには、インスタンスのパブリックネットワークアクセスを有効にする必要があります。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
// サブスクライブする宛先 Topic。事前にコンソールで Topic を作成する必要があります。存在しない Topic を使用するとエラーが返されます。
String topic = "Your Topic";
// コンシューマーが属するコンシューマーグループ。事前にコンソールでグループを作成する必要があります。存在しないグループを使用するとエラーが返されます。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* パブリックエンドポイントを使用する場合は、インスタンスのユーザー名とパスワードも設定する必要があります。ユーザー名とパスワードは、コンソールのインスタンスの [アクセス制御] ページの [インテリジェント認証] タブから取得できます。
* Alibaba Cloud ECS インスタンスから内部ネットワーク経由でインスタンスにアクセスする場合、これを設定する必要はありません。サーバーは VPC に基づいて情報を自動的に取得します。
* インスタンスがサーバーレスインスタンスの場合、パブリックネットワークアクセスのユーザー名とパスワードを設定する必要があります。内部ネットワーク経由の認証不要アクセスが有効になっている場合、内部ネットワークアクセスのユーザー名とパスワードを設定する必要はありません。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
// メッセージサブスクリプションのフィルター式。これは、任意のタグを持つすべてのメッセージがサブスクライブされることを示します。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// SimpleConsumer を初期化します。コンシューマーグループ、通信パラメーター、およびサブスクリプション関係をアタッチする必要があります。
// 順序付きメッセージを消費する場合、現在のコンシューマーグループが順序付き配信モードであることを確認してください。そうでない場合、メッセージは依然として同時に、順不同で配信されます。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)
// コンシューマーグループを設定します。
.setConsumerGroup(consumerGroup)
// ロングポーリングのタイムアウト期間を設定します。
.setAwaitDuration(awaitDuration)
// 事前にアタッチされたサブスクリプション関係を設定します。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
// 今回プルするメッセージの最大数を設定します。
int maxMessageNum = 16;
// メッセージの非表示期間を設定します。
Duration invisibleDuration = Duration.ofSeconds(10);
// SimpleConsumer は、クライアントが積極的にループしてメッセージを取得し、処理する必要があります。
// リアルタイム消費を改善するために、複数のスレッドを使用してメッセージを同時にプルすることをお勧めします。
while (true) {
// 同じ MessageGroup 内のメッセージの場合、先行するメッセージが消費されないと、Receive を再度呼び出しても後続のメッセージを取得できないことに注意してください。
final List<MessageView> messageViewList = consumer.receive(maxMessageNum, invisibleDuration);
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 消費が完了したら、積極的に ACK を呼び出して消費結果を送信する必要があります。
try {
consumer.ack(messageView);
} catch (ClientException e) {
// システムの調整やその他の理由でプルが失敗した場合は、リクエストを再開してメッセージを取得する必要があります。
e.printStackTrace();
}
});
}
// SimpleConsumer が不要になった場合は、プロセスをシャットダウンできます。
// consumer.close();
}
} 順序付きメッセージの消費リトライログの取得 PushConsumer による順序付き消費のリトライは、コンシューマークライアントで発生します。サーバーは、消費リトライの詳細なログを取得できません。メッセージトレース内の順序付きメッセージの配信結果が「失敗」の場合、コンシューマークライアントのログで最大リトライ回数やコンシューマークライアントなどの情報を確認できます。
コンシューマークライアントのログパスについては、「ログ設定 」をご参照ください。
クライアントログで次のキーワードを検索すると、消費の失敗に関連するコンテンツをすばやく見つけることができます:
Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt timesベストプラクティス 順不同の処理を防ぐためのシリアル消費の使用 メッセージはバッチではなくシリアルで消費することをお勧めします。バッチ消費は、メッセージが順不同で処理される原因となる可能性があります。
たとえば、メッセージが 1、2、3、4 の順序で送信されるとします。バッチ消費中、消費シーケンスは 1、次に失敗する [2, 3] のバッチ、次に [2, 3] のリトライ、最後に 4 となる可能性があります。この場合、メッセージ 3 の失敗によりメッセージ 2 が繰り返し処理され、順不同の消費につながる可能性があります。
ホットスポットを回避するためのメッセージグループの分散 ApsaraMQ for RocketMQ は、同じメッセージグループ内のメッセージが同じキューに保存されることを保証します。異なるビジネスシナリオからのメッセージが少数のメッセージグループ、あるいは単一のメッセージグループに集中すると、ストレージの負荷がサーバー上の少数のキュー、あるいは単一のキューに集中します。これにより、パフォーマンスのホットスポットが作成され、スケーラビリティが妨げられる可能性があります。一般的な設計プラクティスは、注文 ID またはユーザー ID を順序付けの基礎として使用することです。これにより、同じエンドユーザーのメッセージが順序通りに処理されることが保証され、異なるユーザーのメッセージは順序通りに処理する必要がなくなります。
したがって、メッセージグループを細かい粒度で定義することをお勧めします。たとえば、注文 ID またはユーザー ID をメッセージグループキーとして使用できます。これにより、同じエンドユーザーに関連するメッセージが順序通りに処理されることが保証され、異なるユーザーのメッセージを並行して処理できます。