すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:ステップ3: SDKを使用してメッセージを送受信する

最終更新日:Aug 15, 2024

ApsaraMQ for RocketMQは、異なるタイプのメッセージを送受信するための複数のプログラミング言語用のSDKを提供します。 このトピックでは、SDK for Javaを使用してApsaraMQ for RocketMQブローカーに接続し、通常のメッセージを送受信する方法について説明します。

前提条件

  • 必要なリソースはApsaraMQ for RocketMQで作成されます。 詳細については、「手順2: リソースの作成」をご参照ください。

  • IntelliJ IDEAがインストールされています。 詳細については、「IntelliJ IDEA」をご参照ください。

    IntelliJ IDEAまたはEclipseを使用できます。 このトピックの例では、IntelliJ IDEA Ultimateが使用されています。

  • JDK 1.8以降がインストールされます。 詳細については、「Javaダウンロード」をご参照ください。

  • Maven 2.5以降がインストールされています。 詳細については、「Apache Mavenのダウンロード」をご参照ください。

Java依存ライブラリのインストール

  1. IntelliJ IDEAでJavaプロジェクトを作成します。

  2. 次の依存関係をpom.xmlファイルに追加して、Java依存関係ライブラリをインポートします。

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.7</version>
    </dependency>
    重要

    serverless ApsaraMQ for RocketMQインスタンスを使用している場合は、インターネット経由でインスタンスにアクセスするときに、SDK for Javaのバージョンに注意してください。 インターネット経由でサーバーレスApsaraMQ for RocketMQインスタンスにアクセスできるのは、特定のバージョンのSDK for Javaのみです。 詳細については、「インターネット経由でサーバーレスインスタンスにアクセスするためのバージョンの説明」をご参照ください。

メッセージを生成する

作成したJavaプロジェクトで、通常のメッセージを送信するプログラムを作成して実行します。 サンプルコード:

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        /**
         * When you initialize a producer, you can specify the topics that you want to use to check whether the topic settings are valid and prevent invalid topics from being started. 
         * You do not need to specify the topics for non-transactional messages. The broker dynamically checks whether the topics are valid. 
         * Note: To prevent the API operation that is called to query transactional messages from failing, you must specify topics for transactional messages in advance. 
         */
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // Send a normal message. 
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // The message key. You can use a keyword to accurately find the message. 
                .setKeys("messageKey")
                // The message tag. The consumer can use the tag to filter messages. 
                .setTag("messageTag")
                // The message body. 
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the message. Take note of the result and capture exceptions such as failures. 
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

メッセージを消費する

作成したJavaプロジェクトで、通常のメッセージをサブスクライブするプログラムを作成して実行します。 ApsaraMQ for RocketMQでは、シンプルモードとプッシュモードでメッセージを使用できます。 詳細については、「SimpleConsumer」および「PushConsumer」をご参照ください。 メッセージをサブスクライブするモードの1つを選択できます。 シンプルコンシューマーとプッシュコンシューマーの違いを次の表に示します。

項目

PushConsumer

SimpleConsumer

API操作呼び出し

コールバック操作は、メッセージリスナーを使用して消費結果を返すために呼び出されます。 コンシューマーは、メッセージリスナーのスコープ内でのみ消費ロジックを処理できます。

ビジネスアプリケーションはメッセージ処理を実装し、対応する操作を呼び出して消費結果を返します。

消費並行性管理

ApsaraMQ forRocketMQ SDKは、メッセージ消費の同時スレッド数を管理するために使用されます。

メッセージ消費に使用される同時スレッドの数は、個々のビジネスアプリケーションの消費ロジックに基づいています。

APIの柔軟性

API操作はカプセル化され、柔軟性に乏しい。

アトミック操作は大きな柔軟性を提供します。

シナリオ

このコンシューマタイプは、カスタムプロセスを必要としない開発シナリオに適しています。

このコンシューマタイプは、カスタムプロセスが必要な開発シナリオに適しています。

PushConsumer

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Specify the message listener. 
                .setMessageListener(messageView -> {
                    // Consume the messages and return the consumption result. 
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // If you no longer require the push consumer, shut down the process. 
        //pushConsumer.close();
    }
}                                                 

SimpleConsumer

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the timeout period for long polling requests. 
                .setAwaitDuration(awaitDuration)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // Specify the maximum number of messages to be pulled. 
        int maxMessageNum = 16;
        // Specify the invisible time of the messages. 
        Duration invisibleDuration = Duration.ofSeconds(10);
        // If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop. 
        // To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages. 
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // After consumption is complete, the consumer must call the ACK method to commit the consumption result to the broker. 
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // If you no longer require the simple consumer, shut down the process. 
        // consumer.close();
    }
}                                           

