全部產品
Search
文件中心

ApsaraMQ for RocketMQ:步驟三:調用SDK收發訊息

更新時間:Jul 23, 2024

雲訊息佇列 RocketMQ 版提供多種語言的SDK用於收發不同類型的訊息,本文以Java SDK為例,說明如何調用SDK串連雲訊息佇列 RocketMQ 版服務端,完成普通訊息的收發流程。

前提條件

安裝Java依賴庫

  1. 在IDEA中建立一個Java工程。

  2. pom.xml檔案中添加以下依賴引入Java依賴庫。

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.7</version>
    </dependency>
    重要

    如果您使用的執行個體類型為Serverless,在公網訪問的時候需要注意SDK的版本等資訊,詳情請參見Serverless版執行個體公網訪問版本說明

生產訊息

在已建立的Java工程中,建立發送普通訊息程式並運行,範例程式碼如下:

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
         * 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //訊息發送的目標Topic名稱,需要提前在控制台建立,如果不建立直接使用會返回報錯。
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
         * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        /**
         * 初始化Producer時直接配置需要使用的Topic列表(這個參數可以配置多個Topic),實現提前檢查錯誤配置、攔截非法配置啟動。
         * 針對非事務訊息 Topic,也可以不配置,服務端會動態檢查訊息的Topic是否合法。
         * 注意!!!事務訊息Topic必須提前配置,以免事務訊息回查介面失敗,具體原理請參見事務訊息。
         */
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        //普通訊息發送。
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                //設定訊息索引鍵,可根據關鍵字精確尋找某條訊息。
                .setKeys("messageKey")
                //設定訊息Tag,用於消費端根據指定Tag過濾訊息。
                .setTag("messageTag")
                //訊息體。
                .setBody("messageBody".getBytes())
                .build();
        try {
            //發送訊息,需要關注發送結果,並捕獲失敗等異常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

消費訊息

在已建立的Java工程中,建立訂閱普通訊息程式並運行。雲訊息佇列 RocketMQ 版支援SimpleConsumerPushConsumer兩種消費者類型,您可以選擇任意一種方式訂閱訊息,具體的消費者類型的差異如下:

對比項

PushConsumer

SimpleConsumer

介面方式

使用監聽器回調介面返回消費結果,消費者僅允許在監聽器範圍內處理消費邏輯。

業務方自行實現訊息處理,並主動調用介面返回消費結果。

消費並發度管理

由SDK管理消費並發度。

由業務方消費邏輯自行管理消費線程。

介面靈活度

高度封裝,不夠靈活。

原子介面,可靈活自訂。

適用情境

適用於無自訂流程的開發情境。

適用於需要高度自訂商務程序的開發情境。

PushConsumer

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
         * 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
        String topic = "Your Topic";
        //為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
         * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        //訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化PushConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //設定消費者分組。
                .setConsumerGroup(consumerGroup)
                //設定預綁定的訂閱關係。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                //設定消費監聽器。
                .setMessageListener(messageView -> {
                    //處理訊息並返回消費結果。
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        //如果不需要再使用PushConsumer,可關閉該進程。
        //pushConsumer.close();
    }
}                                                 

SimpleConsumer

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * 執行個體存取點,從控制台執行個體詳情頁的存取點頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,建議填寫VPC存取點。
         * 如果是在本地公網訪問,或者是線下IDC環境訪問,可以使用公網存取點。使用公網存取點訪問,必須開啟執行個體的公網訪問功能。
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        //指定需要訂閱哪個目標Topic,Topic需要提前在控制台建立,如果不建立直接使用會返回報錯。
        String topic = "Your Topic";
        //為消費者指定所屬的消費者分組,Group需要提前在控制台建立,如果不建立直接使用會返回報錯。
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * 如果是使用公網存取點訪問,configuration還需要設定執行個體的使用者名稱和密碼。使用者名稱和密碼在控制台存取控制的智能身份識別頁簽中擷取。
         * 如果是在阿里雲ECS內網訪問,無需填寫該配置,服務端會根據內網VPC資訊智能擷取。
         * 如果執行個體類型為Serverless執行個體,公網訪問必須設定執行個體的使用者名稱密碼,當開啟內網免身份識別時,內網訪問可以不設定使用者名稱和密碼。
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        //訂閱訊息的過濾規則,表示訂閱所有Tag的訊息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        //初始化SimpleConsumer,需要綁定消費者分組ConsumerGroup、通訊參數以及訂閱關係。
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                //設定消費者分組。
                .setConsumerGroup(consumerGroup)
                //設定長輪詢逾時時間。
                .setAwaitDuration(awaitDuration)
                //設定預綁定的訂閱關係。
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        //設定本次拉取的最大訊息條數。
        int maxMessageNum = 16;
        //設定訊息的不可見時間。
        Duration invisibleDuration = Duration.ofSeconds(10);
        //SimpleConsumer需要用戶端一直主動迴圈擷取訊息,並進行消費處理。
        //如果需要提高消費即時性,建議多線程並發拉取。
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    //消費處理完成後,需要主動調用ACK向服務端提交消費結果。
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // 如果不需要再使用SimpleConsumer,可關閉該進程。
        // consumer.close();
    }
}                                           

