通常のメッセージは、ApsaraMQ for RocketMQによって提供される機能のないメッセージです。 通常のメッセージは、スケジュールメッセージ、遅延メッセージ、順序付けられたメッセージ、およびトランザクションメッセージを含む、特徴的なメッセージとは異なる。 このトピックでは、Go用のHTTPクライアントSDKを使用して通常のメッセージを送受信する方法に関するサンプルコードを提供します。
の前提条件
開始する前に、次の操作が実行されていることを確認してください。
Go用のSDKをインストールします。 詳細については、「環境の準備」をご参照ください。
ApsaraMQ for RocketMQコンソールのコードで指定するリソースを作成します。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアを取得します。 詳細については、「AccessKey の作成」をご参照ください。
通常のメッセージを送信する
次のサンプルコードは、Go用のHTTPクライアントSDKを使用して通常のメッセージを送信する方法の例を示しています。
package main
import (
"fmt"
"time"
"strconv"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// The AccessKey secret that is used for authentication.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
topic := "${TOPIC}"
// The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instanceID parameter to null or an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Cyclically send four messages.
for i := 0; i < 4; i++ {
var msg mq_http_sdk.PublishMessageRequest
msg = mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", // The message content.
MessageTag: "", // The message tag.
Properties: map[string]string{}, // The message attributes.
}
// The message key.
msg.MessageKey = "MessageKey"
// The custom attributes of the message.
msg.Properties["a"] = strconv.Itoa(i)
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
通常のメッセージを購読する
次のサンプルコードは、Go用のHTTPクライアントSDKを使用して通常のメッセージをサブスクライブする方法の例を示しています。
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// The AccessKey secret that is used for authentication.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
// Each topic can be used to send and receive messages of a specific type. For example, a topic that is used to send and receive normal messages cannot be used to send or receive messages of other types.
topic := "${TOPIC}"
// The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instanceID parameter to null or an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instanceId := "${INSTANCE_ID}"
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// The message consumption logic.
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
}
// If the broker fails to receive an acknowledgment (ACK) for a message from the consumer before the period of time specified by the NextConsumeTime parameter elapses, the broker delivers the message in the partition to the consumer again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// If the handle of the message times out, the broker fails to receive an ACK for the message from the consumer.
fmt.Println(ackerr)
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// No message is available for consumption in the topic.
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// Consume messages in long polling mode. The default network timeout period is 35 seconds.
// In long polling mode, if no message in the topic is available for consumption, the request is suspended on the broker for the specified period of time. If a message becomes available for consumption within the specified period of time, the broker immediately sends a response to the consumer. In this example, the value is specified as 3 seconds.
mqConsumer.ConsumeMessage(respChan, errChan,
3, // The maximum number of messages that can be consumed at a time. In this example, the value is specified as 3. The maximum value that you can specify is 16.
3, // The duration of a long polling period. Unit: seconds. In this example, the value is specified as 3. The maximum value that you can specify is 30.
)
<-endChan
}
}