應用情境
在有序事件處理、撮合交易、資料即時增量同步處理等情境下,異構系統間需要維持強一致的狀態同步,上遊的事件變更需要按照順序傳遞到下遊進行處理。在這類情境下使用雲訊息佇列 RocketMQ 版的順序訊息可以有效保證資料轉送的順序性。
典型情境一:撮合交易
以證券、股票交易撮合情境為例,對於出價相同的交易單,堅持按照先出價先交易的原則,下遊處理訂單的系統需要嚴格按照出價順序來處理訂單。
典型情境二:資料即時增量同步處理
以資料庫變更增量同步處理情境為例,上遊源端資料庫按需執行增刪改操作,將二進位動作記錄作為訊息,通過雲訊息佇列 RocketMQ 版傳輸到下遊搜尋系統,下遊系統按順序還原訊息資料,實現狀態資料按序重新整理。如果是普通訊息則可能會導致狀態混亂,和預期操作結果不符,基於順序訊息可以實現下遊狀態和上遊操作結果一致。
功能原理
什麼是順序訊息
順序訊息是雲訊息佇列 RocketMQ 版提供的一種進階訊息類型,支援消費者按照發送訊息的先後順序擷取訊息,從而實現業務情境中的順序處理。
相比其他類型訊息,順序訊息在發送、儲存和投遞的處理過程中,更多強調多條訊息間的先後循序關聯性。
雲訊息佇列 RocketMQ 版順序訊息的循序關聯性通過訊息組(MessageGroup)判定和識別,發送順序訊息時需要為每條訊息設定歸屬的訊息組。
重要 只有同一訊息組的訊息才能保證順序,不同訊息組或未設定訊息組的訊息之間不涉及順序性。
基於訊息組的順序判定邏輯,支援按照商務邏輯做細粒度拆分,可以在滿足業務局部順序的前提下提高系統的並行度和吞吐能力。
如何保證訊息的順序性
雲訊息佇列 RocketMQ 版的訊息的順序性分為兩部分,生產順序性和消費順序性。
生產順序性:雲訊息佇列 RocketMQ 版通過生產者和服務端的協議保障單個生產者串列地發送訊息,並按序儲存和持久化。
如需保證訊息生產的順序性,則必須滿足以下條件:
同一訊息組(MessageGroup)
訊息生產順序性的範圍為訊息組,生產者發送訊息時可以為每條訊息設定訊息組,只有同一訊息組內的訊息可以保證順序性,不同訊息組或未設定訊息組的訊息之間不保證順序。
單一生產者
訊息生產的順序性僅支援單一生產者,不同生產者分布在不同的系統,即使設定相同的訊息組,不同生產者之間產生的訊息也無法判定其先後順序。
串列發送
雲訊息佇列 RocketMQ 版生產者用戶端支援多安全執行緒訪問,但如果生產者使用多線程並行發送,則不同線程間產生的訊息將無法判定其先後順序。
滿足以上條件的生產者,將順序訊息發送至雲訊息佇列 RocketMQ 版後,會保證設定了同一訊息組的訊息,按照發送順序儲存在同一隊列中。服務端順序儲存邏輯如下:
如上圖所示,訊息組1和訊息組4的訊息混合儲存在隊列1中,雲訊息佇列 RocketMQ 版保證訊息組1中的訊息G1-M1、G1-M2、G1-M3是按發送順序儲存,且訊息組4的訊息G4-M1、G4-M2也是按順序儲存,但訊息組1和訊息組4中的訊息不涉及循序關聯性。
消費順序性:雲訊息佇列 RocketMQ 版通過消費者和服務端的協議保障訊息消費嚴格按照儲存的先後順序來處理。
如需保證訊息消費的順序性,則必須滿足以下條件:
投遞順序
雲訊息佇列 RocketMQ 版通過用戶端SDK和服務端通訊協定保障訊息按照服務端儲存順序投遞,但業務方消費訊息時需要嚴格按照接收—處理—應答的語義處理訊息,避免因非同步處理導致訊息亂序。
重要 消費者類型為PushConsumer時,雲訊息佇列 RocketMQ 版保證訊息按照儲存順序一條一條投遞給消費者,若消費者類型為SimpleConsumer,則消費者有可能一次拉取多條訊息。此時,訊息消費的順序性需要由業務方自行保證。消費者類型的具體資訊,請參見消費者分類。
有限重試
雲訊息佇列 RocketMQ 版順序訊息投遞僅在重試次數限定範圍內,即一條訊息如果一直重試失敗,超過最大重試次數後將不再重試,跳過這條訊息消費,不會一直阻塞後續訊息處理。
對於需要嚴格保證消費順序的情境,請務必設定合理的重試次數,避免參數不合理導致訊息亂序。
生產順序性和消費順序性組合
如果訊息需要嚴格按照先進先出(FIFO)的原則處理,即先發送的先消費、後發送的後消費,則必須要同時滿足生產順序性和消費順序性。但一般業務情境下,同一個生產者可能對接多個下遊消費者,不一定所有的消費者業務都需要順序消費,您可以將生產順序性和消費順序性進行差異化組合,應用於不同的業務情境。例如發送順序訊息,但使用非順序的並發消費方式來提高吞吐能力。更多組合方式如下表所示:
生產順序 | 消費順序 | 順序性效果 |
設定訊息組,保證訊息順序發送。 | 順序消費 | 按照訊息組粒度,嚴格保證訊息順序。 同一訊息組內的訊息的消費順序和發送順序完全一致。 |
設定訊息組,保證訊息順序發送。 | 並發消費 | 並發消費,儘可能按時間順序處理。 |
未設定訊息組,訊息亂序發送。 | 順序消費 | 按佇列儲存體粒度,嚴格順序。 基於雲訊息佇列 RocketMQ 版本身隊列的屬性,消費順序和佇列儲存體的順序一致,但不保證和發送順序一致。 |
未設定訊息組,訊息亂序發送。 | 並發消費 | 並發消費,儘可能按照時間順序處理。 |
順序訊息生命週期
初始化
訊息被生產者構建並完成初始化,待發送到服務端的狀態。
待消費
訊息被發送到服務端,對消費者可見,等待消費者消費的狀態。
消費中
訊息被消費者擷取,並按照消費者本地的商務邏輯進行處理的過程。
此時服務端會等待消費者完成消費並提交消費結果,如果一定時間後沒有收到消費者的響應,雲訊息佇列 RocketMQ 版會對訊息進行重試處理。具體資訊,請參見消費重試。
消費提交
消費者完成消費處理,並向服務端提交消費結果,服務端標記當前訊息已經被處理(包括消費成功和失敗)。
雲訊息佇列 RocketMQ 版預設支援保留所有訊息,此時訊息資料並不會立即被刪除,只是邏輯標記已消費。訊息在儲存時間到期或儲存空間不足被刪除前,消費者仍然可以回溯訊息重新消費。
訊息刪除
雲訊息佇列 RocketMQ 版按照訊息儲存機制滾動清理最早的訊息資料,將訊息從物理檔案中刪除。更多資訊,請參見訊息儲存和清理機制。
使用限制
順序訊息僅支援使用MessageType為FIFO的主題,即順序訊息只能發送至類型為順序訊息的主題中,發送的訊息的類型必須和主題的類型一致。
使用樣本
和普通訊息發送相比,順序訊息發送必須要設定訊息組。訊息組的粒度建議按照業務情境,儘可能細粒度設計,以便實現業務拆分和並發擴充。
以Java語言為例,收發順序訊息的範例程式碼如下:
完整的訊息收發範例程式碼請參見RocketMQ 5.x系列SDK(推薦)。
範例程式碼
順序訊息發送
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//訊息發送的目標Topic名稱,需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//順序訊息發送。
Message message = provider.newMessageBuilder()
.setTopic("topic")
//設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
.setKeys("messageKey")
//設定訊息Tag,用於消費端根據指定Tag過濾訊息。
.setTag("messageTag")
//設定順序訊息的排序分組,該分組盡量保持離散,避免熱點排序分組。
.setMessageGroup("fifoGroup001")
//訊息體。
.setBody("messageBody".getBytes())
.build();
try {
//發送訊息,需要關注發送結果,並捕獲失敗等異常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
PushConsumer消費
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
//訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
//消費順序訊息時,需要確保當前消費者分組ConsumerGroup是順序投遞模式,否則仍然按並發亂序投遞。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設定消費者分組。
.setConsumerGroup(consumerGroup)
//設定預綁定的訂閱關係。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//設定消費監聽器。
.setMessageListener(messageView -> {
//處理訊息並返回消費結果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可關閉該進程。
//pushConsumer.close();
}
}
SimpleConsumer消費
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
//訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化SimpleConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
//消費順序訊息時,需要確保當前消費者分組ConsumerGroup是順序投遞模式,否則仍然按並發亂序投遞。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder().setClientConfiguration(clientConfiguration)
//設定消費者分組。
.setConsumerGroup(consumerGroup)
//設定長輪詢逾時時間。
.setAwaitDuration(awaitDuration)
//設定預綁定的訂閱關係。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)).build();
//設定本次拉取的最大訊息條數。
int maxMessageNum = 16;
//設定訊息的不可見時間。
Duration invisibleDuration = Duration.ofSeconds(10);
//SimpleConsumer需要用戶端一直主動迴圈擷取訊息,並進行消費處理。
//如果需要提高消費即時性,建議多線程並發拉取。
while (true) {
//需要注意的是,同一個MessageGroup的訊息,如果前序訊息沒有消費完成,再次調用Receive是擷取不到後續訊息的。
final List<MessageView> messageViewList = consumer.receive(maxMessageNum, invisibleDuration);
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消費處理完成後,需要主動調用ACK提交消費結果。
try {
consumer.ack(messageView);
} catch (ClientException e) {
//如果遇到系統流控等原因造成拉取失敗,需要重新發起擷取訊息請求。
e.printStackTrace();
}
});
}
// 如果不需要再使用SimpleConsumer,可關閉該進程。
// consumer.close();
}
}
擷取順序訊息消費重試日誌
PushConsumer順序消費的重試是在消費者用戶端進行,服務端無法擷取消費重試的詳細日誌,若訊息軌跡中順序訊息的投遞結果為失敗時,您需要在消費者用戶端日誌中查看訊息重試的最大次數、消費者用戶端等資訊。
消費者用戶端日誌查看路徑,請參見日誌配置。
您可以通過搜尋以下關鍵字在用戶端日誌中快速定位消費失敗的相關內容:
Message listener raised an exception while consuming messages
Failed to consume fifo message finally, run out of attempt times
使用建議
串列消費,避免批量消費導致亂序
訊息消費建議串列處理,避免一次消費多條訊息,否則可能出現亂序情況。
例如:發送順序為1->2->3->4,消費時批量消費,消費順序為1->23(批量處理,失敗)->23(重試處理)->4,此時可能由於訊息3的失敗導致訊息2被重複處理,最後導致訊息消費亂序。
訊息組儘可能打散,避免集中導致熱點
雲訊息佇列 RocketMQ 版保證相同訊息組的訊息儲存在同一個隊列中,如果不同業務情境的訊息都集中在少量或一個訊息組中,則這些訊息儲存壓力都會集中到服務端的少量隊列或一個隊列中。容易導致效能熱點,且不利於擴充。一般建議的訊息組設計會採用訂單ID、使用者ID作為順序參考,即同一個終端使用者的訊息保證順序,不同使用者的訊息無需保證順序。
因此建議將業務以訊息組粒度進行拆分,例如,將訂單ID、使用者ID作為訊息組關鍵字,可實現同一終端使用者的訊息按照順序處理,不同使用者的訊息無需保證順序。