如需提高訊息的處理效率,或降低下遊資源的API調用頻率,您可使用批量消費功能。本文介紹批量消費的定義、優勢與情境、使用限制和範例程式碼等資訊。
什麼是批量消費
- 定義
批量消費是雲訊息佇列 RocketMQ 版通過Push消費者提供的、將訊息分批次消費的功能。
說明 根據訊息擷取方式,雲訊息佇列 RocketMQ 版提供Push和Pull兩種類型的消費者,更多資訊,請參見基本概念。 - 功能原理批量消費主要分為以下兩個階段:
- 訊息從生產者發布至雲訊息佇列 RocketMQ 版後,Push消費者中的拉訊息線程通過長輪詢將訊息拉到後台緩衝。
- Push消費者根據緩衝情況是否滿足任一批量條件,判斷是否將訊息提交給消費線程完成消費。
具體示意圖如下所示。
使用限制
- 僅TCP協議支援批量消費,HTTP協議暫不支援。請確保您使用的SDK是商業版TCP Java SDK,且版本在1.8.7.3.Final或以上,詳細的版本說明和擷取方式,請參見商業版TCP Java SDK版本說明。
- 支援一次提交最多1024條訊息,支援攢批等待最多450秒。
功能優勢及情境樣本
批量消費的功能優勢和情境樣本說明如下:
- 優勢一:提高訊息的吞吐能力和處理效率
情境樣本:上遊訂單系統和下遊Elasticsearch系統間通過雲訊息佇列 RocketMQ 版解耦,Elasticsearch消費訂單系統的10條日誌訊息,每一條訊息對於Elasticsearch系統而言都是一次RPC請求,假設一次RPC請求耗時10毫秒,那麼不使用批量消費的耗時為10×10=100毫秒;理想狀態下,使用批量消費的耗時可縮短至10毫秒,因為10條訊息合并為一次消費,大大提高訊息的處理效率。
- 優勢二:降低下遊資源的API調用頻率
情境樣本:給資料庫中插入資料,每更新一條資料執行一次插入任務,如果資料更新較頻繁,可能會對資料庫造成較大壓力。此時,您可以設定每10條資料批量插入一次或每5秒執行一次插入任務,降低系統運行壓力。
範例程式碼
批量消費的範例程式碼如下所示。
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.tcp.example.MqConfig;
public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
// 您在訊息佇列RocketMQ版控制台建立的Group ID。
consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
// 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
// AccessKey ID,阿里雲身分識別驗證標識。
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// AccessKey Secret,阿里雲身分識別驗證密鑰。
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);
// 設定批量消費最大訊息數量,當指定Topic的訊息數量已經攢夠128條,SDK立即執行回調進行消費。預設值:32,取值範圍:1~1024。
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// 設定批量消費最大等待時間長度,當等待時間達到10秒,SDK立即執行回調進行消費。預設值:0,取值範圍:0~450,單位:秒。
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {
@Override
public Action consume(final List<Message> messages, ConsumeContext context) {
System.out.printf("Batch-size: %d\n", messages.size());
// 批量訊息處理。
return Action.CommitMessage;
}
});
//啟動batchConsumer。
batchConsumer.start();
System.out.println("Consumer start success.");
//等待固定時間防止進程退出。
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
參數描述如下表所示。
參數名 | 參數類型 | 是否必選 | 描述 |
ConsumeMessageBatchMaxSize | String | 否 說明 如未指定參數值,則使用預設值。 | 批量消費的最大訊息數量,緩衝的訊息數量達到參數設定的值,Push消費者SDK會將緩衝的訊息統一提交給消費線程,實現批量消費。取值範圍:[1, 1024],預設值:32,單位:條。 |
BatchConsumeMaxAwaitDurationInSeconds | String | 批量消費的最大等待時間長度,等待時間長度達到參數設定的值,會將緩衝的訊息統一推送給消費者進行批量消費。取值範圍:[0, 450],預設值:0,單位:秒。 |
- 具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。
- 更多參數資訊,請參見介面和參數說明。
最佳實務
請合理設定ConsumeMessageBatchMaxSize和BatchConsumeMaxAwaitDurationInSeconds參數的取值,只要達到任一參數設定的批量條件,即會觸發提交批量消費。例如ConsumeMessageBatchMaxSize設定為128,BatchConsumeMaxAwaitDurationInSeconds設定為1,1秒內雖然沒有積攢到128條訊息,仍然會觸發批量消費,此時返回的Batch-size會小於128。
此外,為了獲得更好的批量消費效果,強烈推薦您實現訊息等冪,保證訊息有且僅被處理1次。等冪處理的具體資訊,請參見訊息等冪。