
ApsaraMQ for RocketMQ:TCPクライアントSDKを使用して通常のメッセージを送信およびサブスクライブする

ApsaraMQ forRocketMQコンソールで必要なリソースを作成した後、ApsaraMQ for RocketMQ TCPクライアントSDKを使用して通常のメッセージを送信およびサブスクライブできます。


  • リソースの作成


    提供される例では、通常のメッセージが使用される。 通常のメッセージ用に作成したトピックを使用して、スケジュールされたメッセージ、遅延メッセージ、順序付けられたメッセージ、トランザクションメッセージなど、他の種類のメッセージを送信またはサブスクライブすることはできません。 メッセージのメッセージタイプに基づいてトピックを作成する必要があります。

  • AccessKeyペアの作成



商用SDKは、オープンソースSDKよりも多くの機能と高い安定性を提供します。 ApsaraMQ forRocketMQにアクセスするには、ApsaraMQ for RocketMQが提供する商用SDKを使用することを推奨します。 オープンソースSDKは、コードを変更せずにApache RocketMQクラスターからApsaraMQ for RocketMQインスタンスにデータを移行する場合にのみ使用できます。

ApsaraMQ for RocketMQは、以下の商用TCPクライアントSDKを提供します。 ビジネス要件に基づいて、特定のプログラミング言語用のクライアントSDKを取得できます。

Use a TCP client SDK to send normal messages

After you obtain the client SDK for a specific programming language, you can run the sample code of the programming language to send normal messages.


