本文提供使用TCP協議下的Java SDK收發定時訊息的範例程式碼。
背景資訊
定時訊息可以做到在指定時間戳記之後才可被消費者消費,適用於對訊息生產和消費有時間視窗要求,或者利用訊息觸發定時任務的情境。
定時訊息的概念介紹及使用過程中的注意事項,請參見定時和延時訊息。
說明
對於新手使用者,建議在正式收發訊息前,閱讀Demo工程來瞭解搭建雲訊息佇列 RocketMQ 版工程的具體步驟。
前提條件
您已完成以下操作:
安裝Java SDK。更多資訊,請參見準備環境。
建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源。
擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey。
可選:日誌配置。
發送定時訊息
具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。
發送定時訊息的範例程式碼如下。
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;
public class ProducerDelayTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
// AccessKey ID,阿里雲身分識別驗證標識。
properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
// AccessKey Secret,阿里雲身分識別驗證密鑰。
properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
Producer producer = ONSFactory.createProducer(properties);
// 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
producer.start();
Message msg = new Message(
// Message所屬的Topic。
"Topic",
// Message Tag可理解為Gmail中的標籤,對訊息進行再歸類,方便Consumer指定過濾條件在訊息佇列RocketMQ版的伺服器過濾。
"tag",
// Message Body可以是任何二進位形式的資料,訊息佇列RocketMQ版不做任何幹預,需要Producer與Consumer協商好一致的序列化和還原序列化方式。
"Hello MQ".getBytes());
// 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
// 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
// 注意:不設定也不會影響訊息正常收發。
msg.setKey("ORDERID_100");
try {
// 定時訊息,單位毫秒(ms),在指定時間戳記(目前時間之後)進行投遞,例如2016-03-07 16:21:00投遞。如果被設定成目前時間戳之前的某個時刻,訊息將立即被投遞給消費者。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 發送訊息,只要不拋異常就是成功。
SendResult sendResult = producer.send(msg);
System.out.println("Message Id:" + sendResult.getMessageId());
}
catch (Exception e) {
// 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
// 在應用退出前,銷毀Producer對象。
// 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
producer.shutdown();
}
}
訂閱定時訊息
定時訊息的訂閱者式與普通訊息一致,更多資訊,請參見訂閱訊息。