针对初次接触云消息队列 RocketMQ 版的工程师,本文以TCP协议下的Java SDK为例,提供操作示例帮助您从零开始搭建云消息队列 RocketMQ 版测试工程。Demo工程包含普通消息、顺序消息、事务消息、定时和延时消息的测试代码,以及相关Spring的配置。

前提条件

  • 安装IDE。

    您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。

    请下载IntelliJ IDEA Ultimate版本,并参见IntelliJ IDEA说明进行安装。更多信息,请参见下载地址

  • 下载Demo工程。

    下载到本地并解压后即可看到本地新增了mq-demo-master文件夹,该文件夹包括纯Java、Spring以及Spring Boot的示例代码。更多信息,请参见rocketmq-demo

  • 下载安装JDK。更多信息,请参见JDK下载地址

配置Demo工程

  1. 将Demo工程文件导入IntelliJ IDEA。
  2. 创建资源。

    您需要先到消息队列RocketMQ版控制台创建所需资源,包括实例、Topic、Group ID(GID),以及鉴权需要的AccessKey ID(AK)和AccessKey Secret(SK)。

    更多详细信息和操作指导,请参见创建资源

  3. 配置Demo。
    您需将在步骤2中创建好的资源信息配置到MqConfig类和common.xml文件中。
    1. 修改pom.xml文件。建议将ons-client版本号修改为最新版本。具体版本信息,请参见版本说明
      <dependency>
          <groupId>com.aliyun.openservices</groupId>
          <artifactId>ons-client</artifactId>
          <!--建议替换为Java SDK的最新版本号-->
          <version>1.8.8.5.Final</version>
      </dependency>                            
    2. 按以下说明配置MqConfig类。
      public static final String TOPIC = "您已创建的Topic";
      public static final String GROUP_ID = "您已创建的Group ID";
      public static final String ORDER_TOPIC = "您已创建的用于收发顺序消息的Topic";
      public static final String ORDER_GROUP_ID = "您已创建的用于收发顺序消息的Group ID";
      public static final String ACCESS_KEY = "您的阿里云账号的AccessKey ID,获取方式,请参见创建AccessKey";
      public static final String SECRET_KEY = "您的阿里云账号的AccessKey Secret,获取方式,请参见创建AccessKey";
      public static final String TAG = "您自定义的消息Tag属性";
      public static final String NAMESRV_ADDR = "您已创建的云消息队列 RocketMQ 版实例的TCP接入点,可在云消息队列 RocketMQ 版控制台实例详情页面的接入点区域查看";                              
      说明
      • 如果RAM用户拥有该Topic的权限以及自己的AccessKey,那么也可以使用RAM用户的AccessKey。
      • 参数与接口的更多信息,请参见接口和参数说明
    3. 配置common.xml
      <props>
          <prop key="AccessKey">XXX</prop> <!-- 使用前请修改这些资源信息 -->
          <prop key="SecretKey">XXX</prop>
          <prop key="GROUP_ID">XXX</prop>
          <prop key="Topic">XXX</prop>
          <prop key="NAMESRV_ADDR">XXX</prop>
      </props>

以Main方式运行Demo

  1. 发送消息。
    • 发送普通消息:
      • 以纯Java方式发送普通消息:运行SimpleMQProducer类。
      • 以Spring方式发送普通消息:运行ProducerClient类。
      • 以Spring Boot方式发送普通消息:运行ProducerClient类。
    • 发送事务消息:
      • 以纯Java方式发送事务消息:运行SimpleTransactionProducer类。

        LocalTransactionCheckerImpl类为本地事务check接口类,用于校验事务。更多信息,请参见收发事务消息

      • 以Spring方式发送事务消息:运行TransactionProducerClient类。
      • 以Spring Boot方式发送事务消息:运行TransactionProducerClient类。
    • 发送顺序消息:
      • 以纯Java方式发送顺序消息:运行SimpleOrderProducer类。
      • 以Spring方式发送顺序消息:运行OrderProducerClient类。
      • 以Spring Boot方式发送顺序消息:运行OrderProducerClient类。

      此方式下,消息发布和消费都按顺序进行。更多信息,请参见收发顺序消息

    • 发送定时和延时消息:运行MQTimerProducer类发送消息。延时3秒后投递。

      您也可以指定一个精确的投递时间,最长定时时间为40天。更多信息,请参见收发定时消息

    云消息队列 RocketMQ 版控制台,按Topic查询消息,可以看到消息已经发送至Topic。
  2. 接收消息。
    • 接收普通消息:
      • 以纯Java方式接收普通消息:运行SimpleMQConsumer类。
      • 以Spring方式接收普通消息:运行ConsumerClient类。
      • 以Spring Boot方式接收普通消息:运行ConsumerClient类。
    • 接收事务消息:
      • 以纯Java方式接收事务消息:运行SimpleMQConsumer类。
      • 以Spring方式接收事务消息:运行ConsumerClient类。
      • 以Spring Boot方式接收事务消息:运行ConsumerClient类。
    • 接收顺序消息:
      • 以纯Java方式接收顺序消息:运行SimpleOrderConsumer类。
      • 以Spring方式接收顺序消息:运行OrderConsumerClient类。
      • 以Spring Boot方式接收顺序消息:运行OrderConsumerClient类。
    • 接收定时和延时消息:运行SimpleMQConsumer类。
      说明 Spring和Spring Boot框架暂不支持收发定时和延时消息。
    可以看到消息被接收打印的日志。因为有初始化,所以需等待几秒,在生产环境中不会经常初始化。

结果验证:在云消息队列 RocketMQ 版控制台查看消费者状态,可以看到启动的消费端已经在线,并且订阅关系一致。

更多信息