This topic describes how to use the SDK for Go to connect an Advanced Message Queuing Protocol (AMQP) client to Alibaba Cloud IoT Platform and receive messages from IoT Platform by using the server-side subscription feature.
Prerequisites
The ID of the consumer group that subscribes to the messages of a topic is obtained.
You can use the default consumer group named DEFAULT_GROUP or create a consumer group in the IoT Platform console. For more information, see Manage consumer groups.
You can use a consumer group to subscribe to the messages of a topic. For more information, see Configure an AMQP server-side subscription.
Development environment
In this example, Go 1.12.7 is used.
Download the SDK
You can run the following command to import the AMQP SDK for Go:
import "pack.ag/amqp"
For more information about how to use the SDK, see package amqp.
Sample code
package main
import (
"os"
"context"
"crypto/hmac"
"crypto/sha1"
"encoding/base64"
"fmt"
"pack.ag/amqp"
"time"
)
// For more information about the parameters, see the "Connect an AMQP client to IoT Platform" topic.
const consumerGroupId = "${YourConsumerGroupId}"
const clientId = "${YourClientId}"
// iotInstanceId: The ID of the IoT Platform instance.
const iotInstanceId = "${YourIotInstanceId}"
// The endpoint. For more information, see the "Connect an AMQP client to IoT Platform" topic.
const host = "${YourHost}"
func main() {
// If you hard-code the AccessKey pair in the project code, the AccessKey pair may be disclosed if the project code is leaked. In this case, the resources within your account become insecure. The following sample code provides an example on how to obtain the AccessKey pair from environment variables. This example is for reference only.
accessKey := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
accessSecret := os.Getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
address := "amqps://" + host + ":5671"
timestamp := time.Now().Nanosecond() / 1000000
// The structure of the userName parameter. For more information, see the "Connect an AMQP client to IoT Platform" topic.
userName := fmt.Sprintf("%s|authMode=aksign,signMethod=Hmacsha1,consumerGroupId=%s,authId=%s,iotInstanceId=%s,timestamp=%d|",
clientId, consumerGroupId, accessKey, iotInstanceId, timestamp)
stringToSign := fmt.Sprintf("authId=%s×tamp=%d", accessKey, timestamp)
hmacKey := hmac.New(sha1.New, []byte(accessSecret))
hmacKey.Write([]byte(stringToSign))
// Calculate a signature. For more information about how to construct the password, see the "Connect an AMQP client to IoT Platform" topic.
password := base64.StdEncoding.EncodeToString(hmacKey.Sum(nil))
amqpManager := &AmqpManager{
address:address,
userName:userName,
password:password,
}
// If you need to enable or disable the message receiving feature, you can create a context by using the Background() function.
ctx := context.Background()
amqpManager.startReceiveMessage(ctx)
}
// The function that implements your business logic. The function is a user-defined function that is asynchronously called. Before you configure this function, we recommend that you consider the consumption of system resources.
func (am *AmqpManager) processMessage(message *amqp.Message) {
fmt.Println("data received:", string(message.GetData()), " properties:", message.ApplicationProperties)
}
type AmqpManager struct {
address string
userName string
password string
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
}
func (am *AmqpManager) startReceiveMessage(ctx context.Context) {
childCtx, _ := context.WithCancel(ctx)
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
defer func() {
am.receiver.Close(childCtx)
am.session.Close(childCtx)
am.client.Close()
}()
for {
// Block message receiving. If ctx is the new context that is created based on the Background() function, message receiving is not blocked.
message, err := am.receiver.Receive(ctx)
if nil == err {
go am.processMessage(message)
message.Accept()
} else {
fmt.Println("amqp receive data error:", err)
// If message receiving is manually disabled, exit the program.
select {
case <- childCtx.Done(): return
default:
}
// If message receiving is not manually disabled, re-establish the connection.
err := am.generateReceiverWithRetry(childCtx)
if nil != err {
return
}
}
}
}
func (am *AmqpManager) generateReceiverWithRetry(ctx context.Context) error {
// Retry with exponential backoff, from 10 ms to 20s.
duration := 10 * time.Millisecond
maxDuration := 20000 * time.Millisecond
times := 1
// If exceptions occur, retry with exponential backoff.
for {
select {
case <- ctx.Done(): return amqp.ErrConnClosed
default:
}
err := am.generateReceiver()
if nil != err {
time.Sleep(duration)
if duration < maxDuration {
duration *= 2
}
fmt.Println("amqp connect retry,times:", times, ",duration:", duration)
times ++
} else {
fmt.Println("amqp connect init success")
return nil
}
}
}
// The states of the connection and session cannot be determined because the packets are unavailable. Re-establish the connection to obtain the information.
func (am *AmqpManager) generateReceiver() error {
if am.session != nil {
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
// If a network disconnection error occurs, the connection is closed and the session fails to be established. Otherwise, the connection is established.
if err == nil {
am.receiver = receiver
return nil
}
}
// Delete the previous connection.
if am.client != nil {
am.client.Close()
}
client, err := amqp.Dial(am.address, amqp.ConnSASLPlain(am.userName, am.password), )
if err != nil {
return err
}
am.client = client
session, err := client.NewSession()
if err != nil {
return err
}
am.session = session
receiver, err := am.session.NewReceiver(
amqp.LinkSourceAddress("/queue-name"),
amqp.LinkCredit(20),
)
if err != nil {
return err
}
am.receiver = receiver
return nil
}
You can configure the parameters in the preceding code based on the parameter description in the following table. For more information about other parameters, see Connect an AMQP client to IoT Platform.
Make sure that you specify valid parameter values. Otherwise, the AMQP client fails to connect to IoT Platform.
Parameter | Description |
accessKey | Log on to the IoT Platform console, move the pointer over the profile picture in the upper-right corner, and then click AccessKey Management to obtain the AccessKey ID and AccessKey secret. Note If you use a Resource Access Management (RAM) user, you must attach the AliyunIOTFullAccess policy to the RAM user. This policy allows the RAM user to manage IoT Platform resources. Otherwise, the connection to IoT Platform fails. For more information, see Access IoT Platform as a RAM user. |
accessSecret | |
consumerGroupId | The ID of the consumer group of the IoT Platform instance. To view the ID of the consumer group, perform the following steps: Log on to the IoT Platform console and click the card of the instance that you want to manage. In the left-side navigation pane, choose . The ID of the consumer group is displayed on the Consumer Groups tab. |
iotInstanceId | The ID of the IoT Platform instance. You can view the instance ID on the Overview tab in the IoT Platform console.
|
clientId | The ID of the client. You must specify a custom ID. The ID must be 1 to 64 characters in length. We recommend that you use a unique identifier as the client ID, such as the UUID, MAC address, or IP address of the server on which the client runs. After the AMQP client is connected to IoT Platform and started, perform the following steps to view the details of the client: Log on to the IoT Platform console and click the card of the instance that you want to manage. In the left-side navigation pane, choose . On the Consumer Groups tab, find the consumer group that you want to manage and click View in the Actions column. The ID of each client is displayed on the Consumer Group Status tab. You can use client IDs to identify clients with ease. |
host | The endpoint that the AMQP client uses to connect to IoT Platform. For more information about the endpoint that you can specify for the |
Sample results
If information similar to the following output is displayed, the AMQP client is connected to IoT Platform and can receive messages.
If information similar to the following output is displayed, the AMQP client fails to connect to IoT Platform.
You can check the code or network environment based on logs, solve the problem, and then run the code again.
References
For more information about the error codes that are related to the server-side subscription feature, see the Error codes that are related to messages section of the "IoT Platform logs" topic.