全部產品
Search
文件中心

ApsaraMQ for RocketMQ:批量消費

更新時間:Jul 01, 2024

如需提高訊息的處理效率,或降低下遊資源的API調用頻率,您可使用批量消費功能。本文介紹批量消費的定義、優勢與情境、使用限制和範例程式碼等資訊。

什麼是批量消費

  • 定義

    批量消費是雲訊息佇列 RocketMQ 版通過Push消費者提供的、將訊息分批次消費的功能。

    說明 根據訊息擷取方式,雲訊息佇列 RocketMQ 版提供Push和Pull兩種類型的消費者,更多資訊,請參見基本概念
  • 功能原理
    批量消費主要分為以下兩個階段:
    1. 訊息從生產者發布至雲訊息佇列 RocketMQ 版後,Push消費者中的拉訊息線程通過長輪詢將訊息拉到後台緩衝。
    2. Push消費者根據緩衝情況是否滿足任一批量條件,判斷是否將訊息提交給消費線程完成消費。
    具體示意圖如下所示。batch_consume

使用限制

  • 僅TCP協議支援批量消費,HTTP協議暫不支援。請確保您使用的SDK是商業版TCP Java SDK,且版本在1.8.7.3.Final或以上,詳細的版本說明和擷取方式,請參見商業版TCP Java SDK版本說明
  • 支援一次提交最多1024條訊息,支援攢批等待最多450秒。

功能優勢及情境樣本

批量消費的功能優勢和情境樣本說明如下:

  • 優勢一:提高訊息的吞吐能力和處理效率

    情境樣本:上遊訂單系統和下遊Elasticsearch系統間通過雲訊息佇列 RocketMQ 版解耦,Elasticsearch消費訂單系統的10條日誌訊息,每一條訊息對於Elasticsearch系統而言都是一次RPC請求,假設一次RPC請求耗時10毫秒,那麼不使用批量消費的耗時為10×10=100毫秒;理想狀態下,使用批量消費的耗時可縮短至10毫秒,因為10條訊息合并為一次消費,大大提高訊息的處理效率。

  • 優勢二:降低下遊資源的API調用頻率

    情境樣本:給資料庫中插入資料,每更新一條資料執行一次插入任務,如果資料更新較頻繁,可能會對資料庫造成較大壓力。此時,您可以設定每10條資料批量插入一次或每5秒執行一次插入任務,降低系統運行壓力。

範例程式碼

批量消費的範例程式碼如下所示。

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;

public class SimpleBatchConsumer {

    public static void main(String[] args) {
        Properties consumerProperties = new Properties();
        // 您在訊息佇列RocketMQ版控制台建立的Group ID。
        consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
        // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        // AccessKey ID,阿里雲身分識別驗證標識。
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // AccessKey Secret,阿里雲身分識別驗證密鑰。
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
        consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);

        // 設定批量消費最大訊息數量,當指定Topic的訊息數量已經攢夠128條,SDK立即執行回調進行消費。預設值:32,取值範圍:1~1024。
        consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
        // 設定批量消費最大等待時間長度,當等待時間達到10秒,SDK立即執行回調進行消費。預設值:0,取值範圍:0~450,單位:秒。
        consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));

        BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
        batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {

             @Override
            public Action consume(final List<Message> messages, ConsumeContext context) {
                System.out.printf("Batch-size: %d\n", messages.size());
                // 批量訊息處理。
                return Action.CommitMessage;
            }
        });
        //啟動batchConsumer。
        batchConsumer.start();
        System.out.println("Consumer start success.");

        //等待固定時間防止進程退出。
        try {
            Thread.sleep(200000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}         

參數描述如下表所示。

參數名參數類型是否必選描述
ConsumeMessageBatchMaxSizeString
說明 如未指定參數值,則使用預設值。
批量消費的最大訊息數量,緩衝的訊息數量達到參數設定的值,Push消費者SDK會將緩衝的訊息統一提交給消費線程,實現批量消費。取值範圍:[1, 1024],預設值:32,單位:條。
BatchConsumeMaxAwaitDurationInSecondsString批量消費的最大等待時間長度,等待時間長度達到參數設定的值,會將緩衝的訊息統一推送給消費者進行批量消費。取值範圍:[0, 450],預設值:0,單位:秒。
說明

最佳實務

請合理設定ConsumeMessageBatchMaxSizeBatchConsumeMaxAwaitDurationInSeconds參數的取值,只要達到任一參數設定的批量條件,即會觸發提交批量消費。例如ConsumeMessageBatchMaxSize設定為128,BatchConsumeMaxAwaitDurationInSeconds設定為1,1秒內雖然沒有積攢到128條訊息,仍然會觸發批量消費,此時返回的Batch-size會小於128。

此外,為了獲得更好的批量消費效果,強烈推薦您實現訊息等冪,保證訊息有且僅被處理1次。等冪處理的具體資訊,請參見訊息等冪

更多資訊

商業版TCP Java SDK訂閱訊息