すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:メッセージ等性

最終更新日:Jul 09, 2024

ApsaraMQ for RocketMQでは、コンシューマーがメッセージを受信した後、特定のビジネスを一意に識別するメッセージキーに基づいて、メッセージに対してべき等処理を実行する必要があります。 これにより、メッセージの繰り返し消費による業務処理例外を防ぐことができます。 このトピックでは、メッセージべき等の概念を紹介し、シナリオとメッセージべき等を実装する方法について説明します。

メッセージべき等とは何ですか?

消費者がメッセージを繰り返し消費する場合、繰り返し消費の結果は1回の消費の結果と同じであり、繰り返し消費はビジネスシステムに悪影響を及ぼさない。 この場合、メッセージべき等性が実装される。

例えば、消費者は、支払い引き落としメッセージに基づいて注文の支払いを引き落とす。 支払額はあります 米ドル100 不安定なネットワーク接続などのネットワークの問題により、メッセージは消費者に繰り返し配信されます。 その結果、メッセージは繰り返し消費される。 ただし、支払いは1回だけ差し引かれ、 注文に対してUSD 100が生成されます。 この例では、メッセージのべき等性はメッセージ消費プロセスで実装され、支払い控除はビジネス要件を満たします。

シナリオ

インターネットアプリケーションでは、特にネットワーク接続が不安定な場合、ApsaraMQ for RocketMQメッセージが繰り返し消費されることがあります。 メッセージの繰り返し消費がビジネスに影響を与える場合は、メッセージにべき等処理を実装できます。

メッセージは、次のシナリオで繰り返し使用できます。

  • プロデューサーは、RocketMQブローカーのmessage Queueにメッセージを繰り返し送信します。

    ApsaraMQ for RocketMQブローカーにメッセージが送信されて永続化された後に一時的なネットワーク接続が発生した場合、またはプロデューサーが故障した場合、ブローカーはプロデューサーに応答できません。 この場合、プロデューサは、ブローカがメッセージを受信しないと判断し、メッセージを再度送信する。 その結果、コンシューマは、同じ内容および異なるメッセージIDを有する2つのメッセージを受信する。

  • ApsaraMQ for RocketMQブローカーは、コンシューマーにメッセージを繰り返し配信します。

    メッセージはコンシューマに配信され、コンシューマによって処理される。 コンシューマーがApsaraMQ for RocketMQブローカーに応答を送信すると、一時的なネットワーク接続が発生します。 この場合、ApsaraMQ for RocketMQブローカーはメッセージが消費されたかどうかを知りません。 メッセージが少なくとも1回消費されることを保証するために、ブローカーは、ネットワークが回復した後に再びメッセージを配信する。 その結果、コンシューマは、同じ内容および同じメッセージIDを有する2つのメッセージを受信する。

  • メッセージは、負荷分散のためにコンシューマに繰り返し配信される。 負荷分散を引き起こす要因には、ネットワークジッタ、ブローカーの再起動、およびコンシューマアプリケーションの再起動が含まれますが、これらに限定されません。

    ApsaraMQ for RocketMQブローカーまたはクライアントを再起動またはスケールすると、負荷分散がトリガーされます。 負荷分散中、コンシューマは、繰り返されるメッセージを受信し得る。

メッセージべき等性を実装する

異なるメッセージIDを有するメッセージは、同じメッセージコンテンツを含み得る。 競合を回避するために、メッセージIDに基づくべき等処理を実装しないことを推奨します。 べき等処理を実装するには、一意のビジネス識別子を使用することを推奨します。 メッセージキーを使用して、ビジネス識別子を指定できます。

たとえば、支払いシナリオでは、注文IDをメッセージキーとして指定し、指定されたメッセージキーに基づいてべき等処理を実装できます。 次のサンプルコードは、メッセージに一意のビジネス固有キーを指定する方法の例を示しています。

Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);           

コンシューマは、メッセージを受信すると、メッセージキーに基づいてべき等処理を実行することができる。

consumer.subscribe("ons_test", "*", new MessageListener() {
    public Action consume(Message message, ConsumeContext context) {
        String key = message.getKey()
        // Perform idempotent processing based on the message key that uniquely identifies your business. 
    }
});