ApsaraMQ forRocketMQコンソールで必要なリソースを作成した後、ApsaraMQ for RocketMQ HTTPクライアントSDKを使用して通常のメッセージを送信およびサブスクライブできます。
前提条件
- 説明
提供される例では、通常のメッセージが使用される。 通常のメッセージ用に作成したトピックを使用して、スケジュールされたメッセージ、遅延メッセージ、順序付けられたメッセージ、トランザクションメッセージなど、他の種類のメッセージを送信またはサブスクライブすることはできません。 メッセージのメッセージタイプに基づいてトピックを作成する必要があります。
HTTPクライアントSDKのダウンロードとインストール
ApsaraMQ for RocketMQは、複数のプログラミング言語用の次のHTTPクライアントSDKを提供します。 ビジネス要件に基づいて、特定の言語のクライアントSDKをダウンロードしてインストールします。
Java SDK
PHP SDK
Go SDK
Python SDK
Node.js SDK
C# SDK
C++ SDK
HTTPクライアントSDKを使用して通常のメッセージを送信する
特定のプログラミング言語のクライアントSDKを取得したら、そのプログラミング言語のサンプルコードを実行して通常のメッセージを送信できます。
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class Producer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// Specify the HTTP endpoint.
"${HTTP_ENDPOINT}",
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
);
// The topic to which the message belongs.
final String topic = "${TOPIC}";
// The ID of the instance to which the topic belongs. Default value: null.
final String instanceId = "${INSTANCE_ID}";
// Obtain the producer that sends messages to the topic.
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// Cyclically send four messages.
for (int i = 0; i < 4; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// The normal message.
pubMsg = new TopicMessage(
// The message content.
"hello mq!".getBytes(),
// The message tag.
"A"
);
// The message attributes.
pubMsg.getProperties().put("a", String.valueOf(i));
// The message key.
pubMsg.setMessageKey("MessageKey");
} else {
pubMsg = new TopicMessage(
// The message content.
"hello mq!".getBytes(),
// The message tag.
"A"
);
// The message attributes.
pubMsg.getProperties().put("a", String.valueOf(i));
// Schedule to send the message 10 seconds later.
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// Send the message in synchronous mode. If no exception is thrown, the message is sent.
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// Send the message in synchronous mode. If no exception is thrown, the message is sent.
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again.
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}
Go
package main
import (
"fmt"
"time"
"strconv"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Specify the HTTP endpoint.
endpoint := "${HTTP_ENDPOINT}"
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
accessKey := "${ACCESS_KEY}"
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
secretKey := "${SECRET_KEY}"
// The topic to which the message belongs.
topic := "${TOPIC}"
// The ID of the instance to which the topic belongs. Default value: null.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Cyclically send four messages.
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
if i%2 == 0 {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", // The message content.
MessageTag: "", // The message tag.
Properties: map[string]string{}, // The message attributes.
}
// The message key.
msg.MessageKey = "MessageKey"
// The message attributes.
msg.Properties["a"] = strconv.Itoa(i)
} else {
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq timer!", // The message content.
MessageTag: "", // The message tag.
Properties: map[string]string{}, // The message attributes.
}
// The message attributes.
msg.Properties["a"] = strconv.Itoa(i)
// Schedule to send the message 10 seconds later. The value is a UNIX timestamp in milliseconds.
msg.StartDeliverTime = time.Now().UTC().Unix() * 1000 + 10 * 1000
}
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ProducerTest
{
private $client;
private $producer;
public function __construct()
{
$this->client = new MQClient(
// Specify the HTTP endpoint.
"${HTTP_ENDPOINT}",
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
);
// The topic to which the message belongs.
$topic = "${TOPIC}";
// The ID of the instance to which the topic belongs. Default value: null.
$instanceId = "${INSTANCE_ID}";
$this->producer = $this->client->getProducer($instanceId, $topic);
}
public function run()
{
try
{
for ($i=1; $i<=4; $i++)
{
$publishMessage = new TopicMessage(
"xxxxxxxx"// The message content.
);
// The message attributes.
$publishMessage->putProperty("a", $i);
// The message key.
$publishMessage->setMessageKey("MessageKey");
if ($i % 2 == 0) {
// Schedule to send the message 10 seconds later.
$publishMessage->setStartDeliverTime(time() * 1000 + 10 * 1000);
}
$result = $this->producer->publishMessage($publishMessage);
print "Send mq message success. msgId is:" . $result->getMessageId() . ", bodyMD5 is:" . $result->getMessageBodyMD5() . "\n";
}
} catch (\Exception $e) {
print_r($e->getMessage() . "\n");
}
}
}
$instance = new ProducerTest();
$instance->run();
?>
Python
#!/usr/bin/env python
# coding=utf8
import sys
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_producer import *
from mq_http_sdk.mq_client import *
import time
# Initialize the client.
mq_client = MQClient(
# Specify the HTTP endpoint.
"${HTTP_ENDPOINT}",
# The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
# The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
)
# The topic to which the message belongs.
topic_name = "${TOPIC}"
# The ID of the instance to which the topic belongs. Default value: None.
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
# Cyclically send multiple messages.
msg_count = 4
print("%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count))
try:
for i in range(msg_count):
if i % 2 == 0:
msg = TopicMessage(
# The message content.
"I am test message %s.Hello" % i,
# The message tag.
""
)
# The message attributes.
msg.put_property("a", "i")
# The message key.
msg.set_message_key("MessageKey")
re_msg = producer.publish_message(msg)
print("Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
else:
msg = TopicMessage(
# The message content.
"I am test message %s." % i,
# The message tag.
""
)
msg.put_property("a", i)
# Schedule an absolute time in milliseconds to send the message.
msg.set_start_deliver_time(int(round(time.time() * 1000)) + 5 * 1000)
re_msg = producer.publish_message(msg)
print("Publish Timer Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5))
time.sleep(1)
except MQExceptionBase as e:
if e.type == "TopicNotExist":
print("Topic not exist, please create it.")
sys.exit(1)
print("Publish Message Fail. Exception:%s" % e)
Node.js
const {
MQClient,
MessageProperties
} = require('@aliyunmq/mq-http-sdk');
// Specify the HTTP endpoint.
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
const accessKeySecret = "${SECRET_KEY}";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// The topic to which the message belongs.
const topic = "${TOPIC}";
// The ID of the instance to which the topic belongs. Default value: null.
const instanceId = "${INSTANCE_ID}";
const producer = client.getProducer(instanceId, topic);
(async function(){
try {
// Cyclically send four messages.
for(var i = 0; i < 4; i++) {
let res;
if (i % 2 == 0) {
msgProps = new MessageProperties();
// The message attributes.
msgProps.putProperty("a", i);
// The message key.
msgProps.messageKey("MessageKey");
res = await producer.publishMessage("hello mq.", "", msgProps);
} else {
msgProps = new MessageProperties();
// The message attributes.
msgProps.putProperty("a", i);
// Schedule to send the message 10 seconds later.
msgProps.startDeliverTime(Date.now() + 10 * 1000);
res = await producer.publishMessage("hello mq. timer msg!", "TagA", msgProps);
}
console.log("Publish message: MessageID:%s,BodyMD5:%s", res.body.MessageId, res.body.MessageBodyMD5);
}
} catch(e) {
// Specify the logic to resend or persist the message if the message fails to be sent and needs to be sent again.
console.log(e)
}
})();
C++
//#include <iostream>
#include <fstream>
#include <time.h>
#include "mq_http_sdk/mq_client.h"
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// Specify the HTTP endpoint.
"${HTTP_ENDPOINT}",
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
);
// The topic to which the message belongs.
string topic = "${TOPIC}";
// The ID of the instance to which the topic belongs. Default value: null.
string instanceId = "${INSTANCE_ID}";
MQProducerPtr producer;
if (instanceId == "") {
producer = mqClient.getProducerRef(topic);
} else {
producer = mqClient.getProducerRef(instanceId, topic);
}
try {
for (int i = 0; i < 4; i++)
{
PublishMessageResponse pmResp;
if (i % 4 == 0) {
// publish message, only have body.
producer->publishMessage("Hello, mq!", pmResp);
} else if (i % 4 == 1) {
// publish message, only have body and tag.
producer->publishMessage("Hello, mq!have tag!", "tag", pmResp);
} else if (i % 4 == 2) {
// publish message, have body,tag,properties and key.
TopicMessage pubMsg("Hello, mq!have key!");
pubMsg.putProperty("a",std::to_string(i));
pubMsg.setMessageKey("MessageKey" + std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
} else {
// publish timer message, message will be consumed after StartDeliverTime
TopicMessage pubMsg("Hello, mq!timer msg!", "tag");
// StartDeliverTime is an absolute time in millisecond.
pubMsg.setStartDeliverTime(time(NULL) * 1000 + 10 * 1000);
pubMsg.putProperty("b",std::to_string(i));
pubMsg.putProperty("c",std::to_string(i));
producer->publishMessage(pubMsg, pmResp);
}
cout << "Publish mq message success. Topic is: " << topic
<< ", msgId is:" << pmResp.getMessageId()
<< ", bodyMD5 is:" << pmResp.getMessageBodyMD5() << endl;
}
} catch (MQServerException& me) {
cout << "Request Failed: " + me.GetErrorCode() << ", requestId is:" << me.GetRequestId() << endl;
return -1;
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
return -2;
}
return 0;
}
C#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ.Util;
namespace Aliyun.MQ.Sample
{
public class ProducerSample
{
// Specify the HTTP endpoint.
private const string _endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
private const string _accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
private const string _secretAccessKey = "${SECRET_KEY}";
// The topic to which the message belongs.
private const string _topicName = "${TOPIC}";
// The ID of the instance to which the topic belongs. Default value: null.
private const string _instanceId = "${INSTANCE_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQProducer producer = _client.GetProducer(_instanceId, _topicName);
static void Main(string[] args)
{
try
{
// Cyclically send four messages.
for (int i = 0; i < 4; i++)
{
TopicMessage sendMsg;
if (i % 2 == 0)
{
sendMsg = new TopicMessage("dfadfadfadf");
// The message attributes.
sendMsg.PutProperty("a", i.ToString());
// The message key.
sendMsg.MessageKey = "MessageKey";
}
else
{
sendMsg = new TopicMessage("dfadfadfadf", "tag");
// The message attributes.
sendMsg.PutProperty("a", i.ToString());
// Schedule to send the message 10 seconds later.
sendMsg.StartDeliverTime = AliyunSDKUtils.GetNowTimeStamp() + 10 * 1000;
}
TopicMessage result = producer.PublishMessage(sendMsg);
Console.WriteLine("publis message success:" + result);
}
}
catch (Exception ex)
{
Console.Write(ex);
}
}
}
}
次の手順を実行してインスタンスを起動することもできます。ApsaraMQ for RocketMQ consoleにログインします。作成したインスタンスを見つけて、 操作列の詳細をクリックします。ドロップダウンリストからすぐに体験を選択します。
HTTPクライアントSDKを使用して通常のメッセージを消費する
通常のメッセージが送信されたら、コンシューマーを起動してメッセージを消費する必要があります。 ビジネス要件に基づいた特定のプログラミング言語に対して次のサンプルコードを使用して、コンシューマーを開始し、メッセージ消費機能をテストできます。 次のコードのコメントに基づいてパラメーターを指定します。
Java
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.Message;
import java.util.ArrayList;
import java.util.List;
public class Consumer {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// Specify the HTTP endpoint.
"${HTTP_ENDPOINT}",
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
);
// The topic to which the message belongs.
final String topic = "${TOPIC}";
// The ID of the group that you created in the ApsaraMQ forRocketMQ console. A group ID is also known as a consumer ID.
final String groupId = "${GROUP_ID}";
// The ID of the instance to which the topic belongs. Default value: null.
final String instanceId = "${INSTANCE_ID}";
final MQConsumer consumer;
if (instanceId != null && instanceId != "") {
consumer = mqClient.getConsumer(instanceId, topic, groupId, null);
} else {
consumer = mqClient.getConsumer(topic, groupId);
}
// Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages.
do {
List<Message> messages = null;
try {
// Consume messages in long polling mode.
// In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client.
messages = consumer.consumeMessage(
3,// The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16.
3// The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
);
} catch (Throwable e) {
e.printStackTrace();
try {
Thread.sleep(2000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
// No message.
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}
// The message consumption logic.
for (Message message : messages) {
System.out.println("Receive message: " + message);
}
// If a broker fails to receive an acknowledgment (ACK) for a message from a consumer before the period of time specified by the Message.nextConsumeTime parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}
try {
consumer.ackMessage(handles);
} catch (Throwable e) {
// If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers.
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
}
}
Go
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// Specify the HTTP endpoint.
endpoint := "${HTTP_ENDPOINT}"
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
accessKey := "${ACCESS_KEY}"
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
secretKey := "${SECRET_KEY}"
// The topic to which the message belongs.
topic := "${TOPIC}"
// The ID of the instance to which the topic belongs. Default value: null.
instanceId := "${INSTANCE_ID}"
// The ID of the group that you created in the ApsaraMQ forRocketMQ console. A group ID is also known as a consumer ID.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// The message consumption logic.
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
}
// If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the NextConsumeTime parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers.
fmt.Println(ackerr)
for _, errAckItem := range ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem) {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// No message.
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// Consume messages in long polling mode.
// In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client.
mqConsumer.ConsumeMessage(respChan, errChan,
3, // The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16.
3, // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
)
<-endChan
}
}
PHP
<?php
require "vendor/autoload.php";
use MQ\Model\TopicMessage;
use MQ\MQClient;
class ConsumerTest
{
private $client;
private $consumer;
public function __construct()
{
$this->client = new MQClient(
// Specify the HTTP endpoint.
"${HTTP_ENDPOINT}",
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
);
// The topic to which the message belongs.
$topic = "${TOPIC}";
// The ID of the group that you created in the ApsaraMQ forRocketMQ console. A group ID is also known as a consumer ID.
$groupId = "${GROUP_ID}";
// The ID of the instance to which the topic belongs. Default value: null.
$instanceId = "${INSTANCE_ID}";
$this->consumer = $this->client->getConsumer($instanceId, $topic, $groupId);
}
public function run()
{
// Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages.
while (True) {
try {
// Consume messages in long polling mode.
// In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client.
$messages = $this->consumer->consumeMessage(
3, // The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16.
3 // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\MessageNotExistException) {
// If no message is available for consumption in the topic, the long polling mode continues to take effect.
printf("No message, contine long polling!RequestId:%s\n", $e->getRequestId());
continue;
}
print_r($e->getMessage() . "\n");
sleep(3);
continue;
}
print "consume finish, messages:\n";
// The message consumption logic.
$receiptHandles = array();
foreach ($messages as $message) {
$receiptHandles[] = $message->getReceiptHandle();
printf("MessageID:%s TAG:%s BODY:%s \nPublishTime:%d, FirstConsumeTime:%d, \nConsumedTimes:%d, NextConsumeTime:%d,MessageKey:%s\n",
$message->getMessageId(), $message->getMessageTag(), $message->getMessageBody(),
$message->getPublishTime(), $message->getFirstConsumeTime(), $message->getConsumedTimes(), $message->getNextConsumeTime(),
$message->getMessageKey());
print_r($message->getProperties());
}
// If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the $message->getNextConsumeTime() parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
print_r($receiptHandles);
try {
$this->consumer->ackMessage($receiptHandles);
} catch (\Exception $e) {
if ($e instanceof MQ\Exception\AckMessageException) {
// If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers.
printf("Ack Error, RequestId:%s\n", $e->getRequestId());
foreach ($e->getAckMessageErrorItems() as $errorItem) {
printf("\tReceiptHandle:%s, ErrorCode:%s, ErrorMsg:%s\n", $errorItem->getReceiptHandle(), $errorItem->getErrorCode(), $errorItem->getErrorCode());
}
}
}
print "ack finish\n";
}
}
}
$instance = new ConsumerTest();
$instance->run();
?>
Python
#!/usr/bin/env python
# coding=utf8
from mq_http_sdk.mq_exception import MQExceptionBase
from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_client import *
# Initialize the client.
mq_client = MQClient(
# Specify the HTTP endpoint.
"",
# The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
# The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
)
# The topic to which the message belongs.
topic_name = "${TOPIC}"
The ID of the group that you created in the ApsaraMQ forRocketMQ console.
group_id = "GID_test"
# The ID of the instance to which the topic belongs. Default value: None.
instance_id = "MQ_INST_1380156306793859_BbXbx0Y4"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
# In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client.
# The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
wait_seconds = 3
# The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16.
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
try:
# Consume messages in long polling mode.
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print "Receive, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nConsumedTimes: %s \
\nPublishTime: %s\nBody: %s \
\nNextConsumeTime: %s \
\nReceiptHandle: %s" % \
(msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,
msg.next_consume_time, msg.receipt_handle)
except MQExceptionBase, e:
if e.type == "MessageNotExist":
print "No new message! RequestId: %s" % e.req_id
continue
print "Consume Message Fail! Exception:%s\n" % e
time.sleep(2)
continue
# If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the #msg.next_consume_time parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
except MQExceptionBase, e:
print "\nAk Message Fail! Exception:%s" % e
# If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers.
if e.sub_errors:
for sub_error in e.sub_errors:
print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
Node.js
const {
MQClient
} = require('@aliyunmq/mq-http-sdk');
// Specify the HTTP endpoint.
const endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
const accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
const accessKeySecret = "${SECRET_KEY}";
var client = new MQClient(endpoint, accessKeyId, accessKeySecret);
// The topic to which the message belongs.
const topic = "${TOPIC}";
// The ID of the group that you created in the ApsaraMQ forRocketMQ console. A group ID is also known as a consumer ID.
const groupId = "${GROUP_ID}";
// The ID of the instance to which the topic belongs. Default value: null.
const instanceId = "${INSTANCE_ID}";
const consumer = client.getConsumer(instanceId, topic, groupId);
(async function(){
// Cyclically consume messages.
while(true) {
try {
// Consume messages in long polling mode.
// In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client.
res = await consumer.consumeMessage(
3, // The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16.
3 // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
);
if (res.code == 200) {
// The message consumption logic.
console.log("Consume Messages, requestId:%s", res.requestId);
const handles = res.body.map((message) => {
console.log("\tMessageId:%s,Tag:%s,PublishTime:%d,NextConsumeTime:%d,FirstConsumeTime:%d,ConsumedTimes:%d,Body:%s" +
",Props:%j,MessageKey:%s,Prop-A:%s",
message.MessageId, message.MessageTag, message.PublishTime, message.NextConsumeTime, message.FirstConsumeTime, message.ConsumedTimes,
message.MessageBody,message.Properties,message.MessageKey,message.Properties.a);
return message.ReceiptHandle;
});
// If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the message.NextConsumeTime parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
res = await consumer.ackMessage(handles);
if (res.code != 204) {
// If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers.
console.log("Ack Message Fail:");
const failHandles = res.body.map((error)=>{
console.log("\tErrorHandle:%s, Code:%s, Reason:%s\n", error.ReceiptHandle, error.ErrorCode, error.ErrorMessage);
return error.ReceiptHandle;
});
handles.forEach((handle)=>{
if (failHandles.indexOf(handle) < 0) {
console.log("\tSucHandle:%s\n", handle);
}
});
} else {
// Obtain an ACK from the consumer.
console.log("Ack Message suc, RequestId:%s\n\t", res.requestId, handles.join(','));
}
}
} catch(e) {
if (e.Code.indexOf("MessageNotExist") > -1) {
// If no message is available for consumption in the topic, the long polling mode continues to take effect.
console.log("Consume Message: no new message, RequestId:%s, Code:%s", e.RequestId, e.Code);
} else {
console.log(e);
}
}
}
})();
C++
#include <vector>
#include <fstream>
#include "mq_http_sdk/mq_client.h"
#ifdef _WIN32
#include <windows.h>
#else
#include <unistd.h>
#endif
using namespace std;
using namespace mq::http::sdk;
int main() {
MQClient mqClient(
// Specify the HTTP endpoint.
"${HTTP_ENDPOINT}",
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
"${ACCESS_KEY}",
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
"${SECRET_KEY}"
);
// The topic to which the message belongs.
string topic = "${TOPIC}";
// The ID of the group that you created in the ApsaraMQ forRocketMQ console. A group ID is also known as a consumer ID.
string groupId = "${GROUP_ID}";
// The ID of the instance to which the topic belongs. Default value: null.
string instanceId = "${INSTANCE_ID}";
MQConsumerPtr consumer;
if (instanceId == "") {
consumer = mqClient.getConsumerRef(topic, groupId);
} else {
consumer = mqClient.getConsumerRef(instanceId, topic, groupId, "");
}
do {
try {
std::vector<Message> messages;
// Consume messages in long polling mode.
// In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client.
consumer->consumeMessage(
3,// The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16.
3,// The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
messages
);
cout << "Consume: " << messages.size() << " Messages!" << endl;
// The message consumption logic.
std::vector<std::string> receiptHandles;
for (std::vector<Message>::iterator iter = messages.begin();
iter != messages.end(); ++iter)
{
cout << "MessageId: " << iter->getMessageId()
<< " PublishTime: " << iter->getPublishTime()
<< " Tag: " << iter->getMessageTag()
<< " Body: " << iter->getMessageBody()
<< " FirstConsumeTime: " << iter->getFirstConsumeTime()
<< " NextConsumeTime: " << iter->getNextConsumeTime()
<< " ConsumedTimes: " << iter->getConsumedTimes()
<< " Properties: " << iter->getPropertiesAsString()
<< " Key: " << iter->getMessageKey() << endl;
receiptHandles.push_back(iter->getReceiptHandle());
}
// Obtain an ACK from the consumer.
// If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the Message.NextConsumeTime parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
AckMessageResponse bdmResp;
consumer->ackMessage(receiptHandles, bdmResp);
if (!bdmResp.isSuccess()) {
// If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers.
const std::vector<AckMessageFailedItem>& failedItems =
bdmResp.getAckMessageFailedItem();
for (std::vector<AckMessageFailedItem>::const_iterator iter = failedItems.begin();
iter != failedItems.end(); ++iter)
{
cout << "AckFailedItem: " << iter->errorCode
<< " " << iter->receiptHandle << endl;
}
} else {
cout << "Ack: " << messages.size() << " messages suc!" << endl;
}
} catch (MQServerException& me) {
if (me.GetErrorCode() == "MessageNotExist") {
cout << "No message to consume! RequestId: " + me.GetRequestId() << endl;
continue;
}
cout << "Request Failed: " + me.GetErrorCode() + ".RequestId: " + me.GetRequestId() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
} catch (MQExceptionBase& mb) {
cout << "Request Failed: " + mb.ToString() << endl;
#ifdef _WIN32
Sleep(2000);
#else
usleep(2000 * 1000);
#endif
}
} while(true);
}
C#
using System;
using System.Collections.Generic;
using System.Threading;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using Aliyun.MQ;
namespace Aliyun.MQ.Sample
{
public class ConsumerSample
{
// Specify the HTTP endpoint.
private const string _endpoint = "${HTTP_ENDPOINT}";
// The AccessKey ID. An AccessKey ID is used for identity authentication. For information about how to obtain an AccessKey ID, see Create an AccessKey pair in the Prerequisites section.
private const string _accessKeyId = "${ACCESS_KEY}";
// The AccessKey secret. An AccessKey secret is used for identity authentication. For information about how to obtain an AccessKey secret, see Create an AccessKey pair in the Prerequisites section.
private const string _secretAccessKey = "${SECRET_KEY}";
// The topic to which the message belongs.
private const string _topicName = "${TOPIC}";
// The ID of the instance to which the topic belongs. Default value: null.
private const string _instanceId = "${INSTANCE_ID}";
// The ID of the group that you created in the ApsaraMQ forRocketMQ console. A group ID is also known as a consumer ID.
private const string _groupId = "${GROUP_ID}";
private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);
static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);
static void Main(string[] args)
{
// Cyclically consume messages in the current thread. We recommend that you use multiple threads to concurrently consume messages.
while (true)
{
try
{
// Consume messages in long polling mode.
// In long polling mode, if no message is available for consumption in the topic, the request is hung on the server for 3 seconds. If messages are available for consumption within the duration, a response is immediately sent to the client.
List<Message> messages = null;
try
{
messages = consumer.ConsumeMessage(
3, // The maximum number of messages that can be consumed at a time. In this example, the value is set to 3. The maximum value that you can specify is 16.
3 // The duration of a long polling cycle. Unit: seconds. In this example, the value is set to 3. The maximum value that you can specify is 30.
);
}
catch (Exception exp1)
{
if (exp1 is MessageNotExistException)
{
Console.WriteLine(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId);
continue;
}
Console.WriteLine(exp1);
Thread.Sleep(2000);
}
if (messages == null)
{
continue;
}
List<string> handlers = new List<string>();
Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");
// The message consumption logic.
foreach (Message message in messages)
{
Console.WriteLine(message);
Console.WriteLine("Property a is:" + message.GetProperty("a"));
handlers.Add(message.ReceiptHandle);
}
// If a broker fails to receive an ACK for a message from a consumer before the period of time specified by the Message.nextConsumeTime parameter elapses, the message is consumed again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
try
{
consumer.AckMessage(handlers);
Console.WriteLine("Ack message success:");
foreach (string handle in handlers)
{
Console.Write("\t" + handle);
}
Console.WriteLine();
}
catch (Exception exp2)
{
// If the handles of some messages time out, the broker fails to receive ACKs for the messages from consumers.
if (exp2 is AckMessageException)
{
AckMessageException ackExp = (AckMessageException)exp2;
Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);
foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems)
{
Console.WriteLine("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage);
}
}
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
Thread.Sleep(2000);
}
}
}
}
}
次に何をするか
メッセージとそのトレースを照会して、メッセージが消費されたかどうかを確認できます。 詳細については、「 Query messages 」と「 Query message traces」をご参照ください。