すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:注文されたメッセージの送信と購読

最終更新日:Jul 09, 2024

注文されたメッセージは、先入れ先出し (FIFO) メッセージとも呼ばれ、ApsaraMQ for RocketMQによって提供されます。 注文されたメッセージは、厳密な順序で発行および使用されます。 このトピックでは、TCPクライアントSDK for Javaを使用して順序付きメッセージを送信およびサブスクライブする方法を示すサンプルコードを提供します。

前提条件

次の操作が完了しました。

  • Java 1.2.7以降のSDKがダウンロードされます。 SDK For Javaのリリースノートについては、「リリースノート」をご参照ください。

  • 環境がセットアップされる。 詳細については、「環境の準備」をご参照ください。

  • オプションです。 ロギング設定が設定されています。 詳細については、「ロギング設定」をご参照ください。

背景情報

順序付けられたメッセージは、次のタイプに分類されます。

  • グローバルに順序付けられたメッセージ: 指定されたトピックのすべてのメッセージは、先入れ先出し (FIFO) の順序で発行および使用されます。

  • パーティション順メッセージ: 指定されたトピックのすべてのメッセージは、シャーディングキーを使用して異なるパーティションに配布されます。 各パーティション内のメッセージは、FIFO順に発行され、消費される。 シャーディングキーは、異なるパーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーは、通常のメッセージのキーとは異なります。

詳細については、「注文メッセージ」をご参照ください。

説明

ApsaraMQ forRocketMQを初めて使用するときは、メッセージを送信してサブスクライブする前に、デモプロジェクトを参照してApsaraMQ for RocketMQプロジェクトを構築することを推奨します。

順序付けられたメッセージを送信する

重要

ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。

サンプルコードの詳細については、「ApsaraMQ For RocketMQコードライブラリ」をご参照ください。

グローバルに順序付けられたメッセージおよび部分的に順序付けられたメッセージは、同じ方法で発行される。 次のサンプルコードを提供します。

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;

import java.util.Properties;


public class ProducerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // An AccessKey ID is used as the identifier for identity authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // An AccessKey secret is used as the password for identity authentication. 
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
        OrderProducer producer = ONSFactory.createOrderProducer(properties);
        // Before you send a message, call the start() method only once to start the producer. 
        producer.start();
        for (int i = 0; i < 1000; i++) {
            String orderId = "biz_" + i % 10;
            Message msg = new Message(
                    // The topic to which the message belongs. 
                    "Order_global_topic",
                    // The message tag. A message tag is similar to a Gmail tag and is used to sort and filter messages on the ApsaraMQ for RocketMQ broker. 
                    "TagA",
                    // The message body. A message body is data in binary format. ApsaraMQ for RocketMQ does not process the message body. The producer and the consumer must agree on the serialization and deserialization methods. 
                    "send order global msg".getBytes()
            );
            // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. 
            // A key helps you query and resend a message in the ApsaraMQ for RocketMQ console if the message fails to be received. 
            // Note: You can send and receive messages even if you do not specify the message key. 
            msg.setKey(orderId);
            // The key field that is used in ordered messages to identify different partitions. A sharding Key is different from the key of a normal message. 
            // This field can be set to a non-empty string for globally ordered messages. 
            String shardingKey = String.valueOf(orderId);
            try {
                SendResult sendResult = producer.send(msg, shardingKey);
                // Send the message. If no exception is thrown, the message is sent. 
                if (sendResult != null) {
                    System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
                }
            }
            catch (Exception e) {
                // Specify the logic to resend or persist the message if the message fails to be sent and needs to be re-sent. 
                System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
                e.printStackTrace();
            }
        }
        // Before you exit the application, shut down the producer. 
        // Note: Memory can be saved if you shut down a producer. If you need to frequently send messages, do not need to shut down a producer. 
        producer.shutdown();
    }

}

注文されたメッセージを購読する

全体的に順序付けられたメッセージおよび部分的に順序付けられたメッセージは、同じ方法でサブスクライブされる。 次のサンプルコードを提供します。

package com.aliyun.openservices.ons.example.order;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;

import java.util.Properties;


public class ConsumerClient {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. 
        properties.put(PropertyKeyConst.GROUP_ID,"XXX");
        // An AccessKey ID is used as the identifier for identity authentication. 
        properties.put(PropertyKeyConst.AccessKey,"XXX");
        // An AccessKey secret is used as the password for identity authentication.
        properties.put(PropertyKeyConst.SecretKey,"XXX");
        // The TCP endpoint of your instance. To obtain the TCP endpoint, log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, click Instances. On the Instances page, click the name of your instance. On the Instance Details page, scroll to the Basic Information section and view the TCP endpoint on the Endpoints tab. 
        properties.put(PropertyKeyConst.NAMESRV_ADDR,"XXX");
          // The timeout period in milliseconds before a retry is performed when the system fails to consume an ordered message. Value range: 10 to 30,000. 
        properties.put(PropertyKeyConst.SuspendTimeMillis,"100");
        // The maximum number of retries for the message if the message fails to be consumed. 
        properties.put(PropertyKeyConst.MaxReconsumeTimes,"20");

        // Before you use the consumer to subscribe to a message, call the start() method only once to start the consumer. 
        OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);

        consumer.subscribe(
                // The topic to which the message belongs. 
                "Order_global_topic",
                // Subscribe to messages that contain specified tags in the specified topic.
                // 1. An asterisk (*) specifies that the consumer subscribes to all messages. 
                // 2. TagA || TagB || TagC specifies that the consumer subscribes to messages that contain TagA, TagB, or TagC. 
                "*",
                new MessageOrderListener() {
                    /**
                     * 1 If a message fails to be consumed or an exception occurs during message processing, OrderAction.Suspend is returned. 
                     * 2. If a message is processed, OrderAction.Success is returned. 
                     */
                    @Override
                    public OrderAction consume(Message message, ConsumeOrderContext context) {
                        System.out.println(message);
                        return OrderAction.Success;
                    }
                });

        consumer.start();
    }
}