すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:蓄積されたメッセージを処理する方法?

最終更新日:Jul 10, 2024

症状

ApsaraMQ for RocketMQインスタンスを使用すると、メッセージ蓄積アラートが表示されます。 次に、ApsaraMQ for RocketMQコンソールにログインし、次の問題を見つけます。

  • グループ詳細 ページで、グループのリアルタイムメッセージ積み上げ数の値が予想よりも高くなっています。

  • 左側のナビゲーションウィンドウで、メッセージトレース をクリックします。 表示されるページで、クエリタスクの作成 をクリックします。 表示されるパネルで、Message ID から検索 をクリックし、パラメーターを設定します。 次に、一部のメッセージはブローカーに公開されますが、消費者には配信されません。

原因

ApsaraMQ for RocketMQでは、ブローカーにメッセージが送信された後、グループがバインドされているクライアントは、コンシューマーオフセットに基づいてブローカーから一部のメッセージのみをプルします。 ほとんどの場合、クライアントがブローカーからメッセージをプルしても、メッセージは蓄積されません。 ただし、消費時間が長い場合や消費スレッドの同時実行性が低い場合は、クライアントの消費能力が不十分です。 これは、メッセージの蓄積を引き起こす可能性がある。 消費メカニズムとメッセージ蓄積の原因については、「メッセージ蓄積とレイテンシ」をご参照ください。

解決策

メッセージが蓄積されている場合は、次の操作を実行してトラブルシューティングを行います。

  1. ApsaraMQ for RocketMQブローカーまたはクライアントにメッセージが蓄積されているかどうかを確認します。

    クライアントのons.logファイルに次の情報が含まれているかどうか

    the cached message count exceeds the threshold
    • 前述の情報が見つかった場合、クライアントのバッファキューはいっぱいであり、メッセージはクライアントに蓄積されます。 この場合、ステップ2に進みます。

    • 上記の情報が見つからない場合、メッセージはクライアントに蓄積されません。 この場合は、Alibaba Cloudテクニカルサポートにお問い合わせください。

  2. メッセージの消費時間が正常かどうかを確認します。

    • 消費時間が異常な場合は、ステップ3に進み、クライアントスタック情報を表示し、ビジネスロジックの問題をトラブルシューティングします。

    • 消費時間が正常である場合、メッセージは、低消費スレッド同時実行のために蓄積され得る。 この場合、消費スレッドの数を増やすか、ノードを追加します。

    次のいずれかの方法を使用して、消費時間を表示できます。

  3. クライアントスタック情報を表示します。 ConsumeMessageThreadという名前のスレッドのスタック情報のみを照会する必要があります。 スレッドは、メッセージ消費のロジックを含む。 スレッドのステータスを判断し、特定の問題に基づいてビジネスロジックを変更する方法については、「Java公式ドキュメント」をご参照ください。

    次のいずれかの方法を使用して、クライアントスタック情報を取得できます。

    • ApsaraMQ for RocketMQコンソールにログインします。 [グループの詳細] ページの [クライアント接続] セクションで、表示するクライアントの [操作] 列の [スタック情報の表示] をクリックします。 次に、クライアントスタック情報を表示できます。 詳細については、「コンシューマーの詳細の表示」をご参照ください。

    • Jstackツールを使用して、スタック情報を照会します。

      1. メッセージを蓄積したコンシューマインスタンスのホストIPアドレスを取得し、ホストにログインします。 詳細については、「消費者のステータスの表示」をご参照ください。

      2. 次のいずれかのコマンドを実行して、Javaクライアントで実行されているプロセスのIDを表示し、IDを書き留めます。

        ps -ef
        | grep javajps -lm 
      3. 次のコマンドを実行して、スタック情報を表示します。

        jstack -l pid > /tmp/pid.jstack
      4. 次のコマンドを実行して、ConsumeMessageThreadという名前のスレッドに関する情報を表示します。

        cat /tmp/pid.jstack | grep ConsumeMessageThread -A 10 -- color

    以下の項目は、一般的な異常スタックの例です。

    • 例1: スタックがアイドル状態であり、メッセージが蓄積されていない。

      消費スレッドに消費するメッセージがない場合、スレッドはWAITING状態にあり、クライアントのバッファキューからメッセージをプルするのを待ちます。

      Sample stack 1

    • 例2: 消費スレッドが、ロック・スティール、スリープ、または待機などの状態にある。

      消費スレッドはsleep() メソッドでブロックされます。 これは遅い消費を引き起こす。 Sample stack 2

    • 例3: データベースなどの外部ストレージシステムで操作が実行されているときに、消費スレッドが停止します。

      外部HTTP呼び出しにより、消費スレッドがブロックされます。 これは遅い消費を引き起こす。 Stack example 3

  4. 蓄積されたメッセージがビジネスに影響を及ぼし、スキップできる場合は、コンシューマオフセットをリセットできます。 このようにして、消費は最新のオフセットから始まります。 詳細については、「コンシューマオフセットのリセット」をご参照ください。