雲訊息佇列 RocketMQ 版5.x版本執行個體可相容Java ONS 1.x SDK用戶端接入,您可以使用ONS 1.x SDK的接入5.x執行個體進行訊息收發。本文為您介紹Java ONS 1.x SDK訊息收發範例程式碼。
重要
- 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和雲訊息佇列 RocketMQ 版5.x服務端完全相容,提供了更全面的功能並支援更多增強特性。更多資訊,請參見5.x系列SDK。
- RocketMQ 4.x/3.x系列SDK和ONS系列SDK後續僅做功能維護,建議僅存量業務使用。
Serverless版執行個體公網訪問版本說明
Serverless版執行個體使用公網訪問接入雲訊息佇列 RocketMQ 版時,需要保證使用的Java ONS 1.x SDK版本為1.9.0.Final及以上版本,並在訊息收發代碼中補充如下內容:
properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");
說明
其中,InstanceId
需要替換為您實際使用的執行個體ID。
普通訊息收發樣本
發送普通訊息(同步發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Date;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定發送逾時時間,單位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
// 迴圈發送訊息。
for (int i = 0; i < 100; i++){
Message msg = new Message(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
// 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
"TopicTestMQ",
// Message Tag可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
// Tag的具體格式和設定方法,請參見訊息過濾。
"TagA",
// Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預。
// 需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_" + i);
try {
SendResult sendResult = producer.send(msg);
// 同步發送訊息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通訊息(非同步發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class ProducerTest {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
//設定發送逾時時間,單位毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
// 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
"TopicTestMQ",
// Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
"TagA",
// Message Body,任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_100");
// 非同步發送訊息, 發送結果通過callback返回給用戶端。
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 訊息發送成功。
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// 阻塞當前線程3秒,等待非同步發送結果。
TimeUnit.SECONDS.sleep(3);
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通訊息(單向發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定發送逾時時間,單位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
// 迴圈發送訊息。
for (int i = 0; i < 100; i++){
Message msg = new Message(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
// 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
"TopicTestMQ",
// Message Tag,
// 可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
"TagA",
// Message Body
// 任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_" + i);
// 由於在oneway方式發送訊息時沒有請求應答處理,如果出現訊息發送失敗,則會因為沒有重試而導致資料丟失。若資料不可丟,建議選用可靠同步或可靠非同步發送方式。
producer.sendOneway(msg);
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
發送普通訊息(多線程發送)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class SharedProducer {
public static void main(String[] args) {
// producer執行個體配置初始化。
Properties properties = new Properties();
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定發送逾時時間,單位:毫秒。
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
Producer producer = ONSFactory.createProducer(properties);
producer.start();
// 建立的Producer和Consumer對象為安全執行緒的,可以在多線程間進行共用,避免每個線程建立一個執行個體。
// thread和anotherThread共用Producer對象,並發地發送訊息至訊息佇列RocketMQ版。
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
// 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
"TopicTestMQ",
// Message Tag可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
"TagA",
// Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預。
// 需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// 同步發送訊息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
try {
SendResult sendResult = producer.send(msg);
// 同步發送訊息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
});
anotherThread.start();
// (可選)Producer執行個體若不再使用時,可將Producer關閉,進行資源釋放。
// producer.shutdown();
}
}
訂閱普通訊息(PushConsumer)
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey, "INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 叢集訂閱者式(預設)。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 廣播訂閱者式。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //訂閱多個Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
// 訂閱另外一個Topic,如需取消訂閱該Topic,請刪除該部分的訂閱代碼,重新啟動消費端即可。
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // 訂閱全部Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
訂閱普通訊息(PushConsumer批量消費)
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchConsumer;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import java.util.List;
import java.util.Properties;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
public class SimpleBatchConsumer {
public static void main(String[] args) {
Properties consumerProperties = new Properties();
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
consumerProperties.put(PropertyKeyConst.GROUP_ID, "XXX");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
consumerProperties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
consumerProperties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
consumerProperties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 設定批量消費最大訊息數量,當指定Topic的訊息數量已經攢夠128條,SDK立即執行回調進行消費。預設值:32,取值範圍:1~1024。
consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
// 設定批量消費最大等待時間長度,當等待時間達到10秒,SDK立即執行回調進行消費。預設值:0,取值範圍:0~450,單位:秒。
consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
batchConsumer.subscribe("TopicTestMQ", "TagA", new BatchMessageListener() {
@Override
public Action consume(final List<Message> messages, ConsumeContext context) {
System.out.printf("Batch-size: %d\n", messages.size());
// 批量訊息處理。
return Action.CommitMessage;
}
});
// 啟動batchConsumer。
batchConsumer.start();
System.out.println("Consumer start success.");
// 等待固定時間防止進程退出。
try {
Thread.sleep(200000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
訂閱普通訊息(PullConsumer)
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.PullConsumer;
import com.aliyun.openservices.ons.api.TopicPartition;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class PullConsumerClient {
public static void main(String[] args){
Properties properties = new Properties();
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
PullConsumer consumer = ONSFactory.createPullConsumer(properties);
// 啟動Consumer。
consumer.start();
// 擷取topic-xxx下的所有分區。
Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
// 指定需要拉取訊息的分區。
consumer.assign(topicPartitions);
while (true) {
// 拉取訊息,逾時時間為3000 ms。
List<Message> messages = consumer.poll(3000);
System.out.printf("Received message: %s %n", messages);
}
}
}
順序訊息收發樣本
發送順序訊息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import java.util.Date;
import java.util.Properties;
public class ProducerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
for (int i = 0; i < 1000; i++) {
String orderId = "biz_" + i % 10;
Message msg = new Message(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
"Order_global_topic",
// Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
"TagA",
// Message Body,可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"send order global msg".getBytes()
);
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey(orderId);
// 分區順序訊息中區分不同分區的關鍵字段,Sharding Key與普通訊息的key是完全不同的概念。
// 全域順序訊息,該欄位可以設定為任意非Null 字元串。
String shardingKey = String.valueOf(orderId);
try {
SendResult sendResult = producer.send(msg, shardingKey);
// 發送訊息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
}
catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱順序訊息
package com.aliyun.openservices.ons.example.order;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import java.util.Properties;
public class ConsumerClient {
public static void main(String[] args) {
Properties properties = new Properties();
// 設定為您在訊息佇列RocketMQ版控制台建立的Group ID。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 順序訊息消費失敗進行重試前的等待時間,單位(毫秒),取值範圍:10毫秒~30000毫秒。
properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
// 訊息消費失敗時的最大重試次數。
properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");
// 在訂閱訊息前,必須調用start方法來啟動Consumer,只需調用一次即可。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
consumer.subscribe(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
"Order_global_topic",
// 訂閱指定Topic下的Tags:
// 1. * 表示訂閱所有訊息。
// 2. TagA || TagB || TagC表示訂閱TagA或TagB或TagC的訊息。
"*",
new MessageOrderListener() {
/**
* 1. 訊息消費處理失敗或者處理出現異常,返回OrderAction.Suspend。
* 2. 訊息處理成功,返回OrderAction.Success。
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
System.out.println(message);
return OrderAction.Success;
}
});
consumer.start();
}
}
定時訊息收發樣本
發送定時訊息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
Producer producer = ONSFactory.createProducer(properties);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
"Topic",
// Message Tag可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
"tag",
// Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_100");
try {
// 定時訊息,單位毫秒(ms),在指定時間戳記(目前時間之後)進行投遞,例如2016-03-07 16:21:00投遞。如果被設定成目前時間戳之前的某個時刻,訊息將立即被投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 發送訊息,只要不拋異常就是成功。
SendResult sendResult = producer.send(msg);
System.out.println("Message Id:" + sendResult.getMessageId());
}
catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱定時訊息
訂閱定時訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息。
延時訊息收發樣本
發送延時訊息
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Date;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
Producer producer = ONSFactory.createProducer(properties);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// 設定為您在訊息佇列RocketMQ版控制台上建立的Topic。
"Topic",
// Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版伺服器過濾。
"tag",
// Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_100");
try {
// 延時訊息,在指定延遲時間(目前時間之後)進行投遞。最大可設定延遲40天投遞,單位毫秒(ms)。
// 以下樣本表示訊息在3秒後投遞。
long delayTime = System.currentTimeMillis() + 3000;
// 設定訊息需要被投遞的時間。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// 同步發送訊息,只要不拋異常就是成功。
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱延時訊息
訂閱延時訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息。
事務訊息收發樣本
發送事務訊息
package com.aliyun.openservices.tcp.example.producer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import java.util.Date;
import java.util.Properties;
public class SimpleTransactionProducer {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在訊息佇列RocketMQ版控制台建立的Group ID。注意:事務訊息的Group ID不能與其他類型訊息的Group ID共用。
properties.put(PropertyKeyConst.GROUP_ID,"XXX");
/**
* 如果是使用公網存取點訪問,則必須設定AccessKey和SecretKey,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
* 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
* 如果是在阿里雲ECS內網訪問,則無需配置,服務端會根據內網VPC資訊智能擷取。
* 如果執行個體類型為Serverlesss執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
*/
// 執行個體使用者名稱和密碼在訊息佇列RocketMQ版控制台存取控制的智能身份識別頁簽中擷取。
properties.put(PropertyKeyConst.AccessKey,"INSTANCE USER NAME");
properties.put(PropertyKeyConst.SecretKey, "INSTANCE PASSWORD");
//注意!!!使用ONS SDK訪問RocketMQ 5.x執行個體時,InstanceID屬性不需要設定,否則會導致失敗。
// 設定為您從訊息佇列RocketMQ版控制台擷取的存取點,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
// 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
properties.put(PropertyKeyConst.NAMESRV_ADDR, "ACCESS POINT");
// 初始化事務訊息Producer時,需要註冊一個本地事務狀態的Checker。
LocalTransactionCheckerImpl localTransactionChecker = new LocalTransactionCheckerImpl();
TransactionProducer transactionProducer = ONSFactory.createTransactionProducer(properties, localTransactionChecker);
transactionProducer.start();
Message msg = new Message("XXX","TagA","Hello MQ transaction===".getBytes());
for (int i = 0; i < 3; i++) {
try{
SendResult sendResult = transactionProducer.send(msg, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
System.out.println("執行本地事務,並根據本地事務的狀態提交TransactionStatus。");
return TransactionStatus.CommitTransaction;
}
}, null);
assert sendResult != null;
}catch (ONSClientException e){
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed! Topic is:" + msg.getTopic());
e.printStackTrace();
}
}
System.out.println("Send transaction message success.");
}
}
// 本地事務檢查器。
class LocalTransactionCheckerImpl implements LocalTransactionChecker {
@Override
public TransactionStatus check(Message msg) {
System.out.println("收到事務訊息的回查請求,MsgId: " + msg.getMsgID());
return TransactionStatus.CommitTransaction;
}
}
訂閱事務訊息
訂閱事務訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息。