Serverless版執行個體公網訪問版本說明

Serverless版執行個體的公網訪問功能,僅部分版本的SDK用戶端支援,具體限制如下:

Java 5.x SDK

Serverless版執行個體使用公網訪問接入雲訊息佇列 RocketMQ 版時,需要保證使用的SDK版本滿足以下要求,並在訊息收發代碼中補充如下內容:

說明

其中,InstanceId需要替換為您實際使用的執行個體ID。

  • SDK版本:rocketmq-client ≥ 5.2.0

    訊息發送代碼補充:producer.setNamespaceV2("InstanceId");

    訊息消費代碼補充:consumer.setNamespaceV2("InstanceId");

  • SDK版本:rocketmq-client-java ≥ 5.0.6

    訊息發送和訊息消費代碼補充:

    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();

Java Ons 1.x SDK

Serverless版執行個體使用公網訪問接入雲訊息佇列 RocketMQ 版時,需要保證使用的Java ONS 1.x SDK版本為1.9.0.Final及以上版本,並在訊息收發代碼中補充如下內容:

properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");

說明

其中,InstanceId需要替換為您實際使用的執行個體ID。

SDK參數填寫說明

參數

樣本值

描述

endpoints

rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080

雲訊息佇列 RocketMQ 版執行個體的存取點。擷取方式,請參見擷取執行個體存取點

  • 使用公網訪問,請填寫公網存取點。

  • 使用內網訪問,請填寫VPC存取點。

topic

normal_test

雲訊息佇列 RocketMQ 版的Topic,用於指定生產者將訊息發送到哪個Topic,或者指定消費者要消費哪個Topic的訊息。

Topic需要提前在雲訊息佇列 RocketMQ 版執行個體下建立。具體操作,請參見建立Topic

group

GID_test

雲訊息佇列 RocketMQ 版的ConsumerGroup,用於指定消費者使用哪個消費者分組消費訊息。

Group需要提前在雲訊息佇列 RocketMQ 版執行個體下建立。具體操作,請參見建立ConsumerGroup

Instance UserName

1XVg0hzgKm******

雲訊息佇列 RocketMQ 版執行個體的使用者名稱。使用公網訪問時需要填寫,VPC訪問時Serverless執行個體只有開啟內網免身份識別可以不用填寫,其他執行個體無需填寫。

擷取方式,請參見擷取執行個體使用者名稱密碼

Instance Password

ijSt8rEc45******

雲訊息佇列 RocketMQ 版執行個體的使用者密碼。使用公網訪問時需要填寫,VPC訪問時Serverless執行個體只有開啟內網免身份識別可以不用填寫,其他執行個體無需填寫。

擷取方式,請參見擷取執行個體使用者名稱密碼

驗證訊息

訊息收發完成後,您可以通過控制台查看訊息消費情況。

  1. 登入控制台,在執行個體列表頁面單擊目標執行個體名稱。

  2. 在左側導覽列單擊訊息軌跡

SDK參考

本文以Java SDK為例介紹收發普通訊息流程程,其他語言SDK和其他類型訊息的範例程式碼,請參見SDK參考概述