全部產品
Search
文件中心

ApsaraMQ for RocketMQ:發送普通訊息(三種方式)

更新時間:Jul 01, 2024

雲訊息佇列 RocketMQ 版提供三種方式來發送普通訊息:同步(Sync)發送、非同步(Async)發送和單向(Oneway)發送。本文介紹了每種發送方式的原理、應用情境、範例程式碼,以及三種發送方式的對比。

前提條件

  • 安裝Java SDK。更多資訊,請參見準備環境

  • 建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源

  • 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey

  • 可選:日誌配置

同步發送

  • 原理

    同步發送是指訊息發送方發出一條訊息後,會在收到服務端同步響應之後才發下一條訊息的通訊方式。同步發送

  • 應用情境

    此種方式應用情境非常廣泛,例如重要通知訊息、報名簡訊通知、營銷簡訊系統等。

  • 範例程式碼

        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
                // AccessKey ID,阿里雲身分識別驗證標識。
                properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
                // AccessKey Secret,阿里雲身分識別驗證密鑰。
                properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
                //設定發送逾時時間,單位:毫秒。
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // 設定TCP接入網域名稱,進入雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面的存取點地區查看。
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
                Producer producer = ONSFactory.createProducer(properties);
                // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
                producer.start();
    
                //迴圈發送訊息。
                for (int i = 0; i < 100; i++){
                    Message msg = new Message( 
                        // 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
                        "TopicTestMQ",
                        // Message Tag可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
                        // Tag的具體格式和設定方法,請參見Topic與Tag最佳實務。
                        "TagA",
                        // Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預。
                        // 需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                        "Hello MQ".getBytes());
                    // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
                    // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
                    // 注意:不設定也不會影響訊息正常收發。
                    msg.setKey("ORDERID_" + i);
    
                    try {
                        SendResult sendResult = producer.send(msg);
                        // 同步發送訊息,只要不拋異常就是成功。
                        if (sendResult != null) {
                            System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                        }
                    }
                    catch (Exception e) {
                        // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                        System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                        e.printStackTrace();
                    }
                }
    
                // 在應用退出前,銷毀Producer對象。
                // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
                producer.shutdown();
            }
        }               

非同步發送

  • 原理

    非同步發送是指發送方發出一條訊息後,不等服務端返迴響應,接著發送下一條訊息的通訊方式。雲訊息佇列 RocketMQ 版的非同步發送,需要您實現非同步發送回調介面(SendCallback)。訊息發送方在發送了一條訊息後,不需要等待服務端響應即可發送第二條訊息。發送方通過回調介面接收服務端響應,並處理響應結果。

    非同步發送

  • 應用情境

    非同步發送一般用於鏈路耗時較長,對回應時間較為敏感的業務情境,例如,您視頻上傳後通知啟動轉碼服務,轉碼完成後通知推送轉碼結果等。

  • 範例程式碼

        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.OnExceptionContext;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.SendCallback;
        import com.aliyun.openservices.ons.api.SendResult;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
        import java.util.concurrent.TimeUnit;
    
        public class ProducerTest {
            public static void main(String[] args) throws InterruptedException {
                Properties properties = new Properties();
                // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
                // AccessKey ID,阿里雲身分識別驗證標識。
                properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
                // AccessKey Secret,阿里雲身分識別驗證密鑰。
                properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
                //設定發送逾時時間,單位毫秒。
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // 設定TCP接入網域名稱,進入雲訊息佇列 RocketMQ 版控制台執行個體詳情頁面的存取點地區查看。
                properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
                producer.start();
    
                Message msg = new Message(
                        // 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
                        "TopicTestMQ",
                        // Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
                        "TagA",
                        // Message Body,任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                        "Hello MQ".getBytes());
    
                // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
                // 注意:不設定也不會影響訊息正常收發。
                msg.setKey("ORDERID_100");
    
                // 非同步發送訊息, 發送結果通過callback返回給用戶端。
                producer.sendAsync(msg, new SendCallback() {
                    @Override
                    public void onSuccess(final SendResult sendResult) {
                        // 訊息發送成功。
                        System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                    }
    
                    @Override
                    public void onException(OnExceptionContext context) {
                        // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                        System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                    }
                });
    
                // 阻塞當前線程3秒,等待非同步發送結果。
                TimeUnit.SECONDS.sleep(3);
    
                // 在應用退出前,銷毀Producer對象。
                // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
                producer.shutdown();
            }
        }                

單向發送

  • 原理

    發送方只負責發送訊息,不等待服務端返迴響應且沒有回呼函數觸發,即只發送請求不等待應答。此方式發送訊息的過程耗時非常短,一般在微秒層級。

    單向發送

  • 應用情境

    適用於某些耗時非常短,但對可靠性要求並不高的情境,例如日誌收集。

  • 範例程式碼

        import com.aliyun.openservices.ons.api.Message;
        import com.aliyun.openservices.ons.api.Producer;
        import com.aliyun.openservices.ons.api.ONSFactory;
        import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
        import java.util.Properties;
    
        public class ProducerTest {
            public static void main(String[] args) {
                Properties properties = new Properties();
                // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
                // AccessKey ID,阿里雲身分識別驗證標識。
                properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
                // AccessKey Secret,阿里雲身分識別驗證密鑰。
                properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
                // 設定發送逾時時間,單位:毫秒。
                properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
                // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台的執行個體詳情頁面的存取點地區查看。
                properties.put(PropertyKeyConst.NAMESRV_ADDR,
                  "XXX");
    
                Producer producer = ONSFactory.createProducer(properties);
                // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
                producer.start();
                //迴圈發送訊息。
                for (int i = 0; i < 100; i++){
                    Message msg = new Message(
                            // 普通訊息所屬的Topic,切勿使用普通訊息的Topic來收發其他類型的訊息。
                            "TopicTestMQ",
                            // Message Tag,
                            // 可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
                            "TagA",
                            // Message Body
                            // 任何二進位形式的資料,雲訊息佇列 RocketMQ 版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                            "Hello MQ".getBytes());
    
                    // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
                    // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
                    // 注意:不設定也不會影響訊息正常收發。
                    msg.setKey("ORDERID_" + i);
    
                    // 由於在oneway方式發送訊息時沒有請求應答處理,如果出現訊息發送失敗,則會因為沒有重試而導致資料丟失。若資料不可丟,建議選用可靠同步或可靠非同步發送方式。
                    producer.sendOneway(msg);
                }
    
                // 在應用退出前,銷毀Producer對象。
                // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
                producer.shutdown();
            }
        }

三種發送方式的對比

發送方式

發送TPS

發送結果反饋

可靠性

同步發送

不丟失

非同步發送

不丟失

單向發送

最快

可能丟失

訂閱普通訊息

訂閱普通訊息的範例程式碼,請參見訂閱訊息