雲訊息佇列 RocketMQ 版提供三種方式來發送普通訊息:同步(Sync)發送、非同步(Async)發送和單向(Oneway)發送。本文介紹了每種發送方式的原理、應用情境、範例程式碼,以及三種發送方式的對比。
前提條件
安裝Java SDK。更多資訊,請參見準備環境。
建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源。
擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
可選:日誌配置。
同步發送
原理
同步發送是指訊息發送方發出一條訊息後,會在收到服務端同步響應之後才發下一條訊息的通訊方式。
應用情境
此種方式應用情境非常廣泛,例如重要通知訊息、報名簡訊通知、營銷簡訊系統等。
範例程式碼
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。 // AccessKey ID,阿里雲身分識別驗證標識。 properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // AccessKey Secret,阿里雲身分識別驗證密鑰。 properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); //設定發送逾時時間,單位:毫秒。 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 設定TCP接入網域名稱,進入雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面的存取點地區查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。 producer.start(); //迴圈發送訊息。 for (int i = 0; i < 100; i++){ Message msg = new Message( // 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。 "TopicTestMQ", // Message Tag可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。 // Tag的具體格式和設定方法,請參見Topic與Tag最佳實務。 "TagA", // Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預。 // 需要Producer與Consumer協商好一致的序列化和還原序列化方式。 "Hello MQ".getBytes()); // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。 // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。 // 注意:不設定也不會影響訊息正常收發。 msg.setKey("ORDERID_" + i); try { SendResult sendResult = producer.send(msg); // 同步發送訊息,只要不拋異常就是成功。 if (sendResult != null) { System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()); } } catch (Exception e) { // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。 System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic()); e.printStackTrace(); } } // 在應用退出前,銷毀Producer對象。 // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。 producer.shutdown(); } }
非同步發送
原理
非同步發送是指發送方發出一條訊息後,不等服務端返迴響應,接著發送下一條訊息的通訊方式。雲訊息佇列 RocketMQ 版的非同步發送,需要您實現非同步發送回調介面(SendCallback)。訊息發送方在發送了一條訊息後,不需要等待服務端響應即可發送第二條訊息。發送方通過回調介面接收服務端響應,並處理響應結果。
應用情境
非同步發送一般用於鏈路耗時較長,對回應時間較為敏感的業務情境,例如,您視頻上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。
範例程式碼
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.OnExceptionContext; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendCallback; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; import java.util.concurrent.TimeUnit; public class ProducerTest { public static void main(String[] args) throws InterruptedException { Properties properties = new Properties(); // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。 // AccessKey ID,阿里雲身分識別驗證標識。 properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // AccessKey Secret,阿里雲身分識別驗證密鑰。 properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); //設定發送逾時時間,單位毫秒。 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 設定TCP接入網域名稱,進入雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面的存取點地區查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。 producer.start(); Message msg = new Message( // 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。 "TopicTestMQ", // Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。 "TagA", // Message Body,任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。 "Hello MQ".getBytes()); // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。 // 注意:不設定也不會影響訊息正常收發。 msg.setKey("ORDERID_100"); // 非同步發送訊息, 發送結果通過callback返回給用戶端。 producer.sendAsync(msg, new SendCallback() { @Override public void onSuccess(final SendResult sendResult) { // 訊息發送成功。 System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId()); } @Override public void onException(OnExceptionContext context) { // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。 System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId()); } }); // 阻塞當前線程3秒,等待非同步發送結果。 TimeUnit.SECONDS.sleep(3); // 在應用退出前,銷毀Producer對象。 // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。 producer.shutdown(); } }
單向發送
原理
發送方只負責發送訊息,不等待服務端返迴響應且沒有回呼函數觸發,即只發送請求不等待應答。此方式發送訊息的過程耗時非常短,一般在微秒層級。
應用情境
適用於某些耗時非常短,但對可靠性要求並不高的情境,例如日誌收集。
範例程式碼
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; import java.util.Properties; public class ProducerTest { public static void main(String[] args) { Properties properties = new Properties(); // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。 // AccessKey ID,阿里雲身分識別驗證標識。 properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // AccessKey Secret,阿里雲身分識別驗證密鑰。 properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // 設定發送逾時時間,單位:毫秒。 properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台的執行個體詳情頁面的存取點地區查看。 properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX"); Producer producer = ONSFactory.createProducer(properties); // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。 producer.start(); //迴圈發送訊息。 for (int i = 0; i < 100; i++){ Message msg = new Message( // 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。 "TopicTestMQ", // Message Tag, // 可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。 "TagA", // Message Body // 任何二進位形式的資料,雲訊息佇列 RocketMQ 版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。 "Hello MQ".getBytes()); // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。 // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。 // 注意:不設定也不會影響訊息正常收發。 msg.setKey("ORDERID_" + i); // 由於在oneway方式發送訊息時沒有請求應答處理,如果出現訊息發送失敗,則會因為沒有重試而導致資料丟失。若資料不可丟,建議選用可靠同步或可靠非同步發送方式。 producer.sendOneway(msg); } // 在應用退出前,銷毀Producer對象。 // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。 producer.shutdown(); } }
三種發送方式的對比
發送方式 | 發送TPS | 發送結果反饋 | 可靠性 |
同步發送 | 快 | 有 | 不丟失 |
非同步發送 | 快 | 有 | 不丟失 |
單向發送 | 最快 | 無 | 可能丟失 |
訂閱普通訊息
訂閱普通訊息的範例程式碼,請參見訂閱訊息。