すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:スケジュールされたメッセージの送受信

最終更新日:Jul 09, 2024

このトピックでは、TCPクライアントSDK for Javaを使用してスケジュールされたメッセージを送受信する方法に関するサンプルコードを提供します。

背景情報

スケジュールされたメッセージは、事前定義のタイムスタンプ後に消費されます。 スケジュールされたメッセージは、メッセージの生成と消費の間に時間ウィンドウが必要なシナリオ、またはスケジュールされたタスクがメッセージを使用してトリガーされるシナリオで使用できます。

スケジュールされたメッセージに使用される用語と、スケジュールされたメッセージを使用するときに必ず実行する必要がある注意事項については、「スケジュールされたメッセージと遅延メッセージ」をご参照ください。

説明

ApsaraMQ for RocketMQの新規ユーザーの場合は、デモプロジェクトを参照してApsaraMQ for RocketMQプロジェクトを構築し、メッセージを送受信することを推奨します。

前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • Java用SDKがインストールされています。 詳細については、「環境の準備」をご参照ください。

  • コードで指定するリソースは、ApsaraMQ for RocketMQコンソールで作成されます。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。

  • Alibaba CloudアカウントのAccessKeyペアが取得されます。 詳細については、「AccessKey の作成」をご参照ください。

  • オプションです。 ロギング設定が設定されています。 詳細については、「ロギング設定」をご参照ください。

スケジュールされたメッセージの送信

サンプルコードの詳細については、「ApsaraMQ For RocketMQコードライブラリ」をご参照ください。

次のサンプルコードは、TCPクライアントSDK for Javaを使用してスケジュールされたメッセージを送信する方法の例を示しています。

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import com.aliyun.openservices.ons.api.SendResult;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Properties;

public class ProducerDelayTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
        // The AccessKey ID that is used for authentication. 
        properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        // The AccessKey secret that is used for authentication. 
        properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        // The TCP endpoint. You can obtain the endpoint in the TCP endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
          "XXX");
        Producer producer = ONSFactory.createProducer(properties);
        // Before you send the message, call the start() method only once to start the producer. 
        producer.start();
        Message msg = new Message(
                // The topic in which the message is produced. 
                "Topic",
                // The message tag. A message tag is similar to a Gmail tag and is used by consumers to filter messages on the ApsaraMQ for RocketMQ broker. 
                "tag",
                // The message body. A message body is data in the binary format. ApsaraMQ for RocketMQ does not process message bodies. The producer and consumer must agree on the methods to serialize and deserialize a message body. 
                "Hello MQ".getBytes());
        // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. 
        // If you cannot receive a message as expected, you can use the key to query the message in the ApsaraMQ for RocketMQ console. 
        // Note: You can send and receive a message even if you do not specify the key. 
        msg.setKey("ORDERID_100");

        try {
            // The timestamp that indicates the time when the broker delivers the message to the consumer. Unit: milliseconds. For example, if you set this parameter to 2016-03-07 16:21:00, the broker delivers the message at 16:21:00 on March 7, 2016. The value must be later than the current time. If you set this parameter to a time that is earlier than the current time, the message is immediately delivered to the consumer. 
            long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();

            msg.setStartDeliverTime(timeStamp);
            // Send the message. If no exception is thrown, the message is sent. 
            SendResult sendResult = producer.send(msg);
            System.out.println("Message Id:" + sendResult.getMessageId());
        }
        catch (Exception e) {
            // Specify the logic that you want to use to resend or persist the message if the message fails to be sent and needs to be sent again. 
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            e.printStackTrace();
        }

        // Before you exit the application, destroy the producer. 
        // Note: Memory can be saved if you destroy a producer. If you need to frequently send messages, do not destroy a producer. 
        producer.shutdown();
    }
}       

スケジュールされたメッセージの購読

スケジュールされたメッセージをサブスクライブするサンプルコードは、通常のメッセージをサブスクライブするサンプルコードと同じです。 詳細については、『メッセージのサブスクライブ』をご参照ください。