すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:通常のメッセージ

最終更新日:Aug 15, 2024

通常のメッセージは、ApsaraMQ for RocketMQに特別な機能がないメッセージです。 通常のメッセージは、順序付けられたメッセージ、スケジュールされたメッセージ、遅延されたメッセージ、トランザクションメッセージなどの機能付きメッセージとは異なります。 このトピックでは、通常のメッセージのシナリオ、動作メカニズム、使用方法、および使用方法について説明します。

シナリオ

ほとんどの場合、通常のメッセージは、マイクロサービスのデカップリング、データ統合、およびイベント駆動のシナリオで使用されます。 ほとんどのシナリオは、送信の信頼性に関する高い要件と、メッセージ処理のタイミングおよびシーケンスに関する一般的な要件とを有する。

シナリオ1: マイクロサービスの非同期デカップリング

在线消息处理

上の図は、オンラインeコマース取引のシナリオを示しています。 このシナリオでは、上流の注文システムは、注文の配置と支払いを独立した通常のメッセージとしてカプセル化し、そのメッセージをApsaraMQ for RocketMQブローカーに送信します。 次に、ダウンストリームシステムは、要求に応じてブローカからのメッセージをサブスクライブし、ローカル消費ロジックに基づいてタスクを処理する。 メッセージは、互いに独立しており、関連付けられる必要はない。

シナリオ2: データの統合と送信

数据传输

上の図では、例としてオフラインログ収集を使用しています。 インストルメンテーションコンポーネントは、フロントエンドアプリケーションから操作ログを収集し、そのログをApsaraMQ for RocketMQに転送するために使用されます。 各メッセージは、ApsaraMQ for RocketMQによる処理を必要としないログデータです。 ApsaraMQ for RocketMQは、ログデータをダウンストリームストレージシステムに送信するだけで済みます。 後続のタスクはバックエンドアプリケーションによって処理されます。

働くメカニズム

通常のメッセージとは何ですか?

通常のメッセージは、ApsaraMQ for RocketMQの基本的なメッセージング機能を備えたメッセージです。 通常のメッセージは、プロデューサーとコンシューマー間の非同期の分離通信をサポートします。

生命周期

通常のメッセージのライフサイクル

  • 初期化

    メッセージは作成者によって作成され初期化され、ブローカーに送信する準備が整います。

  • 保留中の消費

    メッセージはブローカーに送信され、消費者に表示され、利用可能です。

  • 消費されている

    メッセージは、消費者によって取得され、消費者のローカルビジネスロジックに基づいて処理される。

    このプロセスでは、ブローカーは、消費者が消費結果を返すのを待つ。 特定の期間内にコンシューマーから応答が受信されない場合、ApsaraMQ for RocketMQはメッセージに対して再試行を実行します。 詳細は、「消費の再試行」をご参照ください。

  • 消費コミットメント

    消費者は消費を完了し、消費結果をブローカーにコミットする。 ブローカーは、現在のメッセージが消費されるかどうかをマークします。

    デフォルトでは、ApsaraMQ for RocketMQはすべてのメッセージを保持します。 消費結果がコミットされると、メッセージはすぐに削除されるのではなく、消費済みとしてマークされます。 メッセージは、保存期間が終了した場合、またはシステムのストレージ容量が不足している場合にのみ削除されます。 メッセージが削除される前に、コンシューマはメッセージを再消費することができる。

  • メッセージ削除

    メッセージの保存期間が満了した場合、またはストレージ容量が不足している場合、ApsaraMQ for RocketMQは、物理ファイルから最も早く保存されたメッセージをローリング方式で削除します。 詳細については、「メッセージの保存とクリーンアップ」をご参照ください。

制限事項

通常のメッセージは、MessageTypeNormalに設定されているトピックのみをサポートします。

メッセージキーとタグを指定して、通常のメッセージをフィルタリングまたは検索できます。 次のコードは、Javaで通常のメッセージを送受信する方法の例を示しています。

完全なサンプルコードの詳細については、「Apache RocketMQ 5.x SDK (推奨) 」をご参照ください。

サンプルコード

通常のメッセージを送信する

package doc;

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;


public class ProducerExample {
    public static void main(String[] args) throws ClientException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an Elastic Compute Service (ECS) instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the virtual private cloud (VPC) endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // The name of the topic to which the message is sent. Before you use a topic to receive a message, you must create the topic in the ApsaraMQ for RocketMQ console. Otherwise, an error is returned. 
        String topic = "Your Topic";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));
        ClientConfiguration configuration = builder.build();
        /**
         * When you initialize a producer, you can specify the topics that you want to use to check whether the topic settings are valid and prevent invalid topics from being started. 
         * You do not need to specify the topics for non-transactional messages. The broker dynamically checks whether the topics are valid. 
         * Note: To prevent the API operation that is called to query transactional messages from failing, you must specify topics for transactional messages in advance. 
         */
        Producer producer = provider.newProducerBuilder()
                .setTopics(topic)
                .setClientConfiguration(configuration)
                .build();
        // Send a normal message. 
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                // The message key. You can use a keyword to accurately find the message. 
                .setKeys("messageKey")
                // The message tag. The consumer can use the tag to filter messages. 
                .setTag("messageTag")
                // The message body. 
                .setBody("messageBody".getBytes())
                .build();
        try {
            // Send the message. Take note of the result and capture exceptions such as failures. 
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
    }
}

