ApsaraMQ for RocketMQは、コンシューマオフセットを使用してコンシューマの進捗状況を管理します。 このトピックでは、ApsaraMQ for RocketMQのコンシューマー進捗管理メカニズムについて説明します。
背景情報
ApsaraMQ for RocketMQでは、コンシューマーが購読する前または後にメッセージを生成できます。 消費者はメッセージの消費を開始する場所をどのように知っていますか? この課題を克服するために、ApsaraMQ for RocketMQは消費者の進捗管理メカニズムを開発しました。
ApsaraMQ for RocketMQのコンシューマー進捗管理メカニズムは、次の問題を解決します。
クライアントは起動後どこでメッセージを消費し始めますか?
消費されたメッセージが複数回処理されないように、どのようにマークされますか?
サービス例外が発生した場合、同じクライアントでメッセージを再度消費できますか?
働くメカニズム
Offset
ApsaraMQ for RocketMQでは、メッセージはブローカーに到着した順序で特定のトピックの複数のキューに格納されます。 各メッセージには、メッセージのオフセットとしても知られている固有のLong型座標が割り当てられる。
理論的に言えば、メッセージキューは不定数のメッセージを格納することができる。 したがって、オフセットの値の範囲は0からLong.MAX_VALUEです。 トピック、キュー、およびオフセットに基づいてメッセージを検索できます。 これらの概念の関係を次の図に示します。
ApsaraMQ for RocketMQでは、キュー内の最も早いメッセージのオフセットは最小オフセット (MinOffset) と呼ばれ、最新のメッセージのオフセットは最大オフセット (MaxOffset) と呼ばれます。 メッセージキューは理論的には不定数のメッセージを保持することができるが、それらが格納される物理マシンは限られた空間を有する。 したがって、ApsaraMQ for RocketMQはキューから最も早く格納されたメッセージを動的に削除し、キューのMinOffset値とMaxOffset値は常に増加します。
消費者オフセット
ApsaraMQ for RocketMQは、パブリッシュ-サブスクライブのパターンに従います。 複数のコンシューマグループが同じキューにサブスクライブできます。 このようなシナリオでは、コンシューマーがメッセージを消費した後にメッセージを削除すると、他のコンシューマーはメッセージを消費できなくなります。
これを防ぐために、ApsaraMQ for RocketMQはコンシューマーオフセットを使用して、さまざまなコンシューマーのメッセージ消費の進行状況を管理します。 ApsaraMQ for RocketMQは、メッセージが消費された直後にはメッセージを削除しません。 代わりに、ApsaraMQ for RocketMQは、コンシューマーグループによって消費された最新のメッセージのレコードを保持します。これはコンシューマーオフセットとも呼ばれます。
クライアントが再起動された場合、コンシューマは、ブローカに保存されたコンシューマオフセットに基づいてメッセージを処理し続けることができる。 消費者オフセットが期限切れになって削除されると、ブローカーに保存されているキューのMinOffset値が消費者オフセットとして使用されます。
コンシューマーオフセットは、ApsaraMQ for RocketMQブローカーに保存および復元され、特定のコンシューマーとは関係ありません。 したがって、ApsaraMQ for RocketMQは、さまざまな消費者間で消費者の進捗状況を復元できます。
次の図は、メッセージキュー内の最小オフセット、最大オフセット、およびコンシューマオフセットの関係を示しています。
消費者オフセットは、常に最大オフセット以下である。
メッセージが同じレートで生成および消費され、消費されていないメッセージがキューに存在しない場合、消費者オフセットは最大オフセットと同じです。
メッセージが生成されるよりも遅く消費される場合、消費されないメッセージがキューに存在します。 この場合、消費者オフセットは最大オフセットよりも小さく、その差は消費されていないメッセージの数である。
典型的には、消費者オフセットは最小オフセット以上である。 消費者オフセットが最小オフセットより小さい場合、消費者はメッセージを消費することができない。 この場合、ブローカーは、消費者の正しい消費者オフセットを復元する。
初期消費者オフセット
初期消費者オフセットは、消費者グループが初めてメッセージキューを消費し始めるときにブローカーに保存される消費者オフセットである。
ApsaraMQ for RocketMQは、コンシューマがキューからメッセージを初めて取得したときのメッセージキューの最大オフセットを初期コンシューマオフセットとして使用します。 すなわち、コンシューマは、キュー内の最新のメッセージから消費を開始する。
コンシューマオフセットのリセット
初期または現在の消費者オフセットがビジネスの状態と一致しない場合は、消費者オフセットをリセットして消費者の進捗状況を調整できます。
シナリオ
不適切な初期消費者オフセット: 初期消費者オフセットはキューの最大オフセットであり、クライアントは最新のメッセージから消費を開始します。 以前のメッセージを消費する必要がある場合は、消費者オフセットを以前のメッセージのオフセットにリセットできます。
コンシューマーラグ: コンシューマーがメッセージが生成される速度に追いつくことができない場合、多数のメッセージが蓄積される可能性があります。 蓄積されたメッセージがミッションクリティカルでない場合は、コンシューマーオフセットをより大きな値に設定して、これらのメッセージをスキップし、ダウンストリームの負担を軽減できます。
ビジネスのバックトラックと修正処理: ビジネスエラーのために誤って消費されたメッセージを再消費する場合は、消費者オフセットを小さい値に設定できます。
コンシューマオフセットリセット機能
ApsaraMQ for RocketMQのコンシューマオフセットリセット機能は、次の機能を提供します。
コンシューマオフセットを最新オフセットにリセット
指定されたコンシューマーグループのコンシューマーは、指定されたトピックに蓄積されているすべてのメッセージをスキップし、最新のオフセットから消費を開始します。
コンシューマオフセットを特定の時点にリセットする
コンシューマは、メッセージが消費されたかどうかに関係なく、リセット時点に対応するメッセージから消費を開始します。
最初のメッセージがトピックに送信された時点から、最新のメッセージがトピックに送信された時点までの時間範囲内の時点を指定できます。
コンシューマオフセットを特定の時点にリセットすると、ブローカーはその時点に最も近いオフセットにコンシューマオフセットを調整します。
設定方法
コンソール操作
ApsaraMQ for RocketMQコンソールにログインします。 左側のナビゲーションウィンドウで、インスタンスリスト を選択します。
インスタンスリスト ページで、管理するインスタンスを選択します。 表示される インスタンスの詳細 ページの左側のナビゲーションウィンドウで、グループ管理 をクリックします。
[グループ] ページで、管理するグループをクリックします。 表示される グループ詳細 ページで、コンシューマオフセットをリセットします。
API操作: ResetConsumeOffset
制限事項
コンシューマーオフセットをリセットすると、コンシューマーは新しいオフセットからメッセージの消費を開始します。 バックトラッキングシナリオでは、消費者は、ほとんどがコールドデータである履歴メッセージから始めます。 これはコールドリードと呼ばれ、システムに過度の負担をかける可能性があります。 消費者オフセットをリセットする前に、リスクとメリットを評価します。 不正使用や頻繁なリセットを防ぐために、この権限に厳密な制御ポリシーを実装することを推奨します。
ApsaraMQ for RocketMQでは、表示されているメッセージに対してのみコンシューマオフセットをリセットできます。 スケジューリングまたは再試行の保留状態にあるメッセージのオフセットをリセットすることはできません。 詳細については、「スケジュール済みメッセージと遅延済みメッセージ」および「消費の再試行」をご参照ください。
バージョンの互換性
ApsaraMQ for RocketMQのバージョンによって、ブローカーの初期消費者オフセットの定義が異なります。
4.xおよび3.xバージョンでは、初期コンシューマーオフセットはキューのメッセージステータスとして定義されます。
5.xバージョンでは、初期コンシューマーオフセットは、コンシューマーがメッセージの受信を開始した時点でのキューの最大オフセットです。
したがって、以前のバージョンからアップグレードする場合は、クライアントの起動時に最初の消費者オフセットに注意する必要があります。
使用上の注意
リセット権限を厳密に制御する
コンシューマオフセットをリセットすると、システムに追加の負担がかかり、メッセージの読み書きに影響を与える可能性があります。 したがって、この操作を実行する前にリスクとメリットを評価することをお勧めします。