ApsaraMQ for RocketMQ は、Push コンシューマーと Simple コンシューマーの 2 種類のコンシューマーをサポートしています。このトピックでは、これらの使用方法、実装原則、信頼性、リトライメカニズム、および適用可能なシナリオについて説明します。
背景情報
ApsaraMQ for RocketMQ は、さまざまなビジネスシナリオに対応するために、異なるコンシューマータイプを提供しています。各コンシューマータイプには、異なる統合方法と制御方法があります。以下の質問に答えることで、ビジネスシナリオに最も適したコンシューマータイプを選択できます。
同時消費: コンシューマーがマルチスレッドメカニズムを使用してメッセージを処理し、処理効率を向上させるにはどうすればよいですか?
同期的および非同期的なメッセージ処理: 統合シナリオによっては、コンシューマーが受信したメッセージをビジネスロジックに非同期的に配信して処理する必要がある場合があります。非同期的なメッセージ処理はどのように実装されますか?
信頼性の高いメッセージ処理: コンシューマーはメッセージを処理した後にどのように応答を返しますか?例外が発生した場合、信頼性の高いメッセージ処理を保証するためにリトライはどのように処理されますか?
これらの質問への回答については、Push コンシューマー と Simple コンシューマー のセクションをご参照ください。
機能概要

上の図に示すように、ApsaraMQ for RocketMQ のコンシューマーがメッセージを処理するとき、メッセージの受信、メッセージの処理、および消費ステータスのコミットという段階を経ます。
これらの段階のために、ApsaraMQ for RocketMQ は Push と Simple の 2 つのコンシューマータイプを提供します。これら 2 つのコンシューマータイプは、さまざまなビジネスシナリオでの消費ニーズを満たすために、異なる実装方法とインターフェイスを使用します。次の表に、これらの違いを示します。
ビジネスシナリオが変更された場合、または使用しているコンシューマータイプがビジネスに適さなくなった場合は、コンシューマータイプを変更できます。コンシューマータイプを変更しても、既存の ApsaraMQ for RocketMQ リソースやビジネス処理には影響しません。
機能 | Push コンシューマー | Simple コンシューマー |
インターフェイスメソッド | リスナーのコールバックを介して消費結果を返します。消費ロジックはリスナー内で処理する必要があります。 | アプリケーションがメッセージ処理を実装し、インターフェイスを呼び出して消費結果を返します。 |
同時実行管理 | SDK が同時実行を管理します。 | アプリケーションロジックが消費スレッドを管理します。 |
柔軟性 | 高度にカプセル化されており、柔軟性は低いです。 | アトミックで、高度にカスタマイズ可能です。 |
シナリオ | カスタムワークフローを必要としない開発シナリオに最適です。 | 高度にカスタマイズされたビジネスワークフローを必要とする開発シナリオに最適です。 |
対応する SDK クラス | PushConsumer, LitePushConsumer | SimpleConsumer |
Push コンシューマー
Push コンシューマーは、高度にカプセル化されたコンシューマーです。メッセージリスナーを介してメッセージを処理し、消費結果を返すだけで済みます。ApsaraMQ for RocketMQ クライアントソフトウェア開発キット (SDK) は、メッセージの取得、消費ステータスの送信、および消費リトライを処理します。
使用方法
Push コンシューマーの使用は簡単です。コンシューマーの初期化中にメッセージリスナーを登録し、リスナー内にメッセージ処理ロジックを実装します。ApsaraMQ for RocketMQ SDK は、バックグラウンドでメッセージの取得、リスナーの呼び出し、リトライ処理を処理します。サンプルコードについては、「対応する SDK クラス」をご参照ください。
Push コンシューマーのメッセージリスナーは、次の 3 つの結果のいずれかを返します。
消費成功: たとえば、Java 用の SDK を使用する場合、
ConsumeResult.SUCCESSが返されます。これは、メッセージが正常に処理されたことを示します。サーバーは、この結果に基づいて消費の進行状況を更新します。消費失敗: たとえば、Java 用の SDK を使用する場合、
ConsumeResult.FAILUREが返されます。これは、メッセージの処理に失敗したことを示します。システムは、リトライポリシーに基づいて消費をリトライするかどうかを決定します。予期しない失敗: 例外がスローされた場合、結果は消費失敗として扱われます。システムは、リトライポリシーに基づいて消費をリトライするかどうかを決定します。
Push コンシューマーがメッセージを消費するときに、処理ロジックで予期しないブロックが発生してメッセージを処理できない場合、SDK はタイムアウトします。その後、強制的に消費失敗の結果を送信し、リトライポリシーに基づいてメッセージを処理します。メッセージのタイムアウトに関する詳細については、「PushConsumer のリトライポリシー」をご参照ください。
消費タイムアウトが発生すると、SDK は消費失敗の結果を送信します。ただし、現在の消費スレッドは中断に応答せず、メッセージの処理を続行する場合があります。
内部メカニズム
Push コンシューマーの場合、リアルタイムのメッセージ処理は SDK 内の典型的な Reactor スレッドモデルに基づいています。次の図に示すように、SDK には組み込みのロングポーリングスレッドがあります。このスレッドは、SDK の内部キャッシュキューに非同期でメッセージをプルします。その後、メッセージはコンシューマースレッドに送信され、リスナーがトリガーされてローカルの消費ロジックが実行されます。