プッシュモードで通常のメッセージを使用する

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collections;

public class PushConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushConsumerExample.class);

    private PushConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a push consumer. When you initialize the push consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                // Specify the message listener. 
                .setMessageListener(messageView -> {
                    // Consume the messages and return the consumption result. 
                    // LOGGER.info("Consume message={}", messageView);
                    System.out.println("Consume Message: " + messageView);
                    return ConsumeResult.SUCCESS;
                })
                .build();
        Thread.sleep(Long.MAX_VALUE);
        // If you no longer require the push consumer, shut down the process. 
        //pushConsumer.close();
    }
}                                                 

通常のメッセージをシンプルモードで消費する

import org.apache.rocketmq.client.apis.*;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.shaded.org.slf4j.Logger;
import org.apache.rocketmq.shaded.org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

public class SimpleConsumerExample {
    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);

    private SimpleConsumerExample() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        /**
         * The endpoint of the instance. You can view the endpoint on the Endpoints tab of the Instance Details page in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, we recommend that you specify the VPC endpoint. 
         * If you access the instance over the Internet or from a data center, you can specify the public endpoint. If you access the instance over the Internet, you must enable the Internet access feature for the instance. 
         */
        String endpoints = "rmq-cn-xxx.{regionId}.rmq.aliyuncs.com:8080";
        // Specify the topic to which you want to subscribe. Before you specify a topic, you must create the topic in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String topic = "Your Topic";
        // Specify the consumer group to which the consumer belongs. Before you specify a consumer group, you must create the consumer group in the ApsaraMQ for RocketMQ console in advance. Otherwise, an error is returned. 
        String consumerGroup = "Your ConsumerGroup";
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoints);
        /**
         * If you access the instance by using the public endpoint, you must specify the username and password of the instance. You can obtain the username and password on the Intelligent Authentication tab of the Access Control page corresponding to the instance in the ApsaraMQ for RocketMQ console. 
         * If the client of the ApsaraMQ for RocketMQ instance is deployed on an ECS instance and you want to access the ApsaraMQ for RocketMQ instance in an internal network, you do not need to specify the username or password. The broker automatically obtains the username and password based on the VPC information. 
         * If the instance is a serverless ApsaraMQ for RocketMQ instance, you must specify the username and password to access the instance over the Internet. If you enable the authentication-free in VPCs feature for the serverless instance and access the instance in a VPC, you do not need to specify the username or password. 
         */
        //builder.setCredentialProvider(new StaticSessionCredentialsProvider("Instance UserName", "Instance Password"));        
        ClientConfiguration clientConfiguration = builder.build();

        Duration awaitDuration = Duration.ofSeconds(10);
        // The rule that is used to filter messages. In the following example, all messages in the topic are subscribed to. 
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        // Initialize a simple consumer. When you initialize the simple consumer, you must specify the consumer group, communication parameters, and subscription for the consumer. 
        SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
                .setClientConfiguration(clientConfiguration)
                // Specify the consumer group. 
                .setConsumerGroup(consumerGroup)
                // Specify the timeout period for long polling requests. 
                .setAwaitDuration(awaitDuration)
                // Specify the subscription. 
                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                .build();
        // Specify the maximum number of messages to be pulled. 
        int maxMessageNum = 16;
        // Specify the invisible time of the messages. 
        Duration invisibleDuration = Duration.ofSeconds(10);
        // If you use a simple consumer to consume messages, the client must obtain and consume messages in a loop. 
        // To consume messages in real time, we recommend that you use multiple threads to concurrently pull messages. 
        while (true) {
            final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
            messages.forEach(messageView -> {
                // LOGGER.info("Received message: {}", messageView);
                System.out.println("Received message: " + messageView);
            });
            for (MessageView message : messages) {
                final MessageId messageId = message.getMessageId();
                try {
                    // After consumption is complete, the consumer must call the ACK method to commit the consumption result to the broker. 
                    consumer.ack(message);
                    System.out.println("Message is acknowledged successfully, messageId= " + messageId);
                    //LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
                } catch (Throwable t) {
                    t.printStackTrace();
                    //LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
                }
            }
        }
        // If you no longer require the simple consumer, shut down the process. 
        // consumer.close();
    }
}                                           

使用上の注意

トラブルシューティングを容易にするために、メッセージごとにグローバルに一意のキーを設定する

ApsaraMQ for RocketMQでは、カスタムメッセージキーを指定できます。 メッセージのキーを使用して、メッセージとそのトレースを効率的にクエリできます。

そのため、メッセージを送信する際には、注文IDやユーザーIDなど、ビジネスに関する一意の情報をキーとして使用することを推奨します。 これにより、後続のクエリでメッセージをすばやく見つけることができます。