注文メッセージは、ApsaraMQ for RocketMQによって提供されるメッセージの一種です。 順序付けられたメッセージは、厳密な先入れ先出し (FIFO) の順序で発行および消費されます。 このトピックでは、Go用のHTTPクライアントSDKを使用して順序付きメッセージを送受信する方法に関するサンプルコードを提供します。
背景情報
注文されたメッセージは、次のタイプに分類されます。
グローバルに順序付けられたメッセージ: トピック内のメッセージがこのタイプの場合、メッセージはFIFO順序で発行され、消費されます。
パーティション順メッセージ: トピック内のメッセージがこのタイプの場合、メッセージはシャーディングキーを使用して異なるパーティションに分散されます。 各パーティションのメッセージはFIFO順に消費されます。 シャーディングキーは、パーティションを識別するために順序付けられたメッセージに使用されるキーフィールドです。 シャーディングキーはメッセージキーとは異なります。
詳細については、「注文メッセージ」をご参照ください。
の前提条件
開始する前に、次の操作が実行されていることを確認してください。
Go用のSDKをインストールします。 詳細については、「環境の準備」をご参照ください。
ApsaraMQ for RocketMQコンソールのコードで指定するリソースを作成します。 リソースには、インスタンス、トピック、および消費者グループが含まれます。 詳細については、「リソースの作成」 をご参照ください。
Alibaba CloudアカウントのAccessKeyペアを取得します。 詳細については、「AccessKey の作成」をご参照ください。
順序付けられたメッセージを送信する
ApsaraMQ for RocketMQブローカーは、送信者が単一のプロデューサーまたはスレッドを使用してメッセージを送信する順序に基づいて、メッセージが生成される順序を決定します。 送信者が複数のプロデューサまたはスレッドを使用してメッセージを同時に送信する場合、メッセージの順序は、ApsaraMQ for RocketMQブローカーがメッセージを受信した順序によって決まります。 この注文は、ビジネス側の送信注文とは異なる場合があります。
次のサンプルコードは、Go用のHTTPクライアントSDKを使用して順序付きメッセージを送信する方法の例を示しています。
package main
import (
"fmt"
"time"
"strconv"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// The AccessKey secret that is used for authentication.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
topic := "${TOPIC}"
// The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instanceID parameter to null or an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instanceId := "${INSTANCE_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqProducer := client.GetProducer(instanceId, topic)
// Cyclically send eight messages.
for i := 0; i < 8; i++ {
msg := mq_http_sdk.PublishMessageRequest{
MessageBody: "hello mq!", // The message content.
MessageTag: "", // The message tag.
Properties: map[string]string{}, // The message attributes.
}
// The message key.
msg.MessageKey = "MessageKey"
// The custom attributes of the message.
msg.Properties["a"] = strconv.Itoa(i)
// The sharding key that is used to distribute ordered messages to a specific partition. Sharding keys can be used to identify partitions. A sharding key is different from a message key.
msg.ShardingKey = strconv.Itoa(i % 2)
ret, err := mqProducer.PublishMessage(msg)
if err != nil {
fmt.Println(err)
return
} else {
fmt.Printf("Publish ---->\n\tMessageId:%s, BodyMD5:%s, \n", ret.MessageId, ret.MessageBodyMD5)
}
time.Sleep(time.Duration(100) * time.Millisecond)
}
}
注文されたメッセージを購読する
次のサンプルコードは、Go用のHTTPクライアントSDKを使用して順序付きメッセージをサブスクライブする方法の例を示しています。
package main
import (
"fmt"
"github.com/gogap/errors"
"strings"
"time"
"os"
"github.com/aliyunmq/mq-http-go-sdk"
)
func main() {
// The HTTP endpoint. You can obtain the endpoint in the HTTP Endpoint section of the Instance Details page in the ApsaraMQ for RocketMQ console.
endpoint := "${HTTP_ENDPOINT}"
// Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are configured.
// The AccessKey ID that is used for authentication.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
// The AccessKey secret that is used for authentication.
secretKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
// The topic in which the message is produced. You must create the topic in the ApsaraMQ for RocketMQ console.
topic := "${TOPIC}"
// The ID of the instance to which the topic belongs. You must create the instance in the ApsaraMQ for RocketMQ console.
// If the instance has a namespace, specify the ID of the instance. If the instance does not have a namespace, set the instanceID parameter to null or an empty string. You can obtain the namespace of the instance on the Instance Details page in the ApsaraMQ for RocketMQ console.
instanceId := "${INSTANCE_ID}"
// The ID of the consumer group that you created in the ApsaraMQ for RocketMQ console.
groupId := "${GROUP_ID}"
client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
for {
endChan := make(chan int)
respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
select {
case resp := <-respChan:
{
// The message consumption logic.
var handles []string
fmt.Printf("Consume %d messages---->\n", len(resp.Messages))
for _, v := range resp.Messages {
handles = append(handles, v.ReceiptHandle)
fmt.Printf("\tMessageID: %s, PublishTime: %d, MessageTag: %s\n"+
"\tConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d\n"+
"\tBody: %s\n"+
"\tProps: %s\n"+
"\tShardingKey: %s\n",
v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties, v.ShardingKey)
}
// If the broker fails to receive an acknowledgment (ACK) for a message from the consumer before the period of time specified by the NextConsumeTime parameter elapses, the broker delivers the message in the partition to the consumer again.
// A unique timestamp is specified for the handle of a message each time the message is consumed.
ackerr := mqConsumer.AckMessage(handles)
if ackerr != nil {
// If the handle of the message times out, the broker fails to receive an ACK for the message from the consumer.
fmt.Println(ackerr)
if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
for _, errAckItem := range errAckItems {
fmt.Printf("\tErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s\n",
errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
} else {
fmt.Println("ack err =", ackerr)
}
time.Sleep(time.Duration(3) * time.Second)
} else {
fmt.Printf("Ack ---->\n\t%s\n", handles)
}
endChan <- 1
}
case err := <-errChan:
{
// No message is available for consumption in the topic.
if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
fmt.Println("\nNo new message, continue!")
} else {
fmt.Println(err)
time.Sleep(time.Duration(3) * time.Second)
}
endChan <- 1
}
case <-time.After(35 * time.Second):
{
fmt.Println("Timeout of consumer message ??")
endChan <- 1
}
}
}()
// The consumer may pull partitionally ordered messages from multiple partitions. The consumer consumes the messages in each partition in the order in which the messages are sent.
// Assume that the consumer pulls partitionally ordered messages from one partition. If the broker does not receive an ACK for a message from the consumer, the broker delivers the message to the consumer again.
// The consumer can consume the next batch of messages from a partition only after all messages that are pulled from the partition in the previous batch are acknowledged to be consumed.
// Consume messages in long polling mode. In long polling mode, the network timeout period is 35 seconds.
// In long polling mode, if no message is available for consumption in the topic, requests are suspended on the broker for the specified period of time. If a message becomes available for consumption during the specified period of time, the broker immediately sends a response to the consumer. In this example, the value is specified as 3 seconds.
mqConsumer.ConsumeMessageOrderly(respChan, errChan,
3, // The maximum number of messages that can be consumed at a time. In this example, the value is specified as 3. The maximum value that you can specify is 16.
3, // The duration of a long polling period. Unit: seconds. In this example, the value is specified as 3. The maximum value that you can specify is 30.
)
<-endChan
}
}