import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section. 
        properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section. 
        properties.put(PropertyKeyConst.SecretKey, "XXX");
        // The timeout period for sending a message. Unit: milliseconds. 
        properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
        // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console. 

        Producer producer = ONSFactory.createProducer(properties);
        // Before you send a message, call the start() method only once to start the producer. 

        Message msg = new Message(
                // The topic to which the message belongs. 
                // The message tag. A message tag is similar to a Gmail tag and is used to help consumers filter messages on the ApsaraMQ forRocketMQ broker. 
                // The message body. A message body is in the binary format. ApsaraMQ forRocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods. 
                "Hello MQ".getBytes());

        // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. // A unique key helps you query and resend a message in the ApsaraMQ forRocketMQ console if the message fails to be received. 
        // Note: You can send and receive messages even if you do not specify a message key. 

        // Send the message in asynchronous mode. The result is returned to the client by invoking the callback method. 
        producer.sendAsync(msg, new SendCallback() {
            public void onSuccess(final SendResult sendResult) {
                // The message is sent. 
                System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());

            public void onException(OnExceptionContext context) {
                // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
                System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());

        // Obtain the value of the msgId parameter before you invoke the callback method to return the result. 
        System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

        // Before you exit the application, terminate the producer.  Note: This step is optional. 


using System;
using ons;

public class ProducerExampleForEx
    public ProducerExampleForEx()

    static void Main(string[] args) {
        // Configure your account based on the information in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section. 

        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the ApsaraMQ forRocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create the producer instance. 
        // Note: A producer instance is thread-safe and can be used to send messages to different topics. In most cases, each thread requires only one producer instance. 
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // Start the producer instance. 

        // Create the message. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        for (int i = 0; i < 32; i++) {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            catch (Exception ex)
                Console.WriteLine("send failure{0}", ex.ToString());

        // Before you exit your thread, terminate the producer instance. 



#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()

    // Create the producer and configure the parameters that are required to send messages. 
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");// The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the ApsaraMQ forRocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");// The message content. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section. 

    //create producer;
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // Before you send a message, call the start() method only once to start the producer. 

    Message msg(
            //Message Topic
            // The message tag. A message tag is similar to a Gmail tag and is used to help consumers filter messages in the ApsaraMQ forRocketMQ broker. 
            // The message body. You cannot leave this parameter empty. ApsaraMQ forRocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods. 

    // The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. 
    // A key can be used to query and resend a message in the ApsaraMQ forRocketMQ console if the message fails to be received. 
    // Note: You can send and receive messages even if you do not specify a message key. 

    // Send the message. If no exception occurs, the message is sent. 
        SendResultONS sendResult = pProducer->send(msg);
    catch(ONSClientException & e)
        // Specify the logic to handle exceptions. 
    // Before you exit your application, you must terminate the producer. If you do not terminate the producer, issues such as memory leaks may occur. 

    return 0;

次の手順を実行してインスタンスを起動することもできます。 ApsaraMQ for RocketMQ consoleにログインします。作成したインスタンスを見つけて、操作 列の詳細をクリックします。ドロップダウンリストからすぐに体験を選択します。


メッセージの送信後、次の操作を実行して ApsaraMQ for RocketMQ console でそのステータスを確認できます。

  1. [インスタンスの詳細] ページで、左側のナビゲーションウィンドウからメッセージ検索をクリックします。

  2. メッセージ検索ページで、クエリ方法を選択し、必要に応じてパラメーターを指定してから、 検索をクリックします。

    indicates the time when the ApsaraMQ for RocketMQ ブローカーがメッセージを保存する場合、保存時間が時刻を示します。メッセージを照会できる場合、メッセージはApache RocketMQブローカーのメッセージキューに送信されています。


This step demonstrates the scenario where ApsaraMQ for RocketMQ is used for the first time and the consumer has not been started. Therefore, no consumption data is displayed in the console. To start the consumer and subscribe to messages, see the next section. For more information about the message status, see Query messages and Query message traces.

Use a TCP client SDK to subscribe to normal messages

通常のメッセージが送信されたら、コンシューマを起動してメッセージをサブスクライブする必要があります。 ビジネス要件に基づいた特定のプログラミング言語に対して次のサンプルコードを使用して、コンシューマを起動し、メッセージサブスクリプション機能をテストできます。 指示に従ってパラメーターを設定する必要があります。


import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class ConsumerTest {
   public static void main(String[] args) {
       Properties properties = new Properties();
        // The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console. 
       properties.put(PropertyKeyConst.GROUP_ID, "XXX");
        // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section. 
       properties.put(PropertyKeyConst.AccessKey, "XXX");
        // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section. 
       properties.put(PropertyKeyConst.SecretKey, "XXX");
        // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console. 
          // The clustering consumption mode. This is the default mode. 
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
          // The broadcasting consumption mode. 
          // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

       Consumer consumer = ONSFactory.createConsumer(properties);
       consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags.
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;

        // Subscribe to another topic. To unsubscribe from this topic, delete the subscription code and restart the consumer. 
        consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribes to all tags. 
           public Action consume(Message message, ConsumeContext context) {
               System.out.println("Receive: " + message);
               return Action.CommitMessage;

       System.out.println("Consumer Started");


using System;
using System.Threading;
using System.Text;
using ons;

// The callback function that is executed when a message is pulled from the ApsaraMQ forRocketMQ broker. 
public class MyMsgListener : MessageListener
    public MyMsgListener()


    public override ons.Action consume(Message value, ConsumeContext context)
        Byte[] text = Encoding.Default.GetBytes(value.getBody());
        return ons.Action.CommitMessage;

public class ConsumerExampleForEx
    public ConsumerExampleForEx()

    static void Main(string[] args) {
        // Configure your account. You can obtain the account information in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
        // The topic that you created in the ApsaraMQ forRocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
        // Clustering consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
        // Broadcasting consumption. 
        // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);

        // Create the consumer instance. 
        PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);

        // Subscribe to topics. 
        consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());

        // Start the producer instance. 

        // This setting is only for the demo. In actual production environments, you cannot exit the process. 

        // Before you exit the process, terminate the consumer instance. 


#include "ONSFactory.h"
using namespace ons;

// Create a MyMsgListener instance to consume messages. 
// After the push consumer pulls the message, the consumer function of the instance is called. 
class MyMsgListener : public MessageListener



        virtual ~MyMsgListener()

        virtual Action consume(Message &message, ConsumeContext &context)
            // Specify the logic to process messages. 
            return CommitMessage; //CONSUME_SUCCESS;

int main(int argc, char* argv[])

    // The parameters that are required to create and run the push consumer. 
    ONSFactoryProperty factoryInfo;
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");// The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the ApsaraMQ forRocketMQ console. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section. 
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey,  "XXX");// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
      // The clustering consumption mode. This is the default mode. 
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
      // The broadcasting consumption mode. 
      // factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);

    //create pushConsumer
    PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);

    // Specify the topic and tags to which the push consumer subscribes. Register a message callback function. 
    MyMsgListener  msglistener;
    pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );

    //start pushConsumer

    // Note: The shutdown() method can be called only when no messages are received. After the shutdown() method is called, the consumer exits and no longer receives messages. 

    // Terminate the push consumer. Before you exit the application, you must terminate the consumer. Otherwise, issues such as memory leaks may occur. 
    return 0;


Check whether the message subscription is successful

  1. インスタンスの [インスタンスの詳細] ページで、左側のナビゲーションウィンドウからグループ管理 をクリックします。

  2. グループ管理ページで、 TCP プロトコル タブをクリックします。

  3. サブスクリプションステータスを表示するグループIDを見つけ、操作列の 詳細をクリックします。

    消費者ステータス の値がオンライン で、 [Is Subscription Conistent] パラメーターの値が [Yes] の場合、メッセージサブスクリプションは成功します。