ApsaraMQ for RocketMQは、バッチ消費機能を提供します。 この機能を使用して、メッセージを効率的に処理したり、ダウンストリームリソースによるAPI呼び出しの数を減らしたりできます。 このトピックでは、バッチ消費機能の定義、利点、シナリオ、制限、およびサンプルコードについて説明します。
バッチ消費とは何ですか?
- 定義
ApsaraMQ for RocketMQは、バッチ消費機能を提供します。 この機能により、プッシュコンシューマはコンシューマスレッドにメッセージをバッチで送信できます。 次に、コンシューマースレッドはメッセージをバッチで消費します。
説明 ApsaraMQ for RocketMQは、コンシューマがプッシュモードでメッセージを取得するかプルモードでメッセージを取得するかに基づいて、プッシュコンシューマとプルコンシューマを提供します。 詳細については、「Terms」をご参照ください。 - 制御ポリシー機能の動作バッチ消費は、次の2つの段階に分けられます。
- プロデューサーはApsaraMQ for RocketMQにメッセージを公開します。 次に、プッシュコンシューマのメッセージプルスレッドは、ロングポーリングポリシーを使用してメッセージをプルし、ApsaraMQ for RocketMQのバックエンドにキャッシュします。
- プッシュコンシューマは、キャッシュされたメッセージがバッチ消費のために指定された条件の1つを満たすかどうかに基づいて、バッチ消費のためにコンシューマースレッドにメッセージを送信するかどうかを決定します。
次の図は、バッチ消費のプロセスを示しています。
制限事項
- バッチ消費はTCPでのみサポートされます。 バージョンが1.8.7.3.Final以降のTCPクライアントSDK for Javaの商用版を使用していることを確認してください。 SDKのリリースノートとSDKの入手方法については、「リリースノート」をご参照ください。
- 1回のバッチで最大1,024件のメッセージを送信できます。 バッチ間の最大待機時間は450秒です。
メリットとシナリオ
次の情報は、バッチ消費機能の利点とシナリオを示しています。
- 利点1: メッセージのスループットと処理効率の向上
シナリオ: ApsaraMQ for RocketMQは、上流の注文システムを下流のElasticsearchシステムから切り離します。 Elasticsearchシステムは、上流の注文システムから10個のログメッセージを消費します。 Elasticsearchシステムの場合、各メッセージはリモートプロシージャコール (RPC) リクエストに相当します。 1つのRPC要求が10ミリ秒を消費すると仮定する。 バッチ消費機能を使用しない場合、10個のログメッセージを消費するには合計100ミリ秒が必要です。 バッチ消費機能を使用すると、10個のメッセージが1つのバッチで消費され、消費時間は10ミリ秒に短縮されます。 このようにして、メッセージを効率的に処理できる。
- 利点2: ダウンストリームリソースによるAPI呼び出しの数を減らす
シナリオ: データベースにデータを挿入するとします。 新しいデータごとに挿入ジョブを実行し、データを頻繁に更新する必要がある場合、データベースに大きな負荷がかかる可能性があります。 関連パラメーターを設定して、1つのバッチに10個のデータを挿入し、5秒ごとに挿入操作を実行してシステムの負荷を軽減できます。
サンプルコード
次のサンプルコードは、バッチ消費の例を示しています。
com.aliyun.openservices.ons.api.Actionをインポートします。com.aliyun.openservices.ons.api.ConsumeContextをインポートします。com.aliyun.openservices.ons.api.Messageをインポートします。com.aliyun.openservices.ons.api.batch.BatchConsumerをインポートします。com.aliyun.openservices.ons.api.batch.BatchMessageListenerをインポートします。java.util.Listをインポートします。java.util.Propertiesをインポートします。com.aliyun.openservices.ons.api.ONSFactoryをインポートします。com.aliyun.openservices.ons.api.PropertyKeyConstをインポートします。com.aliyun.openservices.tcp.example.MqConfigをインポートします。public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
// ApsaraMQ for RocketMQコンソールで作成したコンシューマグループのID。
consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
// 環境変数ALIBABA_CLOUD_ACCESS_KEY_IDとALIBABA_CLOUD_ACCESS_KEY_SECRETが設定されていることを確認します。
// 認証に使用されるAccessKey ID。
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// 認証に使用されるAccessKeyシークレット。
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// TCPエンドポイント。 ApsaraMQ for RocketMQコンソールの [インスタンスの詳細] ページの [TCPエンドポイントセクション] でエンドポイントを取得できます。
consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);
// 一度に消費されるメッセージの最大数。 この例では、値は128として指定されます。 指定されたトピックでキャッシュされたメッセージの数がこの値に達すると、SDKはすぐにコンシューマーがメッセージを消費するためのコールバックメソッドを呼び出します。 有効な値: 1 ~ 1024 デフォルト値: 32。
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// 2つの連続したバッチ間の最大待機時間。 この例では、値は10秒として指定されています。 指定された待機時間に達すると、SDKはすぐにコンシューマーがメッセージを消費するためのコールバックメソッドを呼び出します。 有効な値: 0 ~ 450 デフォルト値:0 単位は秒です。
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer=ONSCatchFactory. createBatchConsumer(consumerProperties);
batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {
@Override
public Action consume (最終的なリスト <メッセージ> メッセージ、ConsumeContextコンテキスト) {
System.out.printf("Batch-size: % d\n", messages.size());
// メッセージをバッチで処理します。
return Action.CommitMessage;
}
});
// バッチ消費のためにコンシューマーを起動します。
batchConsumer.start();
System.out.println("Consumer start success.");
// プロセスが終了しないように、特定の期間待ちます。
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
下表に、各パラメーターを説明します。
パラメーター | データ型 | 必須 | 説明 |
ConsumeMessageBatchMaxSize | String | 任意 説明 パラメーター値を指定しない場合は、デフォルト値が使用されます。 | バッチで消費されるメッセージの最大数。 キャッシュされたメッセージの数がこのパラメーターの指定された値に達すると、プッシュコンシューマクライアントのSDKは、バッチ消費のために一度にメッセージをコンシューマスレッドに送信します。 有効な値: 1 ~ 1024 デフォルト値: 32。 |
BatchConsumeMaxAwaitDurationInSeconds | String | バッチ間の最大待機時間。 このパラメーターで指定された待機時間に達すると、ApsaraMQ for RocketMQはメッセージを一括でコンシューマーにプッシュします。 有効な値: 1 ~ 450 デフォルト値:0 単位は秒です。 |
- サンプルコードの詳細については、「コードライブラリ」をご参照ください。
- パラメーターの詳細については、「メソッドとパラメーター」をご参照ください。
ベストプラクティス
必要に応じて、ConsumeMessageBatchMaxSizeおよびBatchConsumeMaxAwaitDurationInSecondsパラメーターの値を設定します。 いずれかのパラメーターで指定されたトリガー条件が満たされると、バッチ消費がトリガーされます。 ConsumeMessageBatchMaxSizeパラメーターが128に設定され、BatchConsumeMaxAwaitDurationInSecondsパラメーターが1に設定されているとします。 1秒以内にキャッシュされるメッセージが128未満の場合でも、バッチ消費がトリガーされます。 この場合、Batch-sizeパラメーターには128未満の値が返されます。
バッチ消費を改善するために、コンシューマークライアントにメッセージ等を実装して、メッセージが1回だけ処理されるようにすることをお勧めします。 メッセージべき等性の詳細については、「メッセージべき等性」をご参照ください。