インターネット経由でのサーバーレスインスタンスへのアクセスに関するバージョンの説明

インターネット経由でサーバーレスApsaraMQ for RocketMQインスタンスにアクセスできるのは、特定のバージョンのSDKのみです。

SDK for Java 5.x

インターネット経由でサーバーレスのApsaraMQ for RocketMQインスタンスにアクセスしてメッセージを送受信する場合は、コードに次の情報を追加する必要があります。

説明

InstanceIdをApsaraMQ for RocketMQインスタンスのIDに置き換えます。

  • バージョンが5.2.0以降のJava用SDK

    メッセージを送信するときに、コードに次の情報を追加します。producer.setNamespaceV2("InstanceId");

    メッセージを受信したときに、コードに次の情報を追加します。consumer.setNamespaceV2("InstanceId");

  • バージョンが5.0.6以降で5.2.0より前のJava用SDK

    メッセージを送受信するときに、コードに次の情報を追加します。

    ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    .setEndpoints(endpoints)
    .setNamespace("InstanceId")
    .setCredentialProvider(sessionCredentialsProvider)
    .build();
  • バージョンが5.0.6より前のSDK for Java: インターネット経由でサーバーレスApsaraMQ for RocketMQインスタンスにアクセスすることはできません。

TCPクライアントSDK for Java 1.x

インターネット経由でサーバーレスのApsaraMQ for RocketMQインスタンスにアクセスしてメッセージを送受信する場合は、RocketMQ 1.x TCPクライアントSDK for Javaのバージョンが1.9.0であることを確認する必要があります。

properties.setProperty(PropertyKeyConst.Namespace, "InstanceId");

説明

InstanceIdをApsaraMQ for RocketMQインスタンスのIDに置き換えます。

SDKパラメーター

パラメーター

説明

エンドポイント

rmq-cn-xxx.{regionI d}.rmq.aliyuncs.com:8080

ApsaraMQ for RocketMQインスタンスのエンドポイント。 エンドポイントの取得方法については、「インスタンスのエンドポイントの取得」をご参照ください。

  • インターネット経由でインスタンスにアクセスする場合は、パブリックエンドポイントを指定します。

  • VPC内のインスタンスにアクセスする場合は、VPCエンドポイントを指定します。

topic

normal_test

ApsaraMQ for RocketMQインスタンスでメッセージが送信されるトピックまたはメッセージが使用されるトピック。

事前にApsaraMQ for RocketMQインスタンスにトピックを作成する必要があります。 詳細については、「トピックの作成」をご参照ください。

group

GID_test

コンシューマがApsaraMQ for RocketMQインスタンスのメッセージを消費するために使用するコンシューマグループ。

事前にApsaraMQ for RocketMQインスタンスにコンシューマーグループを作成する必要があります。 詳細については、「コンシューマーグループの作成」をご参照ください。

インスタンスのユーザー名

1XVg0hzgKm ******

ApsaraMQ for RocketMQインスタンスのユーザー名。 インターネット経由でインスタンスにアクセスする場合は、ユーザー名を指定する必要があります。 VPCでインスタンスにアクセスする場合、インスタンスがサーバーレスインスタンスであり、VPCでの認証不要機能が無効になっている場合にのみ、ユーザー名を指定する必要があります。

ユーザー名の取得方法については、「インスタンスのユーザー名とパスワードの取得」をご参照ください。

インスタンスパスワード

ijSt8rEc45 ******

ApsaraMQ for RocketMQインスタンスのパスワード。 インターネット経由でインスタンスにアクセスする場合は、パスワードを指定する必要があります。 VPCでインスタンスにアクセスする場合、インスタンスがサーバーレスインスタンスであり、VPCでの認証不要機能が無効になっている場合にのみ、パスワードを指定する必要があります。

パスワードの取得方法については、「インスタンスのユーザー名とパスワードの取得」をご参照ください。

メッセージ消費の確認

メッセージを消費した後、ApsaraMQ for RocketMQコンソールでメッセージの消費ステータスを確認できます。

  1. ApsaraMQ for RocketMQコンソールにログインします。 インスタンス数 ページで、管理するインスタンスの名前をクリックします。

  2. 表示されるページの左側のナビゲーションウィンドウで、メッセージトレース.

SDK参照

他のプログラミング言語のSDKを使用して他の種類のメッセージを送受信する方法については、「概要」をご参照ください。