全部產品
Search
文件中心

ApsaraMQ for RocketMQ:範例程式碼

更新時間:Jul 23, 2024

雲訊息佇列 RocketMQ 版5.x版本執行個體可相容4.x/3.x SDK用戶端接入,您可以使用4.x/3.x版本的SDK接入5.x執行個體進行訊息收發。本文為您介紹4.x/3.x版本下的Java SDK訊息收發範例程式碼。

重要
  • 推薦您使用最新的RocketMQ 5.x系列SDK,5.x系列SDK作為主力研發版本,和雲訊息佇列 RocketMQ 版5.x服務端完全相容,提供了更全面的功能並支援更多增強特性。更多資訊,請參見5.x系列SDK
  • RocketMQ 4.x/3.x系列SDK和ONS系列SDK後續僅做功能維護,建議僅存量業務使用。

普通訊息收發樣本

發送普通訊息(同步發送)

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQProducer {
    /**
     * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
     * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
     * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
     * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公網存取點時,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC存取點時,無需配置RPCHook。
        // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();

        //您在訊息佇列RocketMQ版控制台建立的Group ID。
        producer.setProducerGroup("YOUR GROUP ID");

        // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                //訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}

發送普通訊息(非同步發送)

import java.util.Date;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQAsyncProducer {
    /**
     * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
     * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
     * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
     * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 使用公網存取點時,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC存取點時,無需配置RPCHook。
        // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();

        //您在訊息佇列RocketMQ版控制台建立的Group ID
        producer.setProducerGroup("YOUR GROUP ID");

        // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        // 訊息發送成功。
                        System.out.println("send message success. msgId= " + result.getMsgId());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                        System.out.println("send message failed.");
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }
        // 阻塞當前線程3秒,等待非同步發送結果。
        TimeUnit.SECONDS.sleep(3);

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}

發送普通訊息(單向發送)

import java.util.Date;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOnewayProducer {
    /**
     * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
     * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
     * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
     * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公網存取點時,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC存取點時,無需配置RPCHook。
        // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();

        //您在訊息佇列RocketMQ版控制台建立的Group ID。
        producer.setProducerGroup("YOUR GROUP ID");

        // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                Message msg = new Message("YOUR TOPIC",
                        "YOUR MESSAGE TAG",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.sendOneway(msg);
            } catch (Exception e) {
                // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}

訂閱普通訊息

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQPushConsumer {
    /**
     * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
     * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
     * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
     * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公網存取點時,需要配置RPCHook。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // 使用VPC存取點時,無需配置RPCHook。
        // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        //您在訊息佇列RocketMQ版控制台建立的Group ID。
        consumer.setConsumerGroup("YOUR GROUP ID");

        // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        // 設定為您在阿里雲雲訊息佇列 RocketMQ 版控制台上建立的Topic。
        consumer.subscribe("YOUR TOPIC", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("Receive New Messages: %s %n", msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

順序訊息收發樣本

發送順序訊息

import java.util.List;

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQOrderProducer {
    /**
     * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
     * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
     * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
     * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公網存取點時,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC存取點時,無需配置RPCHook。
        // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();

        //您在訊息佇列RocketMQ版控制台建立的Group ID。
        producer.setProducerGroup("YOUR GROUP ID");

        // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                int orderId = i % 10;
                Message msg = new Message("YOUR ORDER TOPIC",
                        "YOUR MESSAGE TAG",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 注意!!!請務必設定該配置項,順序訊息才能均勻分布到各隊列中。
                // 5.x版本下面一行代碼可以替換為 msg.putUserProperty(MessageConst.PROPERTY_SHARDING_KEY, orderId + "");
                msg.putUserProperty("__SHARDINGKEY", orderId + "");
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 選擇適合自己的分區選擇演算法,保證同一個參數得到的結果相同。
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

訂閱順序訊息

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;

import java.util.List;

public class RocketMQOrderConsumer {
    private static RPCHook getAclRPCHook() {
        /**
         * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
         * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
         * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
         * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
         */
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公網存取點時,需要配置RPCHook。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(getAclRPCHook());
        // 使用VPC存取點時,無需配置RPCHook。
        // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
        // DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        //您在訊息佇列RocketMQ版控制台建立的Group ID。
        consumer.setConsumerGroup("YOUR GROUP ID");

        // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
        consumer.setAccessChannel(AccessChannel.CLOUD);

        // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
        consumer.setNamesrvAddr("YOUR ACCESS POINT");
        consumer.subscribe("YOUR ORDER TOPIC", "*");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 消費失敗則掛起重試返回:ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT。
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

定時/延時訊息範例程式碼

發送定時/延時訊息

import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class RocketMQDelayProducer {
    /**
     * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
     * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
     * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
     * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
     */
    private static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
    }

    public static void main(String[] args) throws MQClientException {
        // 使用公網存取點時,需要配置RPCHook。
        DefaultMQProducer producer = new DefaultMQProducer(getAclRPCHook());
        // 使用VPC存取點時,無需配置RPCHook。
        // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
        // DefaultMQProducer producer = new DefaultMQProducer();

        //您在訊息佇列RocketMQ版控制台建立的Group ID。
        producer.setProducerGroup("YOUR GROUP ID");

        // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
        producer.setAccessChannel(AccessChannel.CLOUD);

        // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
        // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
        producer.setNamesrvAddr("YOUR ACCESS POINT");
        producer.start();

        for (int i = 0; i < 128; i++) {
            try {
                // 設定為您在雲訊息佇列 RocketMQ 版控制台建立的Topic。
                Message msg = new Message("YOUR TOPIC",
                        // 設定訊息的Tag。
                        "YOUR MESSAGE TAG",
                        // 訊息內容。
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 發送延時訊息,需要設定延時時間,單位毫秒(ms),訊息將在指定延時時間後投遞,例如訊息將在3秒後投遞。
                long delayTime = System.currentTimeMillis() + 3000;
                msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));

                // 若需要發送定時訊息,則需要設定定時時間,訊息將在指定時間進行投遞,例如訊息將在2021-08-10 18:45:00投遞。
                // 定時時間格式為:yyyy-MM-dd HH:mm:ss,若設定的時間戳記在目前時間之前,則訊息將被立即投遞給Consumer。
                // longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
                // msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                // 訊息發送失敗,需要進行重試處理,可重新發送這條訊息或持久化這條資料進行補償處理。
                System.out.println(new Date() + " Send mq message failed.");
                e.printStackTrace();
            }
        }

        // 在應用退出前,銷毀Producer對象。
        // 注意:銷毀Producer對象可以節約系統記憶體,若您需要頻繁發送訊息,則無需銷毀Producer對象。
        producer.shutdown();
    }
}

