全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發延時訊息

更新時間:Jul 01, 2024

本文提供使用TCP協議下的Java SDK收發延時訊息的範例程式碼供您參考。

前提條件

您已完成以下操作:

  • 安裝Java SDK。更多資訊,請參見準備環境

  • 建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源

  • 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey

  • 可選:日誌配置

背景資訊

延時訊息用於指定訊息發送到雲訊息佇列 RocketMQ 版的服務端後,延時一段時間才被投遞到用戶端進行消費(例如3秒後才被消費),適用於解決一些訊息生產和消費有時間視窗要求的情境,或者通過訊息觸發延遲任務的情境,類似於延遲隊列。

延時訊息的概念介紹及使用過程中的注意事項,請參見定時和延時訊息

說明

對於新手使用者,建議在正式收發訊息前,閱讀Demo 工程來瞭解搭建雲訊息佇列 RocketMQ 版工程的具體步驟。

發送延時訊息

具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。

發送延時訊息的範例程式碼如下:

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
        // AccessKey ID,阿里雲身分識別驗證標識。
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // AccessKey Secret,阿里雲身分識別驗證密鑰。
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");

        Producer producer = ONSFactory.createProducer(properties);
        // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
        producer.start();
        Message msg = new Message( 
                // 您在訊息佇列RocketMQ版控制台建立的Topic。
                "Topic",
                // Message Tag,可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版伺服器過濾。
                "tag",
                // Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
                "Hello MQ".getBytes());
        // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
        // 以方便您在無法正常收到訊息情況下,可通過控制台查詢訊息並補發。
        // 注意:不設定也不會影響訊息正常收發。
        msg.setKey("ORDERID_100");
        try {
            // 延時訊息,在指定延遲時間(目前時間之後)進行投遞。最大可設定延遲40天投遞,單位毫秒(ms)。
            // 以下樣本表示訊息在3秒後投遞。
            long delayTime = System.currentTimeMillis() + 3000;

            // 設定訊息需要被投遞的時間。
            msg.setStartDeliverTime(delayTime);

            SendResult sendResult = producer.send(msg);
            // 同步發送訊息,只要不拋異常就是成功。
            if (sendResult != null) {
            System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
            }
            } catch (Exception e) {
            // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }
        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}           

訂閱延時訊息

延時訊息的訂閱與普通訊息訂閱一致,更多資訊,請參見訂閱訊息