ApsaraMQ for RocketMQ: Use HTTP client SDKs to send and subscribe to normal messages

Last Updated: Oct 21, 2024

After you create the required resources in the ApsaraMQ for RocketMQ console, you can use an ApsaraMQ for RocketMQ HTTP client SDK to send and subscribe to normal messages.

Prerequisites

  • Create resources

    Note

    Normal messages are used in the provided examples. Topics that you create for normal messages cannot be used to send or subscribe to other types of messages, such as scheduled messages, delayed messages, ordered messages, and transactional messages. You must create a topic based on the message type of your messages.

  • Create an AccessKey pair

Download and install an HTTP client SDK

ApsaraMQ for RocketMQ provides HTTP client SDKs for multiple programming languages. Download and install the client SDK for your programming language based on your business requirements.

Use an HTTP client SDK to send normal messages

After you obtain the client SDK for a specific programming language, you can run the sample code for the programming language to send normal messages:

Java

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;

import java.util.Date;

public class Producer {

    public static void main(String[] args) {
        MQClient mqClient = new MQClient(
                // Specify the HTTP endpoint. 
                "${HTTP_ENDPOINT}",
                // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
                "${ACCESS_KEY}",
                // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
                "${SECRET_KEY}"
        );

        // The topic to which the message belongs. 
        final String topic = "${TOPIC}";
        // The ID of the instance to which the topic belongs. Default value: null. 
        final String instanceId = "${INSTANCE_ID}";

        // Obtain the producer that sends messages to the topic. 
        MQProducer producer;
        if (instanceId != null && !instanceId.isEmpty()) {
            producer = mqClient.getProducer(instanceId, topic);
        } else {
            producer = mqClient.getProducer(topic);
        }

        try {
            // Cyclically send four messages. 
            for (int i = 0; i < 4; i++) {
                TopicMessage pubMsg;
                if (i % 2 == 0) {
                    // The normal message. 
                    pubMsg = new TopicMessage(
                            // The message content. 
                            "hello mq!".getBytes(),
                            // The message tag. 
                            "A"
                    );
                    // The message attributes. 
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // The message key. 
                    pubMsg.setMessageKey("MessageKey");
                } else {
                    pubMsg = new TopicMessage(
                            // The message content. 
                            "hello mq!".getBytes(),
                            // The message tag. 
                            "A"
                    );
                    // The message attributes. 
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // Schedule to send the message 10 seconds later. 
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // Send the message in synchronous mode. If no exception is thrown, the message is sent. 
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // Print the ID and the body MD5 hash of the sent message. 
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
            // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }

}                          

Go

package main

import (
    "fmt"
    "strconv"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // Specify the HTTP endpoint. 
    endpoint := "${HTTP_ENDPOINT}"
    // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
    accessKey := "${ACCESS_KEY}"
    // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
    secretKey := "${SECRET_KEY}"
    // The topic to which the message belongs. 
    topic := "${TOPIC}"
    // The ID of the instance to which the topic belongs. Default value: null. 
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // Cyclically send four messages. 
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest
        if i%2 == 0 {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq!",         // The message content. 
                MessageTag:  "",                  // The message tag. 
                Properties:  map[string]string{}, // The message attributes. 
            }
            // The message key. 
            msg.MessageKey = "MessageKey"
            // The message attributes. 
            msg.Properties["a"] = strconv.Itoa(i)
        } else {
            msg = mq_http_sdk.PublishMessageRequest{
                MessageBody: "hello mq timer!",         // The message content. 
                MessageTag:  "",                  // The message tag. 
                Properties:  map[string]string{}, // The message attributes. 
            }
            // The message attributes. 
            msg.Properties["a"] = strconv.Itoa(i)
            // Schedule to send the message 10 seconds later. The value is a UNIX timestamp in milliseconds. 
            msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
        }
        ret, err := mqProducer.PublishMessage(msg)

        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}                          

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ProducerTest
{
    private $client;
    private $producer;

    public function __construct()
    {
        $this->client = new MQClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
        );

        // The topic to which the message belongs. 
        $topic = "${TOPIC}";
        // The ID of the instance to which the topic belongs. Default value: null. 
        $instanceId = "${INSTANCE_ID}";

        $this->producer = $this->client->getProducer($instanceId, $topic);
    }

    public function run()
    {
        try
        {
            for ($i=1; $i<=4; $i++)
            {
                $publishMessage = new TopicMessage(
                    // The message content. 
                    "xxxxxxxx"
                );
                // The message attributes. 
                $publishMessage->putProperty("a", $i);
                // The message key. 
                $publishMessage->setMessageKey("MessageKey");
                if ($i % 2 == 0) {
                    // Schedule to send the message 10 seconds later. 
                    $publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
                }
                $result = $this->producer->publishMessage($publishMessage);

                print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
            }
        } catch (\Exception $e) {
            print_r($e->getMessage() . "\n");
        }
    }
}


$instance = new ProducerTest();
$instance->run();

?>                         

Python

#!/usr/bin/env python
# coding=utf8
import sys

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time

# Initialize the client. 
mq_client = MQClient(
    # Specify the HTTP endpoint. 
    "${HTTP_ENDPOINT}",
    # The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
    "${ACCESS_KEY}",
    # The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
    "${SECRET_KEY}"
    )
# The topic to which the message belongs. 
topic_name = "${TOPIC}"
# The ID of the instance to which the topic belongs. Default value: None. 
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# Cyclically send multiple messages. 
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))

try:
    for i in range(msg_count):
        if i % 2 == 0:
            msg = TopicMessage(
                    # The message content. 
                    "I am test message %s.Hello" % i, 
                    # The message tag. 
                    ""
                    )
            # The message attributes. 
            msg.put_property("a", i)
            # The message key. 
            msg.set_message_key("MessageKey")
            re_msg = producer.publish_message(msg)
            print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        else:
            msg = TopicMessage(
                    # The message content. 
                    "I am test message %s." % i, 
                    # The message tag. 
                    ""
                    )
            msg.put_property("a", i)
            # Schedule an absolute time in milliseconds to send the message. 
            msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
            re_msg = producer.publish_message(msg)
            print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
        time.sleep(1)
except MQExceptionBase as e:
    if e.type == "TopicNotExist":
        print("Topic not exist, please create it.")
        sys.exit(1)
    print("Publish Message Fail. Exception:%s" % e)                       
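
The exception handlers in the producer samples leave the resend logic to you. The following is a minimal sketch of one option, retrying with exponential backoff. The `publish_with_retry` helper, its parameters, and the delay values are assumptions for illustration and are not part of the SDK. It accepts any callable that sends one message and raises an exception on failure.

```python
import time


def publish_with_retry(publish, msg, max_attempts=3, base_delay=0.5, sleep=time.sleep):
    """Call publish(msg), retrying with exponential backoff on failure.

    `publish` is any callable that sends one message and raises on failure.
    This helper is hypothetical and is not part of the SDK.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return publish(msg)
        except Exception:
            # In real code, catch a narrower exception type such as MQExceptionBase.
            # Out of attempts: re-raise so that the caller can persist the message.
            if attempt == max_attempts:
                raise
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
            sleep(base_delay * 2 ** (attempt - 1))
```

With the Python sample above, a call might look like `re_msg = publish_with_retry(producer.publish_message, msg)`. A retry can deliver the same message more than once if an earlier attempt succeeded but its response was lost, so consumers should tolerate duplicates.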

Node.js

const {
  MQClient,
  MessageProperties
} = require('@aliyunmq/mq-http-sdk');

// Specify the HTTP endpoint. 
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
const accessKeySecret = "${SECRET_KEY}";

var client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// The topic to which the message belongs. 
const topic = "${TOPIC}";
// The ID of the instance to which the topic belongs. Default value: null. 
const instanceId = "${INSTANCE_ID}";

const producer = client.getProducer(instanceId, topic);

(async function(){
  try {
    // Cyclically send four messages. 
    for(var i = 0; i < 4; i++) {
      let res;
      if (i % 2 == 0) {
        const msgProps = new MessageProperties();
        // The message attributes. 
        msgProps.putProperty("a", i);
        // The message key. 
        msgProps.messageKey("MessageKey");
        res = await producer.publishMessage("hello mq.", "", msgProps);
      } else {
        const msgProps = new MessageProperties();
        // The message attributes. 
        msgProps.putProperty("a", i);
        // Schedule to send the message 10 seconds later. 
        msgProps.startDeliverTime(Date.now() + 10 * 1000);
        res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
      }
      console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
    }

  } catch(e) {
    // Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again. 
    console.log(e)
  }
})();                         

C++

#include <iostream>
#include <string>
#include <time.h>
#include "mq_http_sdk/mq_client.h"

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
            );

    // The topic to which the message belongs. 
    string topic = "${TOPIC}";
    // The ID of the instance to which the topic belongs. Default value: null. 
    string instanceId = "${INSTANCE_ID}";

    MQProducerPtr producer;
    if (instanceId == "") {
        producer = mqClient.getProducerRef(topic);
    } else {
        producer = mqClient.getProducerRef(instanceId, topic);
    }

    try {
        for (int i = 0; i < 4; i++)
        {
            PublishMessageResponse pmResp;
            if (i % 4 == 0) {
                // Publish a message that has only a body. 
                producer->publishMessage("Hello, mq!", pmResp);
            } else if (i % 4 == 1) {
                // Publish a message that has only a body and a tag. 
                producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
            } else if (i % 4 == 2) {
                // Publish a message that has a body, properties, and a key. 
                TopicMessage pubMsg("Hello, mq!have key!");
                pubMsg.putProperty("a",std::to_string(i));
                pubMsg.setMessageKey("MessageKey" + std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            } else {
                // Publish a scheduled message. The message can be consumed only after the time specified by StartDeliverTime. 
                TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
                // StartDeliverTime is an absolute time in milliseconds. 
                pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
                pubMsg.putProperty("b",std::to_string(i));
                pubMsg.putProperty("c",std::to_string(i));
                producer->publishMessage(pubMsg, pmResp);
            }
            cout << "Publish mq message success. Topic is: " << topic 
                << ", msgId is:" << pmResp.getMessageId() 
                << ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
        }
    } catch (MQServerException& me) {
        cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
        return -1;
    } catch (MQExceptionBase& mb) {
        cout << "Request Failed: " + mb.ToString() << endl;
        return -2;
    }

    return 0;
}                        

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;

namespace Aliyun.MQ.Sample
{
    public class ProducerSample
    {
        // Specify the HTTP endpoint. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic to which the message belongs. 
        private const string _topicName = "${TOPIC}";
        // The ID of the instance to which the topic belongs. Default value: null. 
        private const string _instanceId = "${INSTANCE_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);

        static MQProducer producer = _client.GetProducer(_instanceId, _topicName);

        static void Main(string[] args)
        {
            try
            {
                // Cyclically send four messages. 
                for (int i = 0; i < 4; i++)
                {
                    TopicMessage sendMsg;
                    if (i % 2 == 0)
                    {
                        sendMsg = new TopicMessage("dfadfadfadf");
                        // The message attributes. 
                        sendMsg.PutProperty("a", i.ToString());
                        // The message key. 
                        sendMsg.MessageKey = "MessageKey";
                    }
                    else
                    {
                        sendMsg = new TopicMessage("dfadfadfadf", "tag");
                        // The message attributes. 
                        sendMsg.PutProperty("a", i.ToString());
                        // Schedule to send the message 10 seconds later. 
                        sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
                    }
                    TopicMessage result = producer.PublishMessage(sendMsg);
                    Console.WriteLine("publish message success:" + result);
                }
            }
            catch (Exception ex)
            {
                Console.Write(ex);
            }
        }
    }
}                       

You can also start your instance by performing the following steps: Log on to the ApsaraMQ for RocketMQ console. Find the created instance and click More in the Actions column. Select Quick Start from the drop-down list.
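
The producer samples above schedule delivery by passing an absolute UNIX timestamp in milliseconds, computed as the current time plus a delay. A minimal sketch of that arithmetic in Python follows. `start_deliver_time_ms` is a hypothetical helper name used for illustration, not an SDK function.

```python
import time


def start_deliver_time_ms(delay_seconds, now_seconds=None):
    """Return an absolute UNIX timestamp in milliseconds, delay_seconds from now.

    now_seconds defaults to time.time(). Pass a fixed value to get a
    reproducible result. This helper is hypothetical and is not part of the SDK.
    """
    if now_seconds is None:
        now_seconds = time.time()
    # Convert seconds to milliseconds and round to a whole number.
    return int(round((now_seconds + delay_seconds) * 1000))
```

In the Python producer sample, the result could be passed as `msg.set_start_deliver_time(start_deliver_time_ms(10))`.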

Use an HTTP client SDK to consume normal messages

After normal messages are sent, you must start consumers to consume the messages. You can use the following sample code for a specific programming language based on your business requirements to start consumers and test the message consumption feature. Specify the parameters based on comments in the following code:

Java

 import com.aliyun.mq.http.MQClient;
 import com.aliyun.mq.http.MQConsumer;
 import com.aliyun.mq.http.common.AckMessageException;
 import com.aliyun.mq.http.model.Message;
 
 import java.util.ArrayList;
 import java.util.List;
 
 public class Consumer {
 
     public static void main(String[] args) {
         MQClient mqClient = new MQClient(
                 // Specify the HTTP endpoint. 
                 "${HTTP_ENDPOINT}",
                 // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
                 "${ACCESS_KEY}",
                 // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
                 "${SECRET_KEY}"
         );
 
         // The topic to which the message belongs. 
         final String topic = "${TOPIC}";
         // The ID of the group that you created in the ApsaraMQ for RocketMQ console. A group ID is also known as a consumer ID. 
         final String groupId = "${GROUP_ID}";
         // The ID of the instance to which the topic belongs. Default value: null. 
         final String instanceId = "${INSTANCE_ID}";
 
         final MQConsumer consumer;
         if (instanceId != null && !instanceId.isEmpty()) {
             consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
         } else {
             consumer = mqClient.getConsumer(topic, groupId);
         }
 
         // Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages. 
         do {
             List<Message> messages = null;
 
             try {
                 // Consume messages in long polling mode. 
                 // In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client. 
                 messages = consumer.consumeMessage(
                         3,//  The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                         3// The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                 );
             } catch (Throwable e) {
                 e.printStackTrace();
                 try {
                     Thread.sleep(2000);
                 } catch (InterruptedException e1) {
                     e1.printStackTrace();
                 }
             }
             // No message. 
             if (messages == null || messages.isEmpty()) {
                 System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
                 continue;
             }
 
             // The message consumption logic. 
             for (Message message : messages) {
                 System.out.println("Receive message: " + message);
             }
 
             // If a broker fails to receive an acknowledgment (ACK) for a message from a consumer before the period of time specified by the Message.nextConsumeTime parameter elapses, the message is consumed again. 
             // A unique timestamp is specified for the handle of a message each time the message is consumed. 
             {
                 List<String> handles = new ArrayList<String>();
                 for (Message message : messages) {
                     handles.add(message.getReceiptHandle());
                 }
 
                 try {
                     consumer.ackMessage(handles);
                 } catch (Throwable e) {
                     // If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers. 
                     if (e instanceof AckMessageException) {
                         AckMessageException errors = (AckMessageException) e;
                         System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                         if (errors.getErrorMessages() != null) {
                             for (String errorHandle :errors.getErrorMessages().keySet()) {
                                 System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
                                         + ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
                             }
                         }
                         continue;
                     }
                     e.printStackTrace();
                 }
             }
         } while (true);
     }
 }        

Go

package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strings"
    "time"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // Specify the HTTP endpoint. 
    endpoint := "${HTTP_ENDPOINT}"
    // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
    accessKey := "${ACCESS_KEY}"
    // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
    secretKey := "${SECRET_KEY}"
    // The topic to which the message belongs. 
    topic := "${TOPIC}"
    // The ID of the instance to which the topic belongs. Default value: null. 
    instanceId := "${INSTANCE_ID}"
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. A group ID is also known as a consumer ID. 
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    // The message consumption logic. 
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
                    }

                    // If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the NextConsumeTime parameter elapses, the message is consumed again. 
                    // A unique timestamp is specified for the handle of a message each time the message is consumed. 
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        // If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers. 
                        fmt.Println(ackerr)
                        for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
                            fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                                errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    // No message. 
                    if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timed out waiting for a consume response")
                    endChan <- 1
                }
            }
        }()

        // Consume messages in long polling mode. 
        // In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client. 
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, //  The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
            3, // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
        )
        <-endChan
    }
}                                                                   

PHP

<?php

require "vendor/autoload.php";

use MQ\Model\TopicMessage;
use MQ\MQClient;

class ConsumerTest
{
    private $client;
    private $consumer;

    public function __construct()
    {
        $this->client = new MQClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
        );

        // The topic to which the message belongs. 
        $topic = "${TOPIC}";
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. A group ID is also known as a consumer ID. 
        $groupId = "${GROUP_ID}";
        // The ID of the instance to which the topic belongs. Default value: null. 
        $instanceId = "${INSTANCE_ID}";

        $this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
    }

    public function run()
    {
        // Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages. 
        while (True) {
            try {
                // Consume messages in long polling mode. 
                // In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client. 
                $messages = $this->consumer->consumeMessage(
                    3, //  The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                    3 // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                );
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\MessageNotExistException) {
                    // No message is available for consumption in the topic. Continue long polling.
                    printf("No message, continue long polling!RequestId:%s\n", $e->getRequestId());
                    continue;
                }

                print_r($e->getMessage() . "\n");

                sleep(3);
                continue;
            }

            print "consume finish, messages:\n";

            // The message consumption logic. 
            $receiptHandles = array();
            foreach ($messages as $message) {
                $receiptHandles[] = $message->getReceiptHandle();
                printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
                    $message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
                    $message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
                    $message->getMessageKey());
                print_r($message->getProperties());
            }

            // If the broker does not receive an ACK for a message before the time returned by $message->getNextConsumeTime(), the message is delivered again.
            // The receipt handle contains a timestamp, so the same message gets a different handle each time it is consumed.
            print_r($receiptHandles);
            try {
                $this->consumer->ackMessage($receiptHandles);
            } catch (\Exception $e) {
                if ($e instanceof MQ\Exception\AckMessageException) {
                    // The ACK for a message fails if its receipt handle has timed out.
                    printf("Ack Error, RequestId:%s\n", $e->getRequestId());
                    foreach ($e->getAckMessageErrorItems() as $errorItem) {
                        printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorMessage());
                    }
                }
            }
            print "ack finish\n";


        }

    }
}


$instance = new ConsumerTest();
$instance->run();

?>

Python

#!/usr/bin/env python
# coding=utf8

import time

from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *

# Initialize the client.
mq_client = MQClient(
    # Specify the HTTP endpoint.
    "${HTTP_ENDPOINT}",
    # The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
    "${ACCESS_KEY}",
    # The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
    "${SECRET_KEY}"
  )
# The topic to which the message belongs. 
topic_name = "${TOPIC}"
# The ID of the group that you created in the ApsaraMQ for RocketMQ console. A group ID is also known as a consumer ID.
group_id = "${GROUP_ID}"
# The ID of the instance to which the topic belongs. Default value: None. 
instance_id = "${INSTANCE_ID}"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

# In long polling mode, if no message is available for consumption in the topic, the request is held on the server for up to 3 seconds. If a message becomes available within that period, a response is immediately sent to the client.
# The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
wait_seconds = 3
# The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
batch = 3
print("%sConsume And Ack Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds))
while True:
    try:
        # Consume messages in long polling mode. 
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print("Receive, MessageId: %s\nMessageBodyMD5: %s"
                  "\nMessageTag: %s\nConsumedTimes: %s"
                  "\nPublishTime: %s\nBody: %s"
                  "\nNextConsumeTime: %s"
                  "\nReceiptHandle: %s" %
                  (msg.message_id, msg.message_body_md5,
                   msg.message_tag, msg.consumed_times,
                   msg.publish_time, msg.message_body,
                   msg.next_consume_time, msg.receipt_handle))
    except MQExceptionBase as e:
        if e.type == "MessageNotExist":
            print("No new message! RequestId: %s" % e.req_id)
            continue

        print("Consume Message Fail! Exception:%s\n" % e)
        time.sleep(2)
        continue

    # If the broker does not receive an ACK for a message before the time specified by msg.next_consume_time, the message is delivered again.
    # The receipt handle contains a timestamp, so the same message gets a different handle each time it is consumed.
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print("Ack %s Message Succeed.\n\n" % len(receipt_handle_list))
    except MQExceptionBase as e:
        print("\nAck Message Fail! Exception:%s" % e)
        # The ACK for a message fails if its receipt handle has timed out.
        if e.sub_errors:
            for sub_error in e.sub_errors:
                print("\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"]))
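
The sub_errors entries printed by the sample identify only the receipt handles whose ACK failed. The following minimal sketch assumes that each entry is a dict with the ReceiptHandle, ErrorCode, and ErrorMessage keys used in the sample. It shows one way to separate acknowledged handles from failed ones, in the same spirit as the failHandles logic in the Node.js sample. The helper name split_ack_results and the error code in the demo are hypothetical and are not part of the SDK.

```python
def split_ack_results(receipt_handles, sub_errors):
    """Return (succeeded, failed) for one ack_message call.

    receipt_handles: the handles that were passed to ack_message.
    sub_errors: assumed to be a list of dicts shaped like the entries the
        sample prints, or None/empty if every ACK succeeded.
    """
    # Map each failed handle to its error code.
    failed = {}
    for sub_error in sub_errors or []:
        failed[sub_error["ReceiptHandle"]] = sub_error["ErrorCode"]

    # Every handle that is not reported as failed was acknowledged.
    succeeded = [h for h in receipt_handles if h not in failed]
    return succeeded, failed


if __name__ == "__main__":
    # Made-up data for illustration only.
    handles = ["h1", "h2", "h3"]
    errors = [{"ReceiptHandle": "h2",
               "ErrorCode": "ExampleErrorCode",
               "ErrorMessage": "example message"}]
    ok, bad = split_ack_results(handles, errors)
    print("acked:", ok)
    print("not acked:", bad)
```

Messages whose handles end up in the failed group are delivered again later, so the consumption logic should tolerate processing the same message more than once.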

Node.js

const {
  MQClient
} = require('@aliyunmq/mq-http-sdk');

// Specify the HTTP endpoint. 
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
const accessKeySecret = "${SECRET_KEY}";

const client = new MQClient(endpoint, accessKeyId, accessKeySecret);

// The topic to which the message belongs. 
const topic = "${TOPIC}";
// The ID of the group that you created in the ApsaraMQ for RocketMQ console. A group ID is also known as a consumer ID.
const groupId = "${GROUP_ID}";
// The ID of the instance to which the topic belongs. Default value: null. 
const instanceId = "${INSTANCE_ID}";

const consumer = client.getConsumer(instanceId, topic, groupId);

(async function(){
  // Consume messages in a loop.
  while(true) {
    try {
      // Consume messages in long polling mode. 
      // In long polling mode, if no message is available for consumption in the topic, the request is held on the server for up to 3 seconds. If a message becomes available within that period, a response is immediately sent to the client.
      let res = await consumer.consumeMessage(
          3, //  The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
          3 // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
          );

      if (res.code == 200) {
        // The message consumption logic. 
        console.log("Consume Messages, requestId:%s", res.requestId);
        const handles = res.body.map((message) => {
          console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" + 
            ",Props:%j,MessageKey:%s,Prop-A:%s",
              message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
              message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
          return message.ReceiptHandle;
        });

        // If the broker does not receive an ACK for a message before the time specified by message.NextConsumeTime, the message is delivered again.
        // The receipt handle contains a timestamp, so the same message gets a different handle each time it is consumed.
        res = await consumer.ackMessage(handles);
        if (res.code != 204) {
          // The ACK for a message fails if its receipt handle has timed out.
          console.log("Ack Message Fail:");
          const failHandles = res.body.map((error)=>{
            console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
            return error.ReceiptHandle;
          });
          handles.forEach((handle)=>{
            if (failHandles.indexOf(handle) < 0) {
              console.log("\tSucHandle:%s\n", handle);
            }
          });
        } else {
          // The ACK succeeded for all messages.
          console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
        }
      }
    } catch(e) {
      if (e.Code && e.Code.indexOf("MessageNotExist") > -1) {
        // No message is available for consumption in the topic. Continue long polling.
        console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
      } else {
        console.log(e);
      }
    }
  }
})();

C++

#include <iostream>
#include <string>
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"

#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif

using namespace std;
using namespace mq::http::sdk;


int main() {

    MQClient mqClient(
            // Specify the HTTP endpoint. 
            "${HTTP_ENDPOINT}",
            // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
            "${ACCESS_KEY}",
            // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
            "${SECRET_KEY}"
            );

    // The topic to which the message belongs. 
    string topic = "${TOPIC}";
    // The ID of the group that you created in the ApsaraMQ for RocketMQ console. A group ID is also known as a consumer ID.
    string groupId = "${GROUP_ID}";
    // The ID of the instance to which the topic belongs. Default value: an empty string, which the check below treats as no instance ID.
    string instanceId = "${INSTANCE_ID}";

    MQConsumerPtr consumer;
    if (instanceId == "") {
        consumer = mqClient.getConsumerRef(topic, groupId);
    } else {
        consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
    }

    do {
        try {
            std::vector<Message> messages;
            // Consume messages in long polling mode. 
            // In long polling mode, if no message is available for consumption in the topic, the request is held on the server for up to 3 seconds. If a message becomes available within that period, a response is immediately sent to the client.
            consumer->consumeMessage(
                    3,//  The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                    3,// The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                    messages
            );
            cout << "Consume: " << messages.size() << " Messages!" << endl;

            // The message consumption logic. 
            std::vector<std::string> receiptHandles;
            for (std::vector<Message>::iterator iter = messages.begin();
                    iter != messages.end(); ++iter)
            {
                cout << "MessageId: " << iter->getMessageId()
                    << " PublishTime: " << iter->getPublishTime()
                    << " Tag: " << iter->getMessageTag()
                    << " Body: " << iter->getMessageBody()
                    << " FirstConsumeTime: " << iter->getFirstConsumeTime()
                    << " NextConsumeTime: " << iter->getNextConsumeTime()
                    << " ConsumedTimes: " << iter->getConsumedTimes() 
                    << " Properties: " << iter->getPropertiesAsString() 
                    << " Key: " << iter->getMessageKey() << endl;
                receiptHandles.push_back(iter->getReceiptHandle());
            }

            // Acknowledge that the messages were consumed.
            // If the broker does not receive an ACK for a message before the time specified by Message.NextConsumeTime, the message is delivered again.
            // The receipt handle contains a timestamp, so the same message gets a different handle each time it is consumed.
            AckMessageResponse bdmResp;
            consumer->ackMessage(receiptHandles, bdmResp);
            if (!bdmResp.isSuccess()) {
                // The ACK for a message fails if its receipt handle has timed out.
                const std::vector<AckMessageFailedItem>& failedItems =
                    bdmResp.getAckMessageFailedItem();
                for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
                        iter != failedItems.end(); ++iter)
                {
                    cout << "AckFailedItem: " << iter->errorCode
                        << "  " << iter->receiptHandle << endl;
                }
            } else {
                cout << "Ack: " << messages.size() << " messages suc!" << endl;
            }
        } catch (MQServerException& me) {
            if (me.GetErrorCode() == "MessageNotExist") {
                cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
                continue;
            }
            cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        } catch (MQExceptionBase& mb) {
            cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
            Sleep(2000);
#else
            usleep(2000 * 1000);
#endif
        }

    } while(true);
}

C#

using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;

namespace Aliyun.MQ.Sample
{
    public class ConsumerSample
    {
        // Specify the HTTP endpoint. 
        private const string _endpoint = "${HTTP_ENDPOINT}";
        // The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section. 
        private const string _accessKeyId = "${ACCESS_KEY}";
        // The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section. 
        private const string _secretAccessKey = "${SECRET_KEY}";
        // The topic to which the message belongs. 
        private const string _topicName = "${TOPIC}";
        // The ID of the instance to which the topic belongs. Default value: null. 
        private const string _instanceId = "${INSTANCE_ID}";
        // The ID of the group that you created in the ApsaraMQ for RocketMQ console. A group ID is also known as a consumer ID.
        private const string _groupId = "${GROUP_ID}";

        private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
        static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);

        static void Main(string[] args)
        {
            // Consume messages in a loop in the current thread. We recommend that you use multiple threads to concurrently consume messages.
            while (true)
            {
                try
                {
                    // Consume messages in long polling mode. 
                    // In long polling mode, if no message is available for consumption in the topic, the request is held on the server for up to 3 seconds. If a message becomes available within that period, a response is immediately sent to the client.
                    List<Message> messages = null;

                    try
                    {
                        messages = consumer.ConsumeMessage(
                            3, //  The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16. 
                            3 // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30. 
                        );
                    }
                    catch (Exception exp1)
                    {
                        if (exp1 is MessageNotExistException)
                        {
                            Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
                            continue;
                        }
                        Console.WriteLine(exp1);
                        Thread.Sleep(2000);
                    }

                    if (messages == null)
                    {
                        continue;
                    }

                    List<string> handlers = new List<string>();
                    Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
                    // The message consumption logic. 
                    foreach (Message message in messages)
                    {
                        Console.WriteLine(message);
                        Console.WriteLine("Property a is:" + message.GetProperty("a"));
                        handlers.Add(message.ReceiptHandle);
                    }
                    // If the broker does not receive an ACK for a message before the time specified by Message.NextConsumeTime, the message is delivered again.
                    // The receipt handle contains a timestamp, so the same message gets a different handle each time it is consumed.
                    try
                    {
                        consumer.AckMessage(handlers);
                        Console.WriteLine("Ack message success:");
                        foreach (string handle in handlers)
                        {
                            Console.Write("\t" + handle);
                        }
                        Console.WriteLine();
                    }
                    catch (Exception exp2)
                    {
                        // The ACK for a message fails if its receipt handle has timed out.
                        if (exp2 is AckMessageException)
                        {
                            AckMessageException ackExp = (AckMessageException)exp2;
                            Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
                            foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
                            {
                                Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
                            }
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                    Thread.Sleep(2000);
                }
            }
        }
    }
}

What to do next

You can query the message and its trace to verify whether the message is consumed. For more information, see Query messages and Query message traces.
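
If a trace shows that a message was delivered more than once, one cause described in the sample comments is an ACK that reached the broker after the next consume time. The following sketch estimates how much time remains before a message becomes eligible for redelivery. It assumes that next_consume_time is a Unix timestamp in milliseconds, which you should verify against your SDK version. The function names and the 500-millisecond safety margin are illustrative and are not part of the SDK.

```python
import time


def ms_until_redelivery(next_consume_time_ms, now_ms=None):
    """Return the milliseconds left before the message may be delivered again.

    next_consume_time_ms: assumed to be a Unix timestamp in milliseconds.
    now_ms: the current time in milliseconds; defaults to the system clock.
    A negative result means the next consume time has already passed.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return next_consume_time_ms - now_ms


def can_still_ack(next_consume_time_ms, now_ms=None, safety_margin_ms=500):
    """Return True if more than safety_margin_ms remains before redelivery."""
    return ms_until_redelivery(next_consume_time_ms, now_ms) > safety_margin_ms


if __name__ == "__main__":
    # Made-up timestamps for illustration only.
    print(ms_until_redelivery(10000, now_ms=4000))
    print(can_still_ack(10000, now_ms=9800))
```

Logging this value just before each ACK can help correlate slow consumption logic with repeated deliveries in the trace. Clock differences between the client and the broker make the result an estimate only.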