Prerequisites
Before you start, make sure that the following operations are performed:
Install the SDK for Node.js. For more information, see Prepare the environment.
Create the resources that you want to specify in the code in the ApsaraMQ for RocketMQ console. The resources include instances, topics, and consumer groups. For more information, see Create resources.
Obtain the AccessKey pair of your Alibaba Cloud account. For more information, see Create an AccessKey pair.
Send normal messages
The following sample code provides an example on how to send normal messages by using the HTTP client SDK for Node.js:
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "${HTTP_ENDPOINT}";
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "${TOPIC}";
const instanceId = "${INSTANCE_ID}";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
for(var i = 0; i < 4; i++) {
let res;
msgProps = new MessageProperties();
msgProps.putProperty("a", i);
msgProps.messageKey("MessageKey");
res = await producer.publishMessage("hello mq.", "TagA", msgProps);
console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
console.log(e)
}
})();
Subscribe to normal messages
The following sample code provides an example on how to subscribe to normal messages by using the HTTP client SDK for Node.js:
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
const endpoint = "${HTTP_ENDPOINT}";
const accessKeyId = process.env['ALIBABA_CLOUD_ACCESS_KEY_ID'];
const accessKeySecret = process.env['ALIBABA_CLOUD_ACCESS_KEY_SECRET'];
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
const topic = "${TOPIC}";
const groupId = "${GROUP_ID}";
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
while(true) {
try {
res = await consumer.consumeMessage(
3,
3
);
if (res.code == 200) {
console.log("Consume Messages, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
",Props:%j,MessageKey:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
return message.ReceiptHandle;
});
res = await consumer.ackMessage(handles);
if (res.code != 204) {
console.log("Ack Message Fail:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();