全部产品
Search
文档中心

云消息队列 RocketMQ 版:收发定时消息

更新时间:Jul 25, 2023

本文提供使用TCP协议下的.NET SDK收发定时消息的示例代码。目前支持的地域包括公网、华东1(杭州)、华北2(北京)、华东2(上海)、华南1(深圳)。

定时消息可以做到在指定时间戳之后才可被消费者消费,适用于对消息生产和消费有时间窗口要求,或者利用消息触发定时任务的场景。

定时消息的概念介绍及使用过程中的注意事项,请参见定时和延时消息

前提条件

  • 下载.NET SDK。更多信息,请参见版本说明

  • 环境准备。更多信息,请参见环境准备

  • 创建资源。代码中涉及的资源信息,例如实例、Topic和Group ID等,需要在控制台上提前创建。更多信息,请参见创建资源

  • 获取阿里云访问密钥AccessKey ID和AccessKey Secret。更多信息,请参见创建AccessKey

发送定时消息

说明

具体的示例代码,请以云消息队列 RocketMQ 版代码库为准。

发送定时消息的代码示例如下所示。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;

namespace ons
{
    class onscsharp
    {
        private static readonly DateTime Jan1st1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        public static long CurrentTimeMillis()
        {
           return (long) (DateTime.UtcNow - Jan1st1970).TotalMilliseconds;
        }    
       
        static void Main(string[] args)
        {
            // producer创建和正常工作的参数,必须输入。
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // 您在消息队列RocketMQ版控制台创建的Group ID。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "XXX ");
            // 设置TCP接入域名,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX");
            // 您在消息队列RocketMQ版控制台创建的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "XXX");
            // 消息内容。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "XXX");
            //请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
            //AccessKey ID,阿里云身份验证标识。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	          //AccessKey Secret,阿里云身份验证密钥。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            
            // 创建producer。
            Producer pProducer = ONSFactory.getInstance().createProducer(factoryInfo);

            // 在发送消息前,必须调用start方法来启动Producer,只需调用一次即可。
            pProducer.start();

            Message msg = new Message(
                // 消息主题。
                factoryInfo.getPublishTopics(),
                // 消息标签。
                "TagA",
                // 消息主体。
                factoryInfo.getMessageContent()
            );

            // 设置代表消息的业务关键属性,请尽可能全局唯一。
            // 以方便您在无法正常收到消息情况下,可通过消息队列RocketMQ版控制台查询消息并补发。
            // 注意:不设置也不会影响消息正常收发。
            msg.setKey("ORDERID_100");

            // deliver time,单位:ms。指定一个时刻,在这个时刻之后消息才能被消费,这个例子表示3s后才能被消费。
            long deliverTime = CurrentTimeMillis() + 3000;
            msg.setStartDeliverTime(deliverTime);

            // 发送消息,只要不抛出异常,就代表发送成功。
            try
            {
                SendResultONS sendResult = pProducer.send(msg);
            }
            catch(ONSClientException e)
            {
                // 发送失败处理。
            }

            // 在应用退出前,必须销毁Producer对象,否则会导致内存泄露等问题。
            pProducer.shutdown();

        }
    }
}           

订阅定时消息

定时消息的订阅与普通消息订阅一致,更多信息,请参见订阅消息