訊息佇列 RocketMQ 是一種常用的非同步 RPC 技術。本文以阿里雲訊息佇列 RocketMQ 為例,介紹了如何使用 ACM 對訊息佇列 RocketMQ 實現流量控制。
訊息佇列 RocketMQ 流量控制簡述
對於訊息佇列 RocketMQ 調用,常用的限流方式是在訂閱端限流。限流方式有兩種:
- 針對訊息訂閱者的並發流控
- 針對訊息訂閱者的消費延時流控
針對訊息訂閱者的消費延時流控的基本原理是,每次消費時在用戶端增加一個延時來控制消費速度,此時理論上消費並發最快速度為:
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
如果訊息並發消費線程(ConcurrentThreadNumber)為 20,延時(ConsumInterval)為 100 ms,代入上述公式可得:
200 = 1 / 0.1 * 20
由上可知,理論上可以將並發消費控制在 200 以下。
與並發線程數流控相比,消費延時流控的優點在於其實現相對簡單,對訊息佇列 RocketMQ 類用戶端包依賴較少,而且不需要用戶端提供控制並發線程數的動態調整介面。
若使用以上的流量控制方法在分布式架構下做到全域動態控制,可通過配置中心下發流控參數來實現。
下文詳細介紹了如何基於配置中心來實現非同步訊息消費的全域動態流控。樣本中使用了阿里雲的訊息佇列 RocketMQ(訊息佇列)和 ACM(Application Configuration Manangement)產品,語言為 Java。
基於消費延時流控的基本原理
如圖所示,管理員或應用程式通過 ACM 控制台發布消費延時配置(RCV_INTERVAL_TIME),所有訊息佇列 RocketMQ 消費程式訂閱該配置。理論上,該配置從發布到下發至所有用戶端,可以在 1 秒內完成(取決於網路延時)。
程式碼範例
以下章節給出了基於配置中心來實現非同步訊息消費的全域動態流控的程式碼範例。關於 SDK 的詳細介紹,參見 訊息佇列 RocketMQ 訊息佇列 RocketMQ 和Application Configuration Manangement ACM 產品官方文檔。
建立 ACM 配置
在 ACM 上建立消費延時的參數。
設定全域消費延時變數
- 設定消費接收延時的全域變數。
// 初始化訊息接收延時參數,單位為millisecond static int RCV_INTERVAL_TIME = 10000; // 初始化佈建服務,控制台通過範例程式碼自動擷取下面參數 ConfigService.init("acm.aliyun.com", /*租戶ID*/"xxx", /*AK*/"xxx", /*SK*/"yyy"); // 主動擷取配置 String content = ConfigService.getConfig("app.mq.qos", "DEFAULT_GROUP", 6000); Properties p = new Properties(); try { p.load(new StringReader(content)); RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME")); } catch (IOException e) { e.printStackTrace(); }
- 設定 ACM listener,確保當配置被修改時,RCV_INTERVAL_TIME 參數即時更新。
// 初始化的時候,給配置添加監聽,配置變更會回調通知 ConfigService.addListener("app.mq.qos", "DEFAULT_GROUP", new ConfigChangeListener() { public void receiveConfigInfo(String configInfo) { Properties p = new Properties(); try { p.load(new StringReader(configInfo)); RCV_INTERVAL_TIME = Integer.valueOf(p.getProperty("RCV_INTERVAL_TIME")); } catch (IOException e) { e.printStackTrace(); } } });
設定訊息佇列 RocketMQ 消費延時邏輯
完整執行個體如下。
- 本例中 RCV_INTERVAL_TIME 參數的訪問刻意沒有加鎖,原因不做贅述。
- Aliyun ONS Client 不提供動態線程並發數,預設並發為 20。因此本例正好使用消費延時參數來動態調節 QoS。
//以下代碼可直接貼在Main()函數裡
Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, "CID_consumer_group");
properties.put(PropertyKeyConst.AccessKey,"xxx");
properties.put(PropertyKeyConst.SecretKey, "yyy");
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定 TCP 接入網域名稱(此處以公用雲生產環境為例)
properties.put(PropertyKeyConst.ONSAddr,
"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe(/*Topic*/"topic-name", /*Tag*/null, new MessageListener()
{
public Action consume(Message message, ConsumeContext context) {
//訊息佇列 RocketMQ Subscribe QoS logical start,
// Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.
// Within each cycle, the thread will check RCV_INTERVAL_TIME in case it's set to a smaller value.
// RCV_INTERVAL_TIME <= 0 means no sleeping.
int rcvIntervalTimeLeft = RCV_INTERVAL_TIME;
while (rcvIntervalTimeLeft > 0) {
if (rcvIntervalTimeLeft > RCV_INTERVAL_TIME) {
rcvIntervalTimeLeft = RCV_INTERVAL_TIME;
}
try {
if (rcvIntervalTimeLeft >= 100) {
rcvIntervalTimeLeft -= 100;
Thread.sleep(100);
} else {
Thread.sleep(rcvIntervalTimeLeft);
rcvIntervalTimeLeft = 0;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//訊息佇列 RocketMQ Subscribe interval logical ends
System.out.println("Receive: " + message);
/*
* Put your business logic here.
*/
doSomething();
return Action.CommitMessage;
}
});
consumer.start();
運行結果
在隊列內的訊息無限多的情況下,單機運行 Consumer 進行消費,分三段測試,分別運行約 5 分鐘,通過 ACM 配置推送來達到以下效果。
- RCV_INTERVAL_TIME = 100 ms
- RCV_INTERVAL_TIME = 5000 ms
- RCV_INTERVAL_TIME = 1000 ms
在單訊息佇列 RocketMQ 消費業務處理耗時約 100 ms 情況下、單機並發 20 線程的測試結果如下。
- RCV_INTERVAL_TIME = 100 ms:平均消費效能約為 9000 tpm 左右
- RCV_INTERVAL_TIME = 5000 ms:平均消費效能被限制到 200 tpm 左右
- RCV_INTERVAL_TIME = 1000 ms:平均消費效能回升到 1100 tpm 左右
從以上結果中可以得出:消費和 tpm 成反比,整個過程中應用不中斷,流控推送結果對分布式叢集秒級生效。與預期結果相符。單機效能結果如下所示。