本文介紹Apache RocketMQ Java SDK的訊息收發範例程式碼。
範例程式碼
重要
如果您使用的是Serverless版執行個體,在公網訪問的時候需要注意SDK的版本等資訊,詳情請參見Serverless版執行個體公網訪問版本說明。
gRPC協議SDK
使用gRPC協議的SDK為rocketmq-client-java
,下列是使用該SDK的範例程式碼。
訊息類型 | 發送訊息範例程式碼 | 訂閱訊息範例程式碼 | |
Remoting協議SDK
使用Remoting協議的SDK為rocketmq-client
,下列是使用該SDK的範例程式碼。
普通訊息
發送普通訊息(同步發送)
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
public class RocketMQProducer {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// 使用公網存取點時,需要配置RPCHook。
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// DefaultMQProducer producer = new DefaultMQProducer();
//您在訊息佇列RocketMQ版控制台建立的Group ID。
producer.setProducerGroup("YOUR GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
producer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
producer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
//訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通訊息(非同步發送)
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class RocketMQAsyncProducer {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException, InterruptedException {
// 使用公網存取點時,需要配置RPCHook。
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// DefaultMQProducer producer = new DefaultMQProducer();
//您在訊息佇列RocketMQ版控制台建立的Group ID
producer.setProducerGroup("YOUR GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
producer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
producer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// 訊息發送成功。
System.out.println("send message success. msgId= " + result.getMsgId());
}
@Override
public void onException(Throwable throwable) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println("send message failed.");
throwable.printStackTrace();
}
});
} catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// 阻塞當前線程3秒,等待非同步發送結果。
TimeUnit.SECONDS.sleep(3);
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通訊息(單向發送)
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
public class RocketMQOnewayProducer {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// 使用公網存取點時,需要配置RPCHook。
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// DefaultMQProducer producer = new DefaultMQProducer();
//您在訊息佇列RocketMQ版控制台建立的Group ID。
producer.setProducerGroup("YOUR GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
producer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
producer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
Message msg = new Message("YOUR TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
} catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱普通訊息
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class RocketMQPushConsumer {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// 使用公網存取點時,需要配置RPCHook。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
//您在訊息佇列RocketMQ版控制台建立的Group ID。
consumer.setConsumerGroup("YOUR GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
consumer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
consumer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
consumer.setNamesrvAddr("YOUR ACCESS POINT");
// 設定為您在阿里雲雲訊息佇列 RocketMQ 版控制台上建立的Topic。
consumer.subscribe("YOUR TOPIC", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("Receive New Messages: %s %n", msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
順序訊息
發送順序訊息
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.List;
public class RocketMQOrderProducer {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// 使用公網存取點時,需要配置RPCHook。
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// DefaultMQProducer producer = new DefaultMQProducer();
//您在訊息佇列RocketMQ版控制台建立的Group ID。
producer.setProducerGroup("YOUR GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
producer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
producer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
int orderId = i % 10;
Message msg = new Message("YOUR ORDER TOPIC",
"YOUR MESSAGE TAG",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 注意!!!請務必設定該配置項,順序訊息才能均勻分布到各隊列中。
// 5.x版本下面一行代碼可以替換為 msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
msg.putUserProperty("__SHARDINGKEY", orderId + "");
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 選擇適合自己的分區選擇演算法,保證同一個參數得到的結果相同。
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
訂閱順序訊息
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import java.util.List;
public class RocketMQOrderConsumer {
private static RPCHook getAclRPCHook() {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// 使用公網存取點時,需要配置RPCHook。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
//您在訊息佇列RocketMQ版控制台建立的Group ID。
consumer.setConsumerGroup("YOUR GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
consumer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
consumer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
consumer.setNamesrvAddr("YOUR ACCESS POINT");
consumer.subscribe("YOUR ORDER TOPIC", "*");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 消費失敗則掛起重試返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
定時/延時訊息
發送定時/延時訊息
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.Date;
public class RocketMQDelayProducer {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// 使用公網存取點時,需要配置RPCHook。
DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// DefaultMQProducer producer = new DefaultMQProducer();
//您在訊息佇列RocketMQ版控制台建立的Group ID。
producer.setProducerGroup("YOUR GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
producer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
producer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
// 設定為您在雲訊息佇列 RocketMQ 版控制台建立的Topic。
Message msg = new Message("YOUR TOPIC",
// 設定訊息的Tag。
"YOUR MESSAGE TAG",
// 訊息內容。
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 發送延時訊息,需要設定延時時間,單位毫秒(ms),訊息將在指定延時時間後投遞,例如訊息將在3秒後投遞。
long delayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
// 若需要發送定時訊息,則需要設定定時時間,訊息將在指定時間進行投遞,例如訊息將在2021-08-10 18:45:00投遞。
// 定時時間格式為:yyyy-MM-dd HH:mm:ss,若設定的時間戳記在目前時間之前,則訊息將被立即投遞給Consumer。
// longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
// msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱定時/延時訊息的範例程式碼和訂閱普通訊息相同。
事務訊息
發送事務訊息
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQTransactionProducer {
private static RPCHook getAclRPCHook() {
/**
* 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
}
public static void main(String[] args) throws MQClientException {
// 使用公網存取點時,需要配置RPCHook。
// 您在訊息佇列RocketMQ版控制台建立的Group ID。注意:事務訊息的Group ID不能與其他類型訊息的Group ID共用。
TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
// 使用VPC存取點時,無需配置RPCHook。
// 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
// TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
// 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
// 5.3.0版本及以上SDK開啟訊息軌跡除需設定AccessChannel外,需要增加,EnableTrace參數
transactionMQProducer.setEnableTrace(true);
// 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
transactionMQProducer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("開始執行本地事務: " + msg);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("收到事務訊息的回查請求, MsgId: " + msg.getMsgId());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
transactionMQProducer.start();
for (int i = 0; i < 10; i++) {
try {
Message message = new Message("YOUR TRANSACTION TOPIC",
"YOUR MESSAGE TAG",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, null);
assert sendResult != null;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
訂閱事務訊息的範例程式碼和訂閱普通訊息相同。
Serverless版執行個體公網訪問版本說明
Serverless版執行個體使用公網訪問接入雲訊息佇列 RocketMQ 版時,需要保證使用的SDK版本滿足以下要求,並在訊息收發代碼中補充如下內容:
說明
其中,InstanceId
需要替換為您實際使用的執行個體ID。
SDK版本:rocketmq-client ≥ 5.2.0
訊息發送代碼補充:
producer.setNamespaceV2("InstanceId");
訊息消費代碼補充:
consumer.setNamespaceV2("InstanceId");
SDK版本:rocketmq-client-java ≥ 5.0.6
訊息發送和訊息消費代碼補充:
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .setNamespace("InstanceId") .setCredentialProvider(sessionCredentialsProvider) .build();