普通訊息為雲訊息佇列 RocketMQ 版中最基礎的訊息,區別於有特性的順序訊息、定時/延時訊息和事務訊息。本文為您介紹普通訊息的應用情境、功能原理、使用方法和使用建議。
應用情境
普通訊息一般應用於微服務解耦、事件驅動、Data Integration等情境,這些情境大多數要求資料轉送通道具有可靠傳輸的能力,且對訊息的處理時機、處理順序沒有特別要求。
典型情境一:微服務非同步解耦
如上圖所示,以線上的電商交易情境為例,上遊訂單系統將使用者下單支付這一業務事件封裝成獨立的普通訊息並發送至雲訊息佇列 RocketMQ 版服務端,下遊按需從服務端訂閱訊息並按照本地消費邏輯處理下遊任務。每個訊息之間都是相互獨立的,且不需要產生關聯。
典型情境二:Data Integration傳輸
如上圖所示,以離線的日誌收集情境為例,通過埋點組件收集前端應用的相關動作記錄,並轉寄到雲訊息佇列 RocketMQ 版。每條訊息都是一段日誌資料,雲訊息佇列 RocketMQ 版不做任何處理,只需要將日誌資料可靠投遞到下遊的儲存系統和分析系統即可,後續功能由後端應用完成。
功能原理
什麼是普通訊息
定義:普通訊息是雲訊息佇列 RocketMQ 版基本訊息功能,支援生產者和消費者的非同步解耦通訊。
普通訊息生命週期
初始化
訊息被生產者構建並完成初始化,待發送到服務端的狀態。
待消費
訊息被發送到服務端,對消費者可見,等待消費者消費的狀態。
消費中
訊息被消費者擷取,並按照消費者本地的商務邏輯進行處理的過程。
此時服務端會等待消費者完成消費並提交消費結果,如果一定時間後沒有收到消費者的響應,雲訊息佇列 RocketMQ 版會對訊息進行重試處理。具體資訊,請參見消費重試。
消費提交
消費者完成消費處理,並向服務端提交消費結果,服務端標記當前訊息已經被處理(包括消費成功和失敗)。
雲訊息佇列 RocketMQ 版預設支援保留所有訊息,此時訊息資料並不會立即被刪除,只是邏輯標記已消費。訊息在儲存時間到期或儲存空間不足被刪除前,消費者仍然可以回溯訊息重新消費。
訊息刪除
雲訊息佇列 RocketMQ 版按照訊息儲存機制滾動清理最早的訊息資料,將訊息從物理檔案中刪除。更多資訊,請參見訊息儲存和清理機制。
使用限制
普通訊息僅支援使用MessageType為Normal主題,即普通訊息只能發送至類型為普通訊息的主題中,發送的訊息的類型必須和主題的類型一致。
使用樣本
普通訊息支援設定訊息索引鍵、訊息過濾標籤等資訊,用於訊息過濾和搜尋尋找。以Java語言為例,收發普通訊息的範例程式碼如下:
完整的訊息收發範例程式碼請參見RocketMQ 5.x系列SDK(推薦)。
範例程式碼
普通訊息發送
package doc;
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class ProducerExample {
public static void main(String[] args) throws ClientException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//訊息發送的目標Topic名稱,需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration configuration = builder.build();
/**
* 初始化Producer時直接配置需要使用的Topic列表(這個參數可以配置多個Topic),實現提前檢查錯誤配置、攔截非法配置啟動。
* 針對非事務訊息 Topic,也可以不配置,服務端會動態檢查訊息的Topic是否合法。
* 注意!!!事務訊息Topic必須提前配置,以免事務訊息回查介面失敗,具體原理請參見事務訊息。
*/
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
//普通訊息發送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
//設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
.setKeys("messageKey")
//設定訊息Tag,用於消費端根據指定Tag過濾訊息。
.setTag("messageTag")
//訊息體。
.setBody("messageBody".getBytes())
.build();
try {
//發送訊息,需要關注發送結果,並捕獲失敗等異常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
}
}
PushConsumer消費
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class PushConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException, InterruptedException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
//訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設定消費者分組。
.setConsumerGroup(consumerGroup)
//設定預綁定的訂閱關係。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
//設定消費監聽器。
.setMessageListener(messageView -> {
//處理訊息並返回消費結果。
// LOGGER.info("Consume message={}", messageView);
System.out.println("Consume Message: " + messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
//如果不需要再使用PushConsumer,可關閉該進程。
//pushConsumer.close();
}
}
SimpleConsumer消費
import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
public class SimpleConsumerExample {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
private SimpleConsumerExample() {
}
public static void main(String[] args) throws ClientException, IOException {
/**
* 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
* 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
*/
String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
//指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
String topic = "Your Topic";
//為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
String consumerGroup = "Your ConsumerGroup";
final ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
/**
* 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
//builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
ClientConfiguration clientConfiguration = builder.build();
Duration awaitDuration = Duration.ofSeconds(10);
//訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
//初始化SimpleConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
//設定消費者分組。
.setConsumerGroup(consumerGroup)
//設定長輪詢逾時時間。
.setAwaitDuration(awaitDuration)
//設定預綁定的訂閱關係。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
//設定本次拉取的最大訊息條數。
int maxMessageNum = 16;
//設定訊息的不可見時間。
Duration invisibleDuration = Duration.ofSeconds(10);
//SimpleConsumer需要用戶端一直主動迴圈擷取訊息,並進行消費處理。
//如果需要提高消費即時性,建議多線程並發拉取。
while (true) {
final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
messages.forEach(messageView -> {
// LOGGER.info("Received message: {}", messageView);
System.out.println("Received message: " + messageView);
});
for (MessageView message : messages) {
final MessageId messageId = message.getMessageId();
try {
//消費處理完成後,需要主動調用ACK向服務端提交消費結果。
consumer.ack(message);
System.out.println("Message is acknowledged successfully, messageId= " + messageId);
//LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
t.printStackTrace();
//LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
}
// 如果不需要再使用SimpleConsumer,可關閉該進程。
// consumer.close();
}
}
使用建議
設定全域唯一業務索引鍵,方便問題追蹤
雲訊息佇列 RocketMQ 版支援自訂索引鍵(訊息的Key),在訊息查詢和軌跡查詢時,可以通過索引鍵高效精確地查詢到訊息。
因此,發送訊息時,建議設定業務上唯一的資訊作為索引,方便後續快速定位訊息。例如,訂單ID,使用者ID等。