全部產品
Search
文件中心

ApsaraMQ for RocketMQ:訂閱訊息

更新時間:Jul 01, 2024

本文介紹如何通過雲訊息佇列 RocketMQ 版的Java SDK訂閱訊息。

訂閱者式

雲訊息佇列 RocketMQ 版支援以下兩種訂閱者式:

  • 叢集訂閱

    同一個Group ID所標識的所有Consumer平均分攤消費訊息。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在叢集消費模式下每個執行個體平均分攤,只消費其中的3條訊息。設定方式如下所示。

    // 叢集訂閱者式設定(不設定的情況下,預設為叢集訂閱者式)。
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • 廣播訂閱

    同一個Group ID所標識的所有Consumer都會各自消費某條訊息一次。例如某個Topic有9條訊息,一個Group ID有3個Consumer執行個體,那麼在廣播消費模式下每個執行個體都會各自消費9條訊息。設定方式如下所示。

    // 廣播訂閱者式設定。
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);               
說明
  • 請確保同一個Group ID下所有Consumer執行個體的訂閱關係保持一致,更多資訊,請參見訂閱關係一致

  • 兩種不同的訂閱者式有著不同的功能限制,例如,廣播模式不支援順序訊息、不維護消費進度、不支援重設消費位點等,更多資訊,請參見叢集消費和廣播消費

訊息擷取方式

雲訊息佇列 RocketMQ 版支援以下兩種訊息擷取方式:

  • Push:訊息由雲訊息佇列 RocketMQ 版推送至Consumer。Push方式下,雲訊息佇列 RocketMQ 版還支援批量消費功能,可以將批量訊息統一推送至Consumer進行消費。更多資訊,請參見批量消費

  • Pull:訊息由Consumer主動從雲訊息佇列 RocketMQ 版拉取。

Pull Consumer提供了更多接收訊息的選擇。相比於Push Consumer,您可以使用Pull Consumer更加自由地控制訊息拉取。具體的介面資訊請參見介面和參數說明

重要
  • 如需使用Pull Consumer,請確保您的執行個體為企業鉑金版。

  • Pull Consumer不支援公網訪問,僅支援VPC網路訪問。

範例程式碼

具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。以下分別列舉Push和Pull兩種訊息擷取方式的範例程式碼。

  • Push方式

    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
       public static void main(String[] args) {
           Properties properties = new Properties();
           // 您在訊息佇列RocketMQ版控制台建立的Group ID。
           properties.put(PropertyKeyConst.GROUP_ID, "XXX");
           // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
           // AccessKey ID,阿里雲身分識別驗證標識。
           properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
           // AccessKey Secret,阿里雲身分識別驗證密鑰。
           properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
           // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
           properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
              // 叢集訂閱者式(預設)。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
              // 廣播訂閱者式。
              // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
    
           Consumer consumer = ONSFactory.createConsumer(properties);
            //訂閱多個Tag。
           consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { 
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
            // 訂閱另外一個Topic,如需取消訂閱該Topic,請刪除該部分的訂閱代碼,重新啟動消費端即可。
            //訂閱全部Tag。
            consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { 
               public Action consume(Message message, ConsumeContext context) {
                   System.out.println("Receive: " + message);
                   return Action.CommitMessage;
               }
           });
    
           consumer.start();
           System.out.println("Consumer Started");
       }
    }            
  • Push方式(批量消費)

    重要

    配置雲訊息佇列 RocketMQ 版的批量消費功能,請升級TCP Java SDK到1.8.7.3或以上版本,詳細版本說明和擷取方式,請參見Java SDK版本說明

    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.batch.BatchConsumer;
    import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
    import java.util.List;
    import java.util.Properties;
    
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.tcp.example.MqConfig;
    
    public class SimpleBatchConsumer {
    
        public static void main(String[] args) {
            Properties consumerProperties = new Properties();
            // 您在訊息佇列RocketMQ版控制台建立的Group ID。
            consumerProperties.setProperty(PropertyKeyConst.GROUP_ID, MqConfig.GROUP_ID);
            // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
            // AccessKey ID,阿里雲身分識別驗證標識。
            properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // AccessKey Secret,阿里雲身分識別驗證密鑰。
            properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
            consumerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, MqConfig.NAMESRV_ADDR);
    
            // 設定批量消費最大訊息數量,當指定Topic的訊息數量已經攢夠128條,SDK立即執行回調進行消費。預設值:32,取值範圍:1~1024。
            consumerProperties.setProperty(PropertyKeyConst.ConsumeMessageBatchMaxSize, String.valueOf(128));
            // 設定批量消費最大等待時間長度,當等待時間達到10秒,SDK立即執行回調進行消費。預設值:0,取值範圍:0~450,單位:秒。
            consumerProperties.setProperty(PropertyKeyConst.BatchConsumeMaxAwaitDurationInSeconds, String.valueOf(10));
    
            BatchConsumer batchConsumer = ONSFactory.createBatchConsumer(consumerProperties);
            batchConsumer.subscribe(MqConfig.TOPIC, MqConfig.TAG, new BatchMessageListener() {
    
                 @Override
                public Action consume(final List<Message> messages, ConsumeContext context) {
                    System.out.printf("Batch-size: %d\n", messages.size());
                    // 批量訊息處理。
                    return Action.CommitMessage;
                }
            });
            //啟動batchConsumer。
            batchConsumer.start();
            System.out.println("Consumer start success.");
    
            //等待固定時間防止進程退出。
            try {
                Thread.sleep(200000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }         
  • Pull方式

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.PullConsumer;
    import com.aliyun.openservices.ons.api.TopicPartition;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    public class PullConsumerClient {
        public static void main(String[] args){
            Properties properties = new Properties();
            // 您在訊息佇列RocketMQ版控制台建立的Group ID。
            properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
            // 請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
            // AccessKey ID,阿里雲身分識別驗證標識。
            properties.put(PropertyKeyConst.AccessKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
            // AccessKey Secret,阿里雲身分識別驗證密鑰。
            properties.put(PropertyKeyConst.SecretKey, System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx");
            PullConsumer consumer = ONSFactory.createPullConsumer(properties);
            // 啟動Consumer。
            consumer.start();
            // 擷取topic-xxx下的所有分區。
            Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
            // 指定需要拉取訊息的分區。
            consumer.assign(topicPartitions);
    
            while (true) {
                // 拉取訊息,逾時時間為3000 ms。
                List<Message> messages = consumer.poll(3000);
                System.out.printf("Received message: %s %n", messages);
            }
        }
    }

    分區和位點的詳細說明,請參見基本概念

更多資訊

雲訊息佇列 RocketMQ 版消費端流控的最佳實務,請參見訊息佇列RocketMQ用戶端流控設計