すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RocketMQ:通常のメッセージの送受信

最終更新日:Jul 09, 2024

通常のメッセージは、ApsaraMQ for RocketMQによって提供される機能のないメッセージです。 通常のメッセージは、スケジュールメッセージ、遅延メッセージ、順序付けられたメッセージ、およびトランザクションメッセージを含む、特徴的なメッセージとは異なる。 このトピックでは、Go用のHTTPクライアントSDKを使用して通常のメッセージを送受信する方法に関するサンプルコードを提供します。

の前提条件

開始する前に、次の操作が実行されていることを確認してください。

  • Go用のSDKをインストールします。 詳細については、「環境の準備」をご参照ください。

  • ApsaraMQ for RocketMQコンソールのコードで指定するリソースを作成します。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。

  • Alibaba CloudアカウントのAccessKeyペアを取得します。 詳細については、「AccessKey の作成」をご参照ください。

通常のメッセージを送信する

次のサンプルコードは、Go用のHTTPクライアントSDKを使用して通常のメッセージを送信する方法の例を示しています。

package main

import (
    "fmt"
    "time"
    "strconv"
    "os"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
    endpoint := "${HTTP_ENDPOINT}"
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    // The AccessKey secret that is used for authentication. 
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console. 
    topic := "${TOPIC}"
    // The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console. 
    // If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instanceID parameter to null or an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console. 
    instanceId := "${INSTANCE_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqProducer := client.GetProducer(instanceId, topic)
    // Cyclically send four messages. 
    for i := 0; i < 4; i++ {
        var msg mq_http_sdk.PublishMessageRequest

        msg = mq_http_sdk.PublishMessageRequest{
        MessageBody: "hello mq!",         // The message content. 
        MessageTag:  "",                  // The message tag. 
        Properties:  map[string]string{}, // The message attributes. 
        }
        // The message key. 
        msg.MessageKey = "MessageKey"
        // The custom attributes of the message. 
        msg.Properties["a"] = strconv.Itoa(i)

        ret, err := mqProducer.PublishMessage(msg)

        if err != nil {
            fmt.Println(err)
            return
        } else {
            fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
        }
        time.Sleep(time.Duration(100) * time.Millisecond)
    }
}

通常のメッセージを購読する

次のサンプルコードは、Go用のHTTPクライアントSDKを使用して通常のメッセージをサブスクライブする方法の例を示しています。

package main

import (
    "fmt"
    "github.com/gogap/errors"
    "strings"
    "time"
    "os"

    "github.com/aliyunmq/mq-http-go-sdk"
)

func main() {
    // The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console. 
    endpoint := "${HTTP_ENDPOINT}"
    // Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured. 
    // The AccessKey ID that is used for authentication. 
    accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
    // The AccessKey secret that is used for authentication. 
    secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
    // The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console. 
    // Each topic can be used to send and receive messages of a specific type. For example, a topic that is used to send and receive normal messages cannot be used to send or receive messages of other types. 
    topic := "${TOPIC}"
    // The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console. 
    // If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instanceID parameter to null or an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console. 
    instanceId := "${INSTANCE_ID}"
    // The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console. 
    groupId := "${GROUP_ID}"

    client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")

    mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")

    for {
        endChan := make(chan int)
        respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
        errChan := make(chan error)
        go func() {
            select {
            case resp := <-respChan:
                {
                    // The message consumption logic. 
                    var handles []string
                    fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
                    for _, v := range resp.Messages {
                        handles = append(handles, v.ReceiptHandle)
                        fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
                            "\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
                            "\tBody: %s\n"+
                            "\tProps: %s\n",
                            v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                            v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
                    }

                    // If the broker fails to receive an acknowledgment (ACK) for a message from the consumer before the period of time specified by the NextConsumeTime parameter elapses, the broker delivers the message in the partition to the consumer again. 
                    // A unique timestamp is specified for the handle of a message each time the message is consumed. 
                    ackerr := mqConsumer.AckMessage(handles)
                    if ackerr != nil {
                        // If the handle of the message times out, the broker fails to receive an ACK for the message from the consumer. 
                        fmt.Println(ackerr)
                        if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
                           for _, errAckItem := range errAckItems {
                             fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
                               errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                           }
                        } else {
                           fmt.Println("ack err =", ackerr)
                        }
                        time.Sleep(time.Duration(3) * time.Second)
                    } else {
                        fmt.Printf("Ack ---->\n\t%s\n", handles)
                    }

                    endChan <- 1
                }
            case err := <-errChan:
                {
                    // No message is available for consumption in the topic. 
                    if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
                        fmt.Println("\nNo new message, continue!")
                    } else {
                        fmt.Println(err)
                        time.Sleep(time.Duration(3) * time.Second)
                    }
                    endChan <- 1
                }
            case <-time.After(35 * time.Second):
                {
                    fmt.Println("Timeout of consumer message ??")
                    endChan <- 1
                }
            }
        }()

        // Consume messages in long polling mode. The default network timeout period is 35 seconds. 
        // In long polling mode, if no message in the topic is available for consumption, the request is suspended on the broker for the specified period of time. If a message becomes available for consumption within the specified period of time, the broker immediately sends a response to the consumer. In this example, the value is specified as 3 seconds. 
        mqConsumer.ConsumeMessage(respChan, errChan,
            3, // The maximum number of messages that can be consumed at a time. In this example, the value is specified as 3. The maximum value that you can specify is 16. 
            3, // The duration of a long polling period. Unit: seconds. In this example, the value is specified as 3. The maximum value that you can specify is 30. 
        )
        <-endChan
    }
}