ApsaraMQ for RocketMQは、消費の再試行機能を提供します。 コンシューマーがメッセージの消費に失敗した場合、ApsaraMQ for RocketMQは消費再試行ポリシーに基づいてメッセージをコンシューマーに再配信します。 このトピックでは、ApsaraMQ for RocketMQのHTTPおよびTCP経由で送信されるメッセージの消費再試行ポリシーについて説明します。
使用上の注意
メッセージのIDは、リトライ回数に関係なく変更されません。
ApsaraMQ forRocketMQは、クラスタリング消費モードでのみメッセージを再配信します。 コンシューマがブロードキャスト消費モードでメッセージの消費に失敗した場合、ApsaraMQ forRocketMQはメッセージをコンシューマに再配信しません。 この場合、消費者は新しいメッセージを消費し続ける。
概要
メッセージの消費失敗または消費タイムアウトが発生した場合、ApsaraMQ for RocketMQは、指定された再試行間隔が経過した後にメッセージをコンシューマーに再配信します。 再試行の最大数に達してもメッセージの消費が失敗した場合、ApsaraMQ for RocketMQはメッセージをデットレターキューに配信します。 dead-letterキューの詳細については、「Dead-letterキュー」をご参照ください。 デッドレターキューのメッセージを使用して、ビジネスを復元できます。
次の項目は、消費の再試行の主な動作を示します。
リトライ間隔: メッセージの消費失敗またはタイムアウトが発生した時点から、メッセージの次の消費が開始される時点までの間隔。
最大再試行回数: メッセージの消費に失敗した後、ApsaraMQ for RocketMQがコンシューマーにメッセージを再配信できる最大回数。
TCP経由で送信されるメッセージの再試行の最大数は、HTTP経由で送信されるメッセージの再試行の最大数とは異なります。 詳細については、「TCP経由で送信されたメッセージの再試行ポリシー」および「HTTP経由で送信されたメッセージの再試行ポリシー」をご参照ください。
TCP経由で送信されたメッセージの再試行ポリシー
再試行ステータス
次の図は、コンシューマーがメッセージを消費したときにメッセージのステータスがどのように変化するかを示しています。
準備完了
メッセージは、ApsaraMQ for RocketMQブローカーで使用できるようになりました。
機内
メッセージはコンシューマによって取得されて消費されますが、消費結果は返されません。
WaitingRetry
メッセージの消費に失敗した場合、またはメッセージの消費がタイムアウトした場合、消費の再試行ロジックがトリガーされます。 リトライ回数の上限に達していない場合は, リトライ間隔が経過した後, メッセージの状態がReadyに変わります。 Ready状態にある失敗したメッセージは、再度使用できます。 頻繁な再試行を防ぐために、再試行の間隔を広げることができます。
コミット
メッセージが消費されます。 消費者が成功応答を返した後、消費は完了する。
DLQ
消費ロジックの実装を保証するために使用されるメカニズム。 無効文字メッセージを保持する機能が有効になっている場合、再試行の最大数に達した後に消費されないメッセージが無効文字トピックに送信されます。 デッドレタートピックのメッセージを使用して、ビジネスを復元できます。 詳細については、「Dead-letterキュー」をご参照ください。
上の図は、再試行プロセスのサンプルを示しています。 図では、メッセージは5秒間レディ状態のままであり、消費されるのに6秒を要する。
メッセージが再試行されるたびに、メッセージのステータスはReadyからInflightに変わり、次にWaitingRetryに変わります。 リトライ間隔とは、メッセージの消費失敗またはタイムアウトが発生した時点から、メッセージの次の消費が開始する時点までの間隔を指す。 2つの連続した消費の間の間隔は、再試行間隔、消費持続時間、およびメッセージがレディ状態のままである期間を含む。 例:
メッセージが消費のために初めて配信されると、メッセージは0秒目にレディ状態に入る。
メッセージは5秒目にプルされます。 6秒目に、消費エラーが発生し、消費失敗を示すメッセージがクライアントによって返されます。
10秒のリトライ間隔が指定されているため, すぐにはリトライできません。
21秒で、メッセージは再びレディ状態に入る。
5秒後、クライアントは再びメッセージの消費を開始します。
消費間隔は、以下の式を使用することによって21秒として計算される。消費間隔=消費持続時間 + 再試行間隔 + レディ状態における持続時間=6 + 10 + 5 = 21。
再試行間隔と再試行回数
プロトコル | メッセージタイプ | 再試行間隔 | 最大再試行回数 |
TCP | 注文メッセージ | suspendTimeMillisパラメーターは、順序付きメッセージの再試行間隔を指定するために使用されます。 有効な値: 10 ~ 30000 単位:ミリ秒。 デフォルト値は 1000 です。 デフォルト値は、失敗した順序付けられたメッセージが1秒間隔で再配信されることを示します。 | MaxReconsumeTimesパラメーターは、順序付きメッセージの最大再試行回数を指定するために使用されます。 このパラメータの値に上限は課されない。 このパラメーターを空のままにすると、再試行の最大数はInteger.MAXになります。 |
Unorderedメッセージ | 順序付けられていないメッセージの再試行間隔は、再試行の回数によって異なります。 再試行間隔は10秒から2時間です。 順序なしメッセージのカスタム再試行間隔は指定できません。
| MaxReconsumeTimesパラメーターは、順序付けられていないメッセージの最大再試行回数を指定するために使用されます。 デフォルト値: 16。 このパラメータの値に上限は課されない。 デフォルト値を使用することを推奨します。 |
表1. TCP経由で送信される順序付けられていないメッセージのリトライ間隔
番号 | インターバル | 番号 | インターバル |
1 | 10 秒 | 9 | 7分 |
2 | 30 秒 | 10 | 8分 |
3 | 1 分 | 11 | 9分 |
4 | 2分 | 12 | 10 分 |
5 | 3分 | 13 | 20分 |
6 | 4分 | 14 | 30分 |
7 | 5 分 | 15 | 1 時間 |
8 | 6分 | 16 | 2時間 |
設定方法
次の例では、TCP経由で送信されるメッセージを使用して、再試行ポリシーの設定方法を説明します。
メッセージの再試行機能の有効化
ApsaraMQ forRocketMQで、クラスタリング消費モードでの消費に失敗したメッセージを再配信する場合は、次のいずれかの方法を使用してMessageListenerメソッドを実装します。
方法1: Return Action.ReconsumeLater. この方法を使用することを推奨します。
方法2: nullを返します。
方法3: 例外を投げる。
サンプルコード
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { // If the consumption logic throws an exception, the message is retried. doConsumeMessage(message); // Method 1: Return Action.ReconsumeLater and retry the message. return Action.ReconsumeLater; // Method 2: Return null and retry the message. return null; // Method 3: Throw an exception and retry the message. throw new RuntimeException("Consumer Message exception"); } }
メッセージの再試行機能の無効化
ApsaraMQ forRocketMQが、クラスタリング消費モードでの消費に失敗したメッセージを再配信しない場合は、消費ロジックによってスローされたすべての例外をキャプチャしてAction.CommitMessageを返すようにメッセージ消費コードを設定します。 このように、ApsaraMQ forRocketMQはメッセージを再配信しません。
サンプルコード
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { try { doConsumeMessage(message); } catch (Throwable e) { // Capture all exceptions that are thrown by the consumption logic and return Action.CommitMessage. return Action.CommitMessage; } // The message is processed as expected and Action.CommitMessage is returned. return Action.CommitMessage; } }
カスタム再試行間隔と最大再試行回数の指定
説明ApsaraMQ for RocketMQクライアントのログ設定にカスタム値を指定する場合は、TCPクライアントSDK for Javaをバージョン1.2.2以降に更新します。 詳細については、「リリースノート」をご参照ください。
ApsaraMQ for RocketMQでは、コンシューマーの起動時にカスタムの再試行間隔と最大再試行回数を指定できます。 順序なしメッセージのカスタム再試行間隔は指定できません。 順序なしメッセージの再試行間隔の詳細については、「TCP経由で送信される順序なしメッセージの再試行間隔」をご参照ください。
次のサンプルコードは、カスタム間隔と最大再試行回数を指定する方法の例を示しています。
Properties properties = new Properties(); // Set the maximum number of retries allowed for messages in the specified consumer group to 20. The value is a string. properties.put(PropertyKeyConst.MaxReconsumeTimes,"20"); // Set the retry interval for messages in the specified consumer group to 3,000 milliseconds. The value is a string. properties.put(PropertyKeyConst.SuspendTimeMillis,"3000"); Consumer consumer = ONSFactory.createConsumer(properties);
重要最新の設定は、同じグループ内のコンシューマに対して有効になります。 直近の時点で開始されたコンシューマーの構成は、前の時点で開始されたコンシューマーの構成を上書きします。 コンシューマーグループ内のすべてのコンシューマーが、再試行間隔と再試行の最大回数に同じ設定を使用していることを確認します。 コンシューマーグループ内のすべてのコンシューマーが再試行間隔と最大再試行回数で同じ設定を使用しない場合、コンシューマーの設定は互いに上書きされます。
リトライ回数の照会
コンシューマーがメッセージを受信した後、次の方法を使用して再試行回数を照会できます。 ほとんどの場合、再試行間隔を照会する必要はありません。
public class MessageListenerImpl implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { // Query the number of retries. System.out.println(message.getReconsumeTimes()); return Action.CommitMessage; } }
HTTP経由で送信されるメッセージの再試行ポリシー
プロトコル | メッセージタイプ | 再試行間隔 | 最大再試行回数 | 設定 |
HTTP | 注文メッセージ | 1 分 | 288 | メッセージの再試行方法はシステムによって事前定義されており、変更できません。 |
Unorderedメッセージ | 5 分 | 288 | 設定はシステムによって事前定義されており、変更することはできません。 |