信頼性とリトライ
Push コンシューマーの場合、クライアント SDK と消費ロジックの間の唯一の境界は、メッセージリスナーインターフェイスです。クライアント SDK は、リスナーから返された結果に基づいてメッセージが正常に消費されたかどうかを厳密に判断し、信頼性を確保するためにリトライを実行します。すべてのメッセージは同期的に処理する必要があります。リスナーインターフェイスが終了したときに呼び出し結果を返す必要があります。非同期配信は許可されていません。メッセージのリトライに関する詳細については、「PushConsumer のリトライポリシー」をご参照ください。
PushConsumer を使用してメッセージを消費する場合、次の方法でメッセージを処理しないでください。そうしないと、ApsaraMQ for RocketMQ はメッセージの信頼性を保証できません。
誤った方法 1: メッセージ処理が完了する前に成功結果を返す。後でメッセージ処理が失敗した場合、ApsaraMQ for RocketMQ サーバーはその失敗を認識せず、消費をリトライしません。
誤った方法 2: メッセージリスナー内でメッセージを他のカスタムスレッドに再配信し、消費結果を早期に返す。後でメッセージ処理が失敗した場合、ApsaraMQ for RocketMQ サーバーもその失敗を認識せず、消費をリトライしません。
順序の保証
ApsaraMQ for RocketMQ の 順序メッセージ の定義に基づき、コンシューマーグループが順序付き消費モードに設定されている場合、Push コンシューマーはメッセージリスナーを呼び出す際にメッセージの順序に厳密に従います。ビジネスロジックを変更することなく、消費順序が保証されます。
順序メッセージの処理には同期的な送信が必要です。ビジネスロジックがカスタムの非同期配信を実装している場合、ApsaraMQ for RocketMQ はメッセージの順序を保証できません。
シナリオ
Push コンシューマーは、同期的なメッセージ処理と各メッセージの処理タイムアウトを厳密に強制します。これらは、次のシナリオに適しています。
予測可能なメッセージ処理時間: 処理時間が不確実で、メッセージの処理に予想よりも時間がかかることが多い場合、Push コンシューマーの信頼性保証により頻繁にリトライがトリガーされます。これにより、多くの重複メッセージが発生する可能性があります。
非同期処理や高度なカスタマイズがない: Push コンシューマーは、消費ロジックのスレッドモデルを制限します。クライアント SDK は、内部で最大スループットでメッセージ処理をトリガーします。このモデルは開発を簡素化しますが、非同期処理やカスタムワークフローは許可されません。
対応する SDK クラス
ApsaraMQ for RocketMQ は、Push コンシューマー用に PushConsumer と LitePushConsumer の 2 つの SDK クラスを提供します。
PushConsumer: Lite タイプではない Topic からのメッセージの消費に適しています。
LitePushConsumer: Lite タイプの Topic からのメッセージの消費専用です。Lite Topic の粒度で消費を制御できます。
PushConsumer
サンプルコード:
// サンプル消費: PushConsumer を使用して通常のメッセージを消費します。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// コンシューマーグループを設定します。
.setConsumerGroup("Your ConsumerGroup")
// エンドポイントを設定します。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// 事前にバインドされたサブスクリプション関係を設定します。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// メッセージリスナーを設定します。
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// メッセージを消費し、処理結果を返します。
return ConsumeResult.SUCCESS;
}
})
.build();LitePushConsumer
サンプルコード:
// サンプル消費: LitePushConsumer を使用して通常のメッセージを消費します。
ClientServiceProvider provider = ClientServiceProvider.loadService();
LitePushConsumer litePushConsumer = provider.newLitePushConsumerBuilder()
// エンドポイントを設定します。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Topic を設定します。
.bindTopic("Your Topic")
// コンシューマーグループを設定します。
.setConsumerGroup("Your ConsumerGroup")
// メッセージリスナーを設定します。
.setMessageListener(messageView -> {
// メッセージを消費し、処理結果を返します。
return ConsumeResult.SUCCESS;
})
.build();
// ターゲットの Lite Topic をサブスクライブします。
litePushConsumer.subscribeLite("Your Lite Topic 1");
litePushConsumer.subscribeLite("Your Lite Topic 2");Simple コンシューマー
Simple コンシューマーは、メッセージ処理のためのアトミックな操作をサポートするコンシューマーの一種です。ビジネスロジックは、操作を呼び出してメッセージを取得し、消費ステータスをコミットし、消費をリトライします。
使用方法
Simple コンシューマーの使用には、ビジネスロジックから複数の API 操作を呼び出すことが含まれます。操作を呼び出してメッセージを取得し、処理のためにビジネススレッドに配信します。処理後、コミット操作を呼び出して結果をサーバーに返します。サンプルコードについては、「対応する SDK クラス」をご参照ください。
信頼性のためのリトライ
Simple コンシューマーの場合、クライアントソフトウェア開発キット (SDK) とサーバーは ReceiveMessage および AckMessage 操作を使用して通信します。クライアント SDK がメッセージを正常に処理した場合、AckMessage 操作を呼び出します。処理が失敗した場合は、ACK 応答を送信しないでください。これにより、指定されたメッセージの不可視期間が終了した後にリトライがトリガーされます。詳細については、「SimpleConsumer のリトライポリシー」をご参照ください。
メッセージ順序の保証
Simple コンシューマーは、ApsaraMQ for RocketMQ からの 順序メッセージ を保存されている順序で処理します。順序を維持する必要があるメッセージのグループの場合、先行するメッセージが処理されないと、後続のメッセージは取得できません。
シナリオ
SimpleConsumer は、メッセージを取得し、消費結果をコミットするためのアトミックな API 操作を提供します。この方法は、PushConsumer を使用するよりも柔軟です。SimpleConsumer は、次のシナリオに適しています。
予測不可能なメッセージ処理期間: メッセージ処理期間を推定できない場合や、メッセージの処理に時間がかかることが多い場合は、SimpleConsumer を使用します。消費中に推定処理期間を指定できます。推定がビジネスニーズに適していない場合は、API 操作を呼び出して変更できます。
高度なカスタムシナリオ: SimpleConsumer SDK には複雑なスレッドカプセル化がありません。ビジネスロジックが完全に制御できます。これにより、非同期配信やバッチ消費などの高度なシナリオを実装できます。
カスタム消費レート: SimpleConsumer を使用すると、ビジネスロジックが積極的に操作を呼び出してメッセージを取得します。これにより、メッセージを取得する頻度を調整して消費レートを制御できます。
対応する SDK クラス
ApsaraMQ for RocketMQ は、Simple コンシューマー用に 1 つの SDK クラス、SimpleConsumer を提供します。このクラスは、広範なカスタム操作を提供します。SimpleConsumer は、Lite Topic からのメッセージを消費できません。
SimpleConsumer
次のコードは一例です。
// 消費例: SimpleConsumer を使用して通常のメッセージを消費します。積極的にメッセージを取得し、処理し、結果をコミットします。
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// コンシューマーグループを設定します。
.setConsumerGroup("Your ConsumerGroup")
// エンドポイントを設定します。
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// 事前にバインドされたサブスクリプション関係を設定します。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
// SimpleConsumer は、積極的にメッセージを取得して処理する必要があります。
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// 処理が完了したら、積極的に ack() を呼び出して消費結果をコミットします。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// システムのスロットリングなどの理由でプルが失敗した場合は、リクエストを再開してメッセージを取得する必要があります。
e.printStackTrace();
}SimpleConsumer は、主に次の API 操作を含みます。
インターフェイス名 | 主な特徴 | 変更可能なパラメーター |
| コンシューマーは、この操作を積極的に呼び出してサーバーからメッセージを取得します。 説明 サーバーは分散ストレージを使用しているため、サーバー上にメッセージが存在する場合でも空の結果を返すことがあります。 これを解決するには、ReceiveMessage 操作を再度呼び出すか、ReceiveMessage 呼び出しの同時実行性を高めます。 |
|
| コンシューマーがメッセージを正常に消費した後、この操作を積極的に呼び出して成功応答をサーバーに返します。 | なし |
| 消費リトライシナリオでは、コンシューマーはこの操作を呼び出してメッセージ処理期間を変更し、リトライ間隔を制御できます。 | メッセージの不可視期間: この操作を呼び出して、 |
推奨事項
Push コンシューマーの消費時間を制御する
タイムアウトによる再処理を避けるために、Push コンシューマーのメッセージ消費時間を厳密に制御してください。アプリケーションが長時間かかるメッセージを頻繁に処理する場合は、Simple コンシューマーを使用し、メッセージの不可視期間を設定してください。