After you create the required resources in the ApsaraMQ forRocketMQ console, you can use ApsaraMQ for RocketMQ TCP client SDKs to send and subscribe to normal messages.
Prerequisites
- Note
Normal messages are used in the provided examples. Topics that you create for normal messages cannot be used to send or subscribe to other types of messages, such as scheduled messages, delayed messages, ordered messages, and transactional messages. You must create a topic based on the message type of your messages.
Download and install a TCP client SDK
Commercial SDKs provide more features and higher stability than open source SDKs. We recommend that you use commercial SDKs provided by ApsaraMQ for RocketMQ to access ApsaraMQ forRocketMQ. Open source SDKs can be used only if you want to migrate your data from an Apache RocketMQ cluster to an ApsaraMQ for RocketMQ instance without modifying the code.
ApsaraMQ for RocketMQ provides the following commercial TCP client SDKs. You can obtain a client SDK for a specific programming language based on your business requirements.
C/C++ SDK
Use a TCP client SDK to send normal messages
After you obtain the client SDK for a specific programming language, you can run the sample code of the programming language to send normal messages.
Java
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.OnExceptionContext;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.SendCallback;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section.
properties.put(PropertyKeyConst.AccessKey, "XXX");
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section.
properties.put(PropertyKeyConst.SecretKey, "XXX");
// The timeout period for sending a message. Unit: milliseconds.
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console.
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
Producer producer = ONSFactory.createProducer(properties);
// Before you send a message, call the start() method only once to start the producer.
producer.start();
Message msg = new Message(
// The topic to which the message belongs.
"TopicTestMQ",
// The message tag. A message tag is similar to a Gmail tag and is used to help consumers filter messages on the ApsaraMQ forRocketMQ broker.
"TagA",
// The message body. A message body is in the binary format. ApsaraMQ forRocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods.
"Hello MQ".getBytes());
// The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible. // A unique key helps you query and resend a message in the ApsaraMQ forRocketMQ console if the message fails to be received.
// Note: You can send and receive messages even if you do not specify a message key.
msg.setKey("ORDERID_100");
// Send the message in asynchronous mode. The result is returned to the client by invoking the callback method.
producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// The message is sent.
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}
@Override
public void onException(OnExceptionContext context) {
// Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});
// Obtain the value of the msgId parameter before you invoke the callback method to return the result.
System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());
// Before you exit the application, terminate the producer. Note: This step is optional.
producer.shutdown();
}
}
.NET
using System;
using ons;
public class ProducerExampleForEx
{
public ProducerExampleForEx()
{
}
static void Main(string[] args) {
// Configure your account based on the information in the Alibaba Cloud Management Console.
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section.
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section.
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
// The topic that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// The log path.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Create the producer instance.
// Note: A producer instance is thread-safe and can be used to send messages to different topics. In most cases, each thread requires only one producer instance.
Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);
// Start the producer instance.
producer.start();
// Create the message.
Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
msg.setKey(Guid.NewGuid().ToString());
for (int i = 0; i < 32; i++) {
try
{
SendResultONS sendResult = producer.send(msg);
Console.WriteLine("send success {0}", sendResult.getMessageId());
}
catch (Exception ex)
{
Console.WriteLine("send failure{0}", ex.ToString());
}
}
// Before you exit your thread, terminate the producer instance.
producer.shutdown();
}
}
C/C++
#include "ONSFactory.h"
#include "ONSClientException.h"
using namespace ons;
int main()
{
// Create the producer and configure the parameters that are required to send messages.
ONSFactoryProperty factoryInfo;
factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");// The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");// The message content.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX" );// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section.
//create producer;
Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);
// Before you send a message, call the start() method only once to start the producer.
pProducer->start();
Message msg(
//Message Topic
factoryInfo.getPublishTopics(),
// The message tag. A message tag is similar to a Gmail tag and is used to help consumers filter messages in the ApsaraMQ forRocketMQ broker.
"TagA",
// The message body. You cannot leave this parameter empty. ApsaraMQ forRocketMQ does not process message bodies. The producer and consumer must agree on the serialization and deserialization methods.
factoryInfo.getMessageContent()
);
// The message key. A key is the business-specific attribute of a message and must be globally unique whenever possible.
// A key can be used to query and resend a message in the ApsaraMQ forRocketMQ console if the message fails to be received.
// Note: You can send and receive messages even if you do not specify a message key.
msg.setKey("ORDERID_100");
// Send the message. If no exception occurs, the message is sent.
try
{
SendResultONS sendResult = pProducer->send(msg);
}
catch(ONSClientException & e)
{
// Specify the logic to handle exceptions.
}
// Before you exit your application, you must terminate the producer. If you do not terminate the producer, issues such as memory leaks may occur.
pProducer->shutdown();
return 0;
}
You can also start your instance by performing the following steps: Log on to the ApsaraMQ for RocketMQ console. Find the created instance and click More in the Actions column. Select Quick Start from the drop-down list.
Check whether messages are sent
After a message is sent, you can check its status in the ApsaraMQ for RocketMQ console by performing the following operations:
- On the Instance Details page, click Message Query in the left-side navigation pane.
- On the Message Query page, select a query method and specify the parameters as required, and then click Search.
Stored At indicates the time when the ApsaraMQ for RocketMQ broker stores the message. If a message can be queried, the message has been sent to the Message Queue for Apache RocketMQ broker.
Use a TCP client SDK to subscribe to normal messages
After a normal message is sent, you must start a consumer to subscribe to the message. You can use the following sample code for a specific programming language based on your business requirements to start the consumer and test the message subscription feature. You must follow the instructions to configure the parameters.
Java
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console.
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section.
properties.put(PropertyKeyConst.AccessKey, "XXX");
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the "Prerequisites" section.
properties.put(PropertyKeyConst.SecretKey, "XXX");
// Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console.
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
// The clustering consumption mode. This is the default mode.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// The broadcasting consumption mode.
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { // Subscribe to multiple tags.
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
// Subscribe to another topic. To unsubscribe from this topic, delete the subscription code and restart the consumer.
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { // Subscribes to all tags.
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}
.NET
using System;
using System.Threading;
using System.Text;
using ons;
// The callback function that is executed when a message is pulled from the ApsaraMQ forRocketMQ broker.
public class MyMsgListener : MessageListener
{
public MyMsgListener()
{
}
~MyMsgListener()
{
}
public override ons.Action consume(Message value, ConsumeContext context)
{
Byte[] text = Encoding.Default.GetBytes(value.getBody());
Console.WriteLine(Encoding.UTF8.GetString(text));
return ons.Action.CommitMessage;
}
}
public class ConsumerExampleForEx
{
public ConsumerExampleForEx()
{
}
static void Main(string[] args) {
// Configure your account. You can obtain the account information in the Alibaba Cloud Management Console.
ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section.
factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section.
factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
// The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.ConsumerId, "GID_example");
// The topic that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
// Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
// The log path.
factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");
// Clustering consumption.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.CLUSTERING);
// Broadcasting consumption.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty.BROADCASTING);
// Create the consumer instance.
PushConsumer consumer = ONSFactory.getInstance().createPushConsumer(factoryInfo);
// Subscribe to topics.
consumer.subscribe(factoryInfo.getPublishTopics(), "*", new MyMsgListener());
// Start the producer instance.
consumer.start();
// This setting is only for the demo. In actual production environments, you cannot exit the process.
Thread.Sleep(300000);
// Before you exit the process, terminate the consumer instance.
consumer.shutdown();
}
}
C/C++
#include "ONSFactory.h"
using namespace ons;
// Create a MyMsgListener instance to consume messages.
// After the push consumer pulls the message, the consumer function of the instance is called.
class MyMsgListener : public MessageListener
{
public:
MyMsgListener()
{
}
virtual ~MyMsgListener()
{
}
virtual Action consume(Message &message, ConsumeContext &context)
{
// Specify the logic to process messages.
return CommitMessage; //CONSUME_SUCCESS;
}
};
int main(int argc, char* argv[])
{
// The parameters that are required to create and run the push consumer.
ONSFactoryProperty factoryInfo;
factoryInfo.setFactoryProperty(ONSFactoryProperty::ConsumerId, "XXX");// The ID of the consumer group that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX"); // Specify the TCP endpoint. You can view the endpoint in the TCP Endpoint section of the Instance Details page in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics,"XXX" );// The topic that you created in the ApsaraMQ forRocketMQ console.
factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the "Prerequisites" section.
factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
// The clustering consumption mode. This is the default mode.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::CLUSTERING);
// The broadcasting consumption mode.
// factoryInfo.setFactoryProperty(ONSFactoryProperty:: MessageModel, ONSFactoryProperty::BROADCASTING);
//create pushConsumer
PushConsumer* pushConsumer = ONSFactory::getInstance()->createPushConsumer(factoryInfo);
// Specify the topic and tags to which the push consumer subscribes. Register a message callback function.
MyMsgListener msglistener;
pushConsumer->subscribe(factoryInfo.getPublishTopics(), "*",&msglistener );
//start pushConsumer
pushConsumer->start();
// Note: The shutdown() method can be called only when no messages are received. After the shutdown() method is called, the consumer exits and no longer receives messages.
// Terminate the push consumer. Before you exit the application, you must terminate the consumer. Otherwise, issues such as memory leaks may occur.
pushConsumer->shutdown();
return 0;
}
Check whether the message subscription is successful
- On the Instance Details page of the instance, click Groups in the left-side navigation pane.
- On the Groups page, click the TCP tab.
- Find the Group ID whose subscription status that you want to view, and click Details in the Actions column.If the value of Consumer Status is Online and the value of the Is Subscription Consistent parameter is Yes, the message subscription is successful.