訂閱定時/延時訊息

訂閱定時/延時訊息的範例程式碼和訂閱普通訊息相同,請參見訂閱普通訊息

事務訊息範例程式碼

發送事務訊息

  1. 發送半事務訊息(Half Message)及執行本地事務,範例程式碼如下。

    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.AccessChannel;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.RPCHook;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    public class RocketMQTransactionProducer {
    
        private static RPCHook getAclRPCHook() {
            /**
             * 如果是使用公網存取點訪問,則必須設定RpcHook,裡面填寫執行個體的使用者名稱和密碼。執行個體使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
             * 注意!!!這裡填寫的不是阿里雲帳號的AccessKey ID和AccessKey Secret,請務必區分開。
             * 如果是在阿里雲ECS內網訪問,無需初始化RpcHook,服務端會根據內網VPC資訊智能擷取。
             * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
             */
            return new AclClientRPCHook(new SessionCredentials("INSTANCE USER NAME", "INSTANCE PASSWORD"));
        }
    
        public static void main(String[] args) throws MQClientException {
            // 使用公網存取點時,需要配置RPCHook。
            // 您在訊息佇列RocketMQ版控制台建立的Group ID。注意:事務訊息的Group ID不能與其他類型訊息的Group ID共用。
            TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID", getAclRPCHook());
            // 使用VPC存取點時,無需配置RPCHook。
            // 如果執行個體類型為Serverless執行個體,則必須配置RPCHook。
            // TransactionMQProducer transactionMQProducer = new TransactionMQProducer("YOUR TRANSACTION GROUP ID");
    
            // 設定接入方式為阿里雲,在使用雲上訊息軌跡的時候,需要設定此項;如果不開啟訊息軌跡功能,則不需要運行此項。
            transactionMQProducer.setAccessChannel(AccessChannel.CLOUD);
    
            // 設定為您從阿里雲訊息佇列RocketMQ版控制台擷取的存取點資訊,類似“rmq-cn-XXXX.rmq.aliyuncs.com:8080”。
            // 注意!!!直接填寫控制台提供的網域名稱和連接埠即可,請勿添加http://或https://首碼標識,也不要用IP解析地址。
            transactionMQProducer.setNamesrvAddr("YOUR ACCESS POINT");
            transactionMQProducer.setTransactionCheckListener(new LocalTransactionCheckerImpl());
            transactionMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message message = new Message("YOUR TRANSACTION TOPIC",
                            "YOUR MESSAGE TAG",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = transactionMQProducer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
                        @Override
                        public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                            System.out.println("開始執行本地事務: " + msg);
                            return LocalTransactionState.UNKNOW;
                        }
                    }, null);
                    assert sendResult != null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
  2. 提交事務訊息狀態,範例程式碼如下。

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionCheckListener;
    import org.apache.rocketmq.common.message.MessageExt;
    
    // 雲訊息佇列 RocketMQ 版發送事務訊息本地Check介面實作類別。
    public class LocalTransactionCheckerImpl implements TransactionCheckListener {
        // 本地事務Checker。更多資訊,請參見事務訊息。
        @Override public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
            System.out.println("收到事務訊息的回查請求, MsgId: " + msg.getMsgId());
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    
    }

訂閱事務訊息

訂閱事務訊息的範例程式碼和訂閱普通訊息一樣,請參見訂閱普通訊息