すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:バッチ消費

最終更新日:Jul 09, 2024

ApsaraMQ for RocketMQは、バッチ消費機能を提供します。 この機能を使用して、メッセージを効率的に処理したり、ダウンストリームリソースによるAPI呼び出しの数を減らしたりできます。 このトピックでは、バッチ消費機能の定義、利点、シナリオ、制限、およびサンプルコードについて説明します。

バッチ消費とは何ですか?

  • 定義

    ApsaraMQ for RocketMQは、バッチ消費機能を提供します。 この機能により、プッシュコンシューマはコンシューマスレッドにメッセージをバッチで送信できます。 次に、コンシューマースレッドはメッセージをバッチで消費します。

    説明 ApsaraMQ for RocketMQは、コンシューマがプッシュモードでメッセージを取得するかプルモードでメッセージを取得するかに基づいて、プッシュコンシューマとプルコンシューマを提供します。 詳細については、「Terms」をご参照ください。
  • 制御ポリシー機能の動作
    バッチ消費は、次の2つの段階に分けられます。
    1. プロデューサーはApsaraMQ for RocketMQにメッセージを公開します。 次に、プッシュコンシューマのメッセージプルスレッドは、ロングポーリングポリシーを使用してメッセージをプルし、ApsaraMQ for RocketMQのバックエンドにキャッシュします。
    2. プッシュコンシューマは、キャッシュされたメッセージがバッチ消費のために指定された条件の1つを満たすかどうかに基づいて、バッチ消費のためにコンシューマースレッドにメッセージを送信するかどうかを決定します。
    次の図は、バッチ消費のプロセスを示しています。 batch_consume

制限事項

  • バッチ消費はTCPでのみサポートされます。 バージョンが1.8.7.3.Final以降のTCPクライアントSDK for Javaの商用版を使用していることを確認してください。 SDKのリリースノートとSDKの入手方法については、「リリースノート」をご参照ください。
  • 1回のバッチで最大1,024件のメッセージを送信できます。 バッチ間の最大待機時間は450秒です。

メリットとシナリオ

次の情報は、バッチ消費機能の利点とシナリオを示しています。

  • 利点1: メッセージのスループットと処理効率の向上

    シナリオ: ApsaraMQ for RocketMQは、上流の注文システムを下流のElasticsearchシステムから切り離します。 Elasticsearchシステムは、上流の注文システムから10個のログメッセージを消費します。 Elasticsearchシステムの場合、各メッセージはリモートプロシージャコール (RPC) リクエストに相当します。 1つのRPC要求が10ミリ秒を消費すると仮定する。 バッチ消費機能を使用しない場合、10個のログメッセージを消費するには合計100ミリ秒が必要です。 バッチ消費機能を使用すると、10個のメッセージが1つのバッチで消費され、消費時間は10ミリ秒に短縮されます。 このようにして、メッセージを効率的に処理できる。

  • 利点2: ダウンストリームリソースによるAPI呼び出しの数を減らす

    シナリオ: データベースにデータを挿入するとします。 新しいデータごとに挿入ジョブを実行し、データを頻繁に更新する必要がある場合、データベースに大きな負荷がかかる可能性があります。 関連パラメーターを設定して、1つのバッチに10個のデータを挿入し、5秒ごとに挿入操作を実行してシステムの負荷を軽減できます。

サンプルコード

次のサンプルコードは、バッチ消費の例を示しています。

com.aliyun.openservices.ons.api.Actionをインポートします。com.aliyun.openservices.ons.api.ConsumeContextをインポートします。com.aliyun.openservices.ons.api.Messageをインポートします。com.aliyun.openservices.ons.api.batch.BatchConsumerをインポートします。com.aliyun.openservices.ons.api.batch.BatchMessageListenerをインポートします。java.util.Listをインポートします。java.util.Propertiesをインポートします。com.aliyun.openservices.ons.api.ONSFactoryをインポートします。com.aliyun.openservices.ons.api.PropertyKeyConstをインポートします。com.aliyun.openservices.tcp.example.MqConfigをインポートします。public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        // ApsaraMQ for RocketMQコンソールで作成したコンシューマグループのID。 
        consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
        // 環境変数ALIBABA_CLOUD_ACCESS_KEY_IDとALIBABA_CLOUD_ACCESS_KEY_SECRETが設定されていることを確認します。 
        // 認証に使用されるAccessKey ID。 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // 認証に使用されるAccessKeyシークレット。 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // TCPエンドポイント。 ApsaraMQ for RocketMQコンソールの [インスタンスの詳細] ページの [TCPエンドポイントセクション] でエンドポイントを取得できます。 
        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);

        // 一度に消費されるメッセージの最大数。 この例では、値は128として指定されます。 指定されたトピックでキャッシュされたメッセージの数がこの値に達すると、SDKはすぐにコンシューマーがメッセージを消費するためのコールバックメソッドを呼び出します。 有効な値: 1 ~ 1024 デフォルト値: 32。 
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // 2つの連続したバッチ間の最大待機時間。 この例では、値は10秒として指定されています。 指定された待機時間に達すると、SDKはすぐにコンシューマーがメッセージを消費するためのコールバックメソッドを呼び出します。 有効な値: 0 ~ 450 デフォルト値:0 単位は秒です。 
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer=ONSCatchFactory. createBatchConsumer(consumerProperties);
        batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {

             @Override
            public Action consume (最終的なリスト <メッセージ> メッセージ、ConsumeContextコンテキスト) {
                System.out.printf("Batch-size: % d\n", messages.size());
                // メッセージをバッチで処理します。 
                return Action.CommitMessage;
            }
        });
        // バッチ消費のためにコンシューマーを起動します。 
        batchConsumer.start();
        System.out.println("Consumer start success.");

        // プロセスが終了しないように、特定の期間待ちます。 
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}         

下表に、各パラメーターを説明します。

パラメーターデータ型必須説明
ConsumeMessageBatchMaxSizeString任意
説明 パラメーター値を指定しない場合は、デフォルト値が使用されます。
バッチで消費されるメッセージの最大数。 キャッシュされたメッセージの数がこのパラメーターの指定された値に達すると、プッシュコンシューマクライアントのSDKは、バッチ消費のために一度にメッセージをコンシューマスレッドに送信します。 有効な値: 1 ~ 1024 デフォルト値: 32。
BatchConsumeMaxAwaitDurationInSecondsStringバッチ間の最大待機時間。 このパラメーターで指定された待機時間に達すると、ApsaraMQ for RocketMQはメッセージを一括でコンシューマーにプッシュします。 有効な値: 1 ~ 450 デフォルト値:0 単位は秒です。
説明

ベストプラクティス

必要に応じて、ConsumeMessageBatchMaxSizeおよびBatchConsumeMaxAwaitDurationInSecondsパラメーターの値を設定します。 いずれかのパラメーターで指定されたトリガー条件が満たされると、バッチ消費がトリガーされます。 ConsumeMessageBatchMaxSizeパラメーターが128に設定され、BatchConsumeMaxAwaitDurationInSecondsパラメーターが1に設定されているとします。 1秒以内にキャッシュされるメッセージが128未満の場合でも、バッチ消費がトリガーされます。 この場合、Batch-sizeパラメーターには128未満の値が返されます。

バッチ消費を改善するために、コンシューマークライアントにメッセージ等を実装して、メッセージが1回だけ処理されるようにすることをお勧めします。 メッセージべき等性の詳細については、「メッセージべき等性」をご参照ください。

関連ドキュメント

商用版のTCPクライアントSDK for Java