ApsaraMQ for RocketMQは、PushConsumerとSimpleConsumerのタイプのコンシューマーをサポートしています。 このトピックでは、2つのコンシューマタイプの使用方法、作業および再試行のメカニズム、およびシナリオについて説明します。
背景情報
ApsaraMQ for RocketMQは、PushConsumerおよびSimpleConsumerコンシューマタイプを提供します。 2つのコンシューマータイプには、異なるビジネスシナリオでメッセージング要件を満たすために使用できる異なる統合および制御方法があります。 ビジネスシナリオに適したコンシューマタイプを選択するには、次の要因が役立ちます。
同時消費: コンシューマーはマルチスレッド手法を使用して、メッセージ処理効率を高めるために同時メッセージ消費を実装する方法を教えてください。
同期または非同期メッセージ処理: 異なる統合シナリオでは、コンシューマは、受信したメッセージを処理のためにビジネスロジックシステムに非同期に配信する必要がある場合があります。 非同期メッセージ処理を実装する方法?
信頼できるメッセージ処理: 消費者はメッセージを処理するときに応答結果をどのように返しますか? 信頼できるメッセージ処理を確保するために、メッセージエラーが発生したときにメッセージの再試行を実装する方法?
上記の問題に対する回答については、「PushConsumer」および「SimpleConsumer」をご参照ください。
機能の概要
上の図は、ApsaraMQ for RocketMQでのコンシューマーによるメッセージ消費には、メッセージの受信、メッセージの処理、および消費ステータスのコミットの段階が含まれることを示しています。
ApsaraMQ for RocketMQは、PushConsumerとSimpleConsumerのコンシューマータイプを提供します。 2つのタイプのコンシューマは、異なる実装方法とAPI操作を提供することにより、さまざまなメッセージ消費シナリオに適しています。 次の表に、2つのタイプのコンシューマーの違いを示します。
ビジネスシナリオに基づいて、2つのコンシューマータイプを切り替えることができます。 別のコンシューマタイプに切り替えても、ApsaraMQ for RocketMQの既存のリソースおよび既存のビジネス処理タスクの使用は影響を受けません。
項目 | PushConsumer | SimpleConsumer |
API操作呼び出し | コールバック操作は、メッセージリスナーを使用して消費結果を返すために呼び出されます。 コンシューマーは、メッセージリスナーのスコープ内でのみ消費ロジックを処理できます。 | ビジネスアプリケーションはメッセージ処理を実装し、対応する操作を呼び出して消費結果を返します。 |
消費並行性管理 | ApsaraMQ forRocketMQ SDKは、メッセージ消費の同時スレッド数を管理するために使用されます。 | メッセージ消費に使用される同時スレッドの数は、個々のビジネスアプリケーションの消費ロジックに基づいています。 |
APIの柔軟性 | API操作はカプセル化され、柔軟性に乏しい。 | アトミック操作は大きな柔軟性を提供します。 |
シナリオ | このコンシューマタイプは、カスタムプロセスを必要としない開発シナリオに適しています。 | このコンシューマタイプは、カスタムプロセスが必要な開発シナリオに適しています。 |
PushConsumer
PushConsumerは、高度なカプセル化を提供するコンシューマータイプです。 メッセージの消費と消費結果の送信は、メッセージリスナーのみを使用して処理されます。 メッセージの取得、消費ステータスの送信、および消費の再試行は、ApsaraMQ for RocketMQクライアントSDKを使用して完了します。
使用状況
PushConsumerは固定的に使用されます。 メッセージリスナーは、コンシューマーが初期化されるときにPushConsumerコンシューマーに登録され、メッセージ処理ロジックがメッセージリスナーに実装されます。 メッセージの取得、リスナー呼び出しのトリガー、およびメッセージの再試行は、ApsaraMQ for RocketMQ SDKを使用して処理されます。
サンプルコード:
// Message consumption example: Use a PushConsumer consumer to consume messages.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
// Configure consumer groups.
.setConsumerGroup("Your ConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// Set the message listener.
.setMessageListener(new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
// Consume the messages and return the consumption result.
return ConsumeResult.SUCCESS;
}
})
.build();
PushConsumerコンシューマーのメッセージリスナーは、次のいずれかの結果を返します。
たとえば、ApsaraMQ forRocketMQ SDK For Javaを使用してメッセージが消費された場合、
ConsumeResult. success
が返されます。 サーバは、消費結果に基づいて消費進捗を更新する。たとえば、ApsaraMQ forRocketMQ SDK For Javaを使用し、メッセージの使用に失敗した場合、
ConsumeResult. failure
が返されます。 ApsaraMQ forRocketMQがメッセージの消費を再試行するかどうかは、消費の再試行ロジックによって異なります。予期しない失敗: たとえば、予期しない例外がスローされた場合、メッセージは使用されません。 ApsaraMQ forRocketMQがメッセージの消費を再試行するかどうかは、消費の再試行ロジックによって異なります。
メッセージ処理ロジックの予期しないエラーが常にPushConsumerコンシューマーによるメッセージの消費を妨げている場合、SDKは消費がタイムアウトしたとみなし、消費失敗の結果を強制的にコミットします。 次に、メッセージは消費再試行ロジックに基づいて処理されます。 消費タイムアウトの詳細については、「プッシュモードで消費されるメッセージの再試行ポリシー」をご参照ください。
消費タイムアウトが発生すると、SDKは消費失敗の結果をコミットします。 しかしながら、電流消費スレッドは、結果に応答することができず、メッセージを処理し続けることがある。
働くメカニズム
PushConsumerの場合、リアルタイムメッセージ処理はSDKの一般的なReactorスレッドモデルに基づいています。 SDKには、メッセージをプルしてキューに保存する長いポーリングスレッドが組み込まれています。 次に、メッセージはキューから個々のメッセージ消費スレッドに配信されます。 メッセージリスナーは、メッセージ消費ロジックに基づいて動作します。 次の図は、PushConsumerコンシューマーのメッセージ消費プロセスを示しています。
信頼性の再試行
PushConsumerの場合、クライアントSDKと消費ロジックユニット間の通信は、メッセージリスナーのみを使用して実装されます。 クライアントSDKは、メッセージリスナーによって返された結果に基づいてメッセージが消費されたかどうかをチェックし、メッセージの信頼性を確保するために消費再試行ロジックに基づいて再試行を実行します。 すべてのメッセージは同期的に消費されなければならない。 リスナー操作の呼び出しが終了すると、消費結果が返されます。 非同期配布は許可されていません。 メッセージの再試行の詳細については、「プッシュモードで使用されるメッセージの再試行ポリシー」をご参照ください。
メッセージングの信頼性を確保するため、ApsaraMQ for RocketMQは、PushConsumerコンシューマーによるメッセージ消費における以下の動作を禁止しています。
メッセージの消費が完了する前に、消費結果を返します。 例えば、後で消費に失敗したメッセージについては、事前に消費成功結果を返す。 この場合、ApsaraMQ for RocketMQは実際の消費結果を確認できず、メッセージの消費を再試行しません。
メッセージリスナーから他のカスタムスレッドにメッセージを配布し、事前に消費結果を返します。 メッセージの消費に失敗したが、事前に消費成功の結果が返された場合、ApsaraMQ for RocketMQは実際の消費結果を確認できず、メッセージの消費を再試行しません。
確実なメッセージの順序
ApsaraMQ For RocketMQの注文メッセージの場合、注文メッセージの消費がコンシューマーグループに設定されている場合、PushConsumerコンシューマーは消費順序でメッセージを消費します。 PushConsumerの消費者がメッセージを消費すると、個々のビジネスアプリケーションがビジネスロジックで消費順序を定義する必要なく、消費順序が保証されます。
ApsaraMQ forRocketMQでは、同期コミットが順序付きメッセージ処理の前提条件です。 ビジネスロジックで非同期配布が定義されている場合、ApsaraMQ for RocketMQはメッセージの順序を保証できません。
シナリオ
PushConsumerは、メッセージ処理を同期処理に制限し、各メッセージ処理のタイムアウトを制限します。 PushConsumerは、次のシナリオに適しています。
予測可能なメッセージ処理時間: メッセージ処理時間が制限されていない場合、メッセージの信頼性を確保するために長い処理時間を必要とするメッセージに対してメッセージの再試行が継続的にトリガーされます。 これにより、多数のメッセージが繰り返される。
非同期およびカスタムプロセスなし: PushConsumerは、消費ロジックのスレッドモデルをReactorスレッドモデルに制限します。 クライアントSDKは、最大スループットに基づいてメッセージを処理します。 このモデルは開発が簡単ですが、非同期またはカスタムプロセスは許可されていません。
SimpleConsumer
SimpleConsumerは、メッセージ処理のアトミック操作をサポートするコンシューマタイプです。 このようなタイプのコンシューマは、メッセージを取得し、消費ステータスをコミットし、ビジネスロジックに基づいてメッセージの再試行を実行する操作を呼び出します。
使用状況
SimpleConsumerには複数のAPI操作が含まれます。 対応する操作は、必要に応じて呼び出され、メッセージを取得して処理のためにビジネススレッドに配信します。 次に、コミット動作が呼び出されて、メッセージ処理結果をコミットする。 サンプルコード:
// Consumption example: When a SimpleConsumer consumer consumes normal messages, the consumer obtains messages and commits message consumption results.
ClientServiceProvider provider = ClientServiceProvider.loadService();
String topic = "Your Topic";
FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
// Configure consumer groups.
.setConsumerGroup("Your ConsumerGroup")
// Specify the access point.
.setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
// Specify the pre-bound subscriptions.
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
List<MessageView> messageViewList = null;
try {
// A SimpleConsumer consumer must obtain and process messages.
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
// After consumption is complete, the consumer must invoke ACK to submit the consumption result.
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
// If the pull fails due to system traffic throttling or other reasons, the consumer must re-initiate the request to obtain the message.
e.printStackTrace();
}
次の表に、SimpleConsumerに提供されるAPI操作を示します。
API 操作 | 説明 | 変更可能なパラメーター |
| 消費者はこの操作を呼び出して、サーバーからメッセージを取得できます。 説明 サーバーは分散ストレージを使用するため、要求されたメッセージが実際にサーバー上に存在するにもかかわらず、サーバーは空の結果を返す可能性があります。 この問題を解決するには、ReceiveMessage操作を再度呼び出すか、ReceiveMessage操作の同時実行値を増やすことができます。 |
|
| コンシューマーがメッセージを消費した後、コンシューマーはこの操作を呼び出して、消費成功結果をサーバーに返します。 | なし |
| 消費再試行シナリオでは、コンシューマーはこの操作を呼び出してメッセージ処理時間を変更し、メッセージの再試行間隔を制御できます。 | メッセージ不可視期間: メッセージの最大処理時間。 この操作を呼び出して、 |
信頼性の再試行
SimpleConsumerがメッセージを消費すると、クライアントSDKとApsaraMQ forRocketMQサーバー間の通信は、ReceiveMessage
およびAckMessage
操作を使用して実装されます。 クライアントSDKがメッセージを正常に処理すると、AckMessage
操作が呼び出されます。 メッセージの処理に失敗した場合、指定されたメッセージの不可視期間が経過した後、メッセージ再試行メカニズムをトリガーするためのackメッセージは返されません。 詳細については、「シンプルモードで使用されるメッセージの再試行ポリシー」をご参照ください。
確実なメッセージの順序
ApsaraMQ for RocketMQでは、SimpleConsumerコンシューマは、順序付けられたメッセージを格納された順序で取得します。 順序付けられたメッセージの詳細については、「順序付けられたメッセージ」をご参照ください。 順序メッセージのセット内のメッセージが完全に処理されない場合、順序メッセージのセット内の次のメッセージを取得することはできません。
シナリオ
SimpleConsumerは、メッセージを取得して消費結果をコミットするためのアトミックAPI操作を提供します。 PushConsumerと比較して、SimpleConsumerはより優れた柔軟性を提供します。 SimpleConsumerは、次のシナリオに適しています。
制御できないメッセージ処理時間: メッセージ処理時間が推定できない場合、SimpleConsumerを使用して、メッセージが過度に長い期間処理されないようにすることを推奨します。 メッセージ消費中の推定メッセージ処理時間を指定できます。 既存の処理時間がビジネスシナリオに適していない場合は、対応するAPI操作を呼び出してメッセージ処理時間を変更できます。
非同期処理とバッチ消費: SimpleConsumerは、SDKに複雑なスレッドカプセル化を含まない。 ビジネスアプリケーションはカスタム設定を使用できます。 このように、SimpleConsumerコンシューマーは、非同期配布、バッチ消費、およびその他のカスタムシナリオを実装できます。
カスタムメッセージ消費率: SimpleConsumerを使用すると、ビジネスアプリケーションはReceiveMessage操作を呼び出してメッセージを取得します。 メッセージの取得頻度を調整して、メッセージの消費率を制御できます。
使用上の注意
PushConsumerの適切な消費期間制限を指定します。
メッセージが長時間処理されないように、PushConsumerのメッセージ消費期間を制限することをお勧めします。 メッセージを長時間処理すると、メッセージ処理のタイムアウトによりメッセージが重複し、次のメッセージが継続的に消費されるのを待つことがあります。 メッセージが過度に長い期間にわたって頻繁に処理される場合は、SimpleConsumerを使用し、ビジネス要件に基づいて適切なメッセージの不可視期間を指定することを推奨します。