This article describes how to call the API operations of Link SDK for C to connect an MQTT-based device to IoT Platform and to send and receive messages. This example uses the ./mqtt_basic_demo.c sample code file.
Background information
For more information about MQTT connections, see Overview.
Step 1: Initialize a client
Add header files.
#include "aiot_state_api.h"
#include "aiot_sysdep_api.h"
#include "aiot_mqtt_api.h"
Add the underlying dependency and configure the log output feature.
aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile);
aiot_state_set_logcb(demo_state_logcb);
Call the aiot_mqtt_init operation to create an MQTT client instance and initialize the default parameters.
mqtt_handle = aiot_mqtt_init();
if (mqtt_handle == NULL) {
    printf("aiot_mqtt_init failed\n");
    return -1;
}
Step 2: Configure required features
Call the aiot_mqtt_setopt operation to configure the following items:
For more information about the parameters of the operation, see aiot_mqtt_option_t.
Set connection parameters.
Sample code
char *product_key   = "a18wP******";
char *device_name   = "LightSwitch";
char *device_secret = "uwMTmVAMnGGHaAkqmeDY6cHxxB******";
char *mqtt_host     = "iot-06z00ax1o******.mqtt.iothub.aliyuncs.com";
... ...
/* Set the endpoint of the MQTT broker. */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)mqtt_host);
/* Set the port of the MQTT broker. */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port);
/* Set the ProductKey of the device. */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key);
/* Set the DeviceName of the device. */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name);
/* Set the DeviceSecret of the device. */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret);
/* Set the security credential of the connection. */
aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred);
Parameters:
Parameter | Example | Description
--- | --- | ---
mqtt_host | iot-06z00ax1o******.mqtt.iothub.aliyuncs.com | The endpoint. If you use an Enterprise Edition instance, or a public instance that was activated on or after July 30, 2021, open the Instance Details page and click View Development Configurations in the upper-right corner. The endpoint is displayed on the Development Configurations panel. If you use a public instance that was activated before July 30, 2021, the endpoint is ${YourProductKey}.iot-as-mqtt.${YourRegionId}.aliyuncs.com.
product_key | a18wP****** | The device authentication information. For more information, see Obtain device authentication information. In this example, the unique-certificate-per-device authentication method is used.
device_name | LightSwitch | See product_key.
device_secret | uwMTmVAMnGGHaAkqmeDY6cHxxB****** | See product_key.
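The endpoint format for public instances activated before July 30, 2021 can be assembled from the ProductKey and the region ID. The following is a minimal sketch, not part of Link SDK: the function name build_legacy_endpoint is hypothetical, and the region ID cn-shanghai in the usage note is only an example value.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not an SDK function).
 * Writes "<ProductKey>.iot-as-mqtt.<RegionId>.aliyuncs.com" into out.
 * Returns 0 on success, or -1 if an input is empty or the result does not fit. */
int build_legacy_endpoint(const char *product_key, const char *region_id,
                          char *out, size_t out_len)
{
    assert(product_key != NULL && region_id != NULL && out != NULL);

    if (strlen(product_key) == 0 || strlen(region_id) == 0) {
        return -1;
    }

    int n = snprintf(out, out_len, "%s.iot-as-mqtt.%s.aliyuncs.com",
                     product_key, region_id);

    /* snprintf reports the length it wanted to write; treat truncation as an error. */
    return (n < 0 || (size_t)n >= out_len) ? -1 : 0;
}
```

For example, calling build_legacy_endpoint("a18wP******", "cn-shanghai", host, sizeof(host)) fills host with a string that can be passed as the AIOT_MQTTOPT_HOST value.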
MQTT keep-alive description:
Important: During each keep-alive period, the device must send at least one message. Ping requests count as messages.
A timer starts when IoT Platform sends a CONNACK message in response to a CONNECT message. The timer is reset each time IoT Platform receives a PUBLISH, SUBSCRIBE, PINGREQ, or PUBACK message. IoT Platform checks the heartbeat of the MQTT connection every 30 seconds. The wait time for the heartbeat check is the period between the time at which the device connects to IoT Platform and the time at which the next heartbeat check is performed. The maximum timeout period is calculated by using the following formula:
Maximum timeout period = Heartbeat interval × 1.5 + Wait time for the heartbeat check
If the server receives no messages from the device within the maximum timeout period, the server closes the connection with the device.
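The timeout formula can be worked through in code. The sketch below is illustrative only: the function name max_timeout_sec is hypothetical, both inputs are taken in seconds, and the × 1.5 factor is computed in integer arithmetic, so an odd interval loses half a second.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper (not an SDK function).
 * Maximum timeout = heartbeat interval x 1.5 + wait time for the heartbeat check. */
uint32_t max_timeout_sec(uint32_t heartbeat_interval_sec, uint32_t check_wait_sec)
{
    /* Guard against overflow on absurd inputs. */
    assert(heartbeat_interval_sec <= 1000000u && check_wait_sec <= 1000000u);

    /* interval * 1.5 without floating point: the interval plus half of it. */
    return heartbeat_interval_sec + heartbeat_interval_sec / 2 + check_wait_sec;
}
```

With an interval of 300 seconds and a wait time of 30 seconds, the result is 480 seconds. Because the heartbeat check runs every 30 seconds, the wait time in this model never exceeds 30 seconds.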
Link SDK for C provides the keep-alive capability. You can specify the following parameters to customize the heartbeat of a connection. Otherwise, the default values are used.
Parameter | Default value | Description
--- | --- | ---
AIOT_MQTTOPT_HEARTBEAT_MAX_LOST | 2 | The maximum number of heartbeats that can be lost. If the limit is exceeded, a connection is re-established.
AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS | 25,000 | The heartbeat interval. Unit: milliseconds. Valid values: 1,000 to 1,200,000.
AIOT_MQTTOPT_KEEPALIVE_SEC | 1,200 | The keep-alive time. After the heartbeat is lost, you can re-establish a connection within the specified period. Unit: seconds. Valid values: 30 to 1,200. We recommend that you set a value that is greater than 300.
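The documented value ranges can be checked before the values are passed to aiot_mqtt_setopt. The following sketch is a hypothetical helper, not SDK code: the struct, its field types, and the function name are assumptions made for illustration. Only the defaults and ranges come from the table above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical container for the three heartbeat options (field types are assumed). */
typedef struct {
    uint8_t  heartbeat_max_lost;     /* AIOT_MQTTOPT_HEARTBEAT_MAX_LOST */
    uint32_t heartbeat_interval_ms;  /* AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS */
    uint16_t keepalive_sec;          /* AIOT_MQTTOPT_KEEPALIVE_SEC */
} demo_heartbeat_cfg_t;

/* Default values as listed in the table. */
static const demo_heartbeat_cfg_t DEMO_HEARTBEAT_DEFAULTS = {2, 25000, 1200};

/* Returns true if the interval and keep-alive time are inside the documented ranges. */
bool demo_heartbeat_cfg_valid(const demo_heartbeat_cfg_t *cfg)
{
    assert(cfg != NULL);

    if (cfg->heartbeat_interval_ms < 1000 || cfg->heartbeat_interval_ms > 1200000) {
        return false;
    }
    if (cfg->keepalive_sec < 30 || cfg->keepalive_sec > 1200) {
        return false;
    }
    return true;
}
```

The table gives no range for the maximum number of lost heartbeats, so the sketch does not check that field.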
Configure callbacks to monitor status and receive messages.
Specify the callbacks to monitor status.
Sample code
int main(int argc, char *argv[])
{
    ... ...
    /* Specify the default callback to receive MQTT messages. */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_RECV_HANDLER, (void *)demo_mqtt_default_recv_handler);
    /* Specify the callback to handle MQTT events. */
    aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_EVENT_HANDLER, (void *)demo_mqtt_event_handler);
    ... ...
}
Parameters:
Parameter | Example | Description
--- | --- | ---
AIOT_MQTTOPT_RECV_HANDLER | demo_mqtt_default_recv_handler | When a message is received, the callback is called to perform the required operations.
AIOT_MQTTOPT_EVENT_HANDLER | demo_mqtt_event_handler | When the connection status of the device changes, the callback is called to perform the required operations.
Define the callback to monitor status.
Important: Do not put time-consuming logic in the event callback. Otherwise, the thread that receives packets may be blocked.
Connection status changes include network exceptions, automatic reconnections, and disconnections.
To handle connection status changes, modify the code in the TODO sections as needed.
/* The callback to handle MQTT events. The callback is called when the connection is
 * established, recovered, or closed. For information about the event definitions,
 * see core/aiot_mqtt_api.h. */
void demo_mqtt_event_handler(void *handle, const aiot_mqtt_event_t *event, void *userdata)
{
    switch (event->type) {
        /* The connection to the MQTT broker was established by a call to aiot_mqtt_connect. */
        case AIOT_MQTTEVT_CONNECT: {
            printf("AIOT_MQTTEVT_CONNECT\n");
            /* TODO: Specify the processing logic after the connection is established.
             * Do not call time-consuming functions that may block threads. */
        }
        break;

        /* After a disconnection caused by a network exception, the SDK automatically
         * reconnected to the MQTT broker. */
        case AIOT_MQTTEVT_RECONNECT: {
            printf("AIOT_MQTTEVT_RECONNECT\n");
            /* TODO: Specify the processing logic after the connection is re-established.
             * Do not call time-consuming functions that may block threads. */
        }
        break;

        /* The connection was lost. "network" means that an underlying read or write
         * operation failed. "heartbeat" means that no heartbeat response was received
         * from the MQTT broker. */
        case AIOT_MQTTEVT_DISCONNECT: {
            char *cause = (event->data.disconnect == AIOT_MQTTDISCONNEVT_NETWORK_DISCONNECT) ?
                          ("network disconnect") : ("heartbeat disconnect");
            printf("AIOT_MQTTEVT_DISCONNECT: %s\n", cause);
            /* TODO: Specify the logic to process disconnection issues.
             * Do not call time-consuming functions that may block threads. */
        }
        break;

        default: {
        }
        break;
    }
}
Define the callback to receive messages.
Important: Do not put time-consuming logic in the message callback. Otherwise, the thread that receives packets may be blocked.
To process received messages, modify the code in the TODO sections as needed.
/* The default callback to process MQTT messages. The callback is called when the SDK
 * receives a message from the MQTT broker and no other user callback handles it. */
void demo_mqtt_default_recv_handler(void *handle, const aiot_mqtt_recv_t *packet, void *userdata)
{
    switch (packet->type) {
        case AIOT_MQTTRECV_HEARTBEAT_RESPONSE: {
            printf("heartbeat response\n");
            /* TODO: Specify the logic to process heartbeat responses from the MQTT broker.
             * In most cases, this logic is not required. */
        }
        break;

        case AIOT_MQTTRECV_SUB_ACK: {
            printf("suback, res: -0x%04X, packet id: %d, max qos: %d\n",
                   -packet->data.sub_ack.res, packet->data.sub_ack.packet_id, packet->data.sub_ack.max_qos);
            /* TODO: Specify the logic to process the responses of the MQTT broker to
             * subscription requests. In most cases, this logic is not required. */
        }
        break;

        case AIOT_MQTTRECV_PUB: {
            printf("pub, qos: %d, topic: %.*s\n", packet->data.pub.qos, packet->data.pub.topic_len, packet->data.pub.topic);
            printf("pub, payload: %.*s\n", packet->data.pub.payload_len, packet->data.pub.payload);
            /* TODO: Specify the logic to process the messages that are sent from the MQTT broker. */
        }
        break;

        case AIOT_MQTTRECV_PUB_ACK: {
            printf("puback, packet id: %d\n", packet->data.pub_ack.packet_id);
            /* TODO: Specify the logic to process the responses of the MQTT broker to
             * QoS 1 messages. In most cases, this logic is not required. */
        }
        break;

        default: {
        }
        break;
    }
}
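One way to keep the callbacks short is to copy each payload into a small queue and let a separate worker thread process it. The following is a minimal sketch of such a queue, not SDK code: every demo_queue_* name, the slot count, and the payload limit are hypothetical choices made for illustration.

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>

#define DEMO_QUEUE_SLOTS 8    /* assumed capacity */
#define DEMO_PAYLOAD_MAX 256  /* assumed maximum payload size in bytes */

typedef struct {
    uint8_t  payload[DEMO_PAYLOAD_MAX];
    uint32_t payload_len;
} demo_msg_t;

typedef struct {
    demo_msg_t      slots[DEMO_QUEUE_SLOTS];
    unsigned        head;   /* index of the oldest queued message */
    unsigned        count;  /* number of queued messages */
    pthread_mutex_t lock;
} demo_queue_t;

void demo_queue_init(demo_queue_t *q)
{
    assert(q != NULL);
    memset(q, 0, sizeof(*q));
    pthread_mutex_init(&q->lock, NULL);
}

/* Intended for the receive callback: copy the payload and return quickly.
 * Returns 0 on success, or -1 if the queue is full or the payload is unusable. */
int demo_queue_push(demo_queue_t *q, const uint8_t *payload, uint32_t len)
{
    int res = -1;

    if (payload == NULL || len > DEMO_PAYLOAD_MAX) {
        return -1;
    }
    pthread_mutex_lock(&q->lock);
    if (q->count < DEMO_QUEUE_SLOTS) {
        demo_msg_t *slot = &q->slots[(q->head + q->count) % DEMO_QUEUE_SLOTS];
        memcpy(slot->payload, payload, len);
        slot->payload_len = len;
        q->count++;
        res = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return res;
}

/* Intended for a worker thread: take the oldest message, if any.
 * Returns 0 on success, or -1 if the queue is empty. */
int demo_queue_pop(demo_queue_t *q, demo_msg_t *out)
{
    int res = -1;

    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        *out = q->slots[q->head];
        q->head = (q->head + 1) % DEMO_QUEUE_SLOTS;
        q->count--;
        res = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return res;
}
```

In the AIOT_MQTTRECV_PUB branch, the callback would call demo_queue_push with packet->data.pub.payload and packet->data.pub.payload_len and then return. The copy matters under the assumption that the payload buffer is only valid while the callback runs.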
Step 3: Establish a connection
Call the aiot_mqtt_connect operation to send a connection and authentication request to IoT Platform. For information about how to specify the parameters, see Set connection parameters.
/* Establish an MQTT connection with IoT Platform. */
res = aiot_mqtt_connect(mqtt_handle);
if (res < STATE_SUCCESS) {
    /* Release resources of the MQTT instance if the MQTT connection fails to be established. */
    aiot_mqtt_deinit(&mqtt_handle);
    printf("aiot_mqtt_connect failed: -0x%04X\n", -res);
    return -1;
}
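The sample prints failures in the form -0x%04X with the negated return value. This presumably lets the printed text be matched against the state codes in aiot_state_api.h, assuming those codes are defined as negative hexadecimal constants. The sketch below wraps that formatting in a hypothetical helper named demo_format_state. The code -0x0101 in the usage note is an arbitrary example value.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper (not an SDK function).
 * Formats a return value the way the sample does: "-0xNNNN" for negative values.
 * Returns 0 on success, or -1 if the buffer is too small. */
int demo_format_state(int32_t res, char *out, size_t out_len)
{
    int n;

    assert(out != NULL);

    if (res < 0) {
        /* Widen before negating so that INT32_MIN does not overflow. */
        n = snprintf(out, out_len, "-0x%04X", (unsigned)(-(int64_t)res));
    } else {
        n = snprintf(out, out_len, "0x%04X", (unsigned)res);
    }
    return (n < 0 || (size_t)n >= out_len) ? -1 : 0;
}
```

For example, demo_format_state(-0x0101, buf, sizeof(buf)) writes the text -0x0101 into buf.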
Step 4: Enable the keep-alive thread
Call the aiot_mqtt_process operation to send a heartbeat message to the MQTT broker and re-send the QoS 1 messages for which no responses are generated. This way, a persistent connection is enabled.
Enable the keep-alive thread.
res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle);
/* pthread_create returns 0 on success and a positive error number on failure. */
if (res != 0) {
    printf("pthread_create demo_mqtt_process_thread failed: %d\n", res);
    return -1;
}
Configure the function to manage the keep-alive thread.
void *demo_mqtt_process_thread(void *args)
{
    int32_t res = STATE_SUCCESS;

    while (g_mqtt_process_thread_running) {
        res = aiot_mqtt_process(args);
        if (res == STATE_USER_INPUT_EXEC_DISABLED) {
            break;
        }
        sleep(1);
    }
    return NULL;
}
Step 5: Enable the thread to receive messages
Call the aiot_mqtt_recv operation to receive MQTT messages from the broker. Received messages are passed to the callback that receives messages. If a disconnection and then an automatic reconnection occur, they are reported to the callback that handles events.
Enable the thread to receive messages.
res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle);
/* pthread_create returns 0 on success and a positive error number on failure. */
if (res != 0) {
    printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res);
    return -1;
}
Configure the function to manage the thread.
void *demo_mqtt_recv_thread(void *args)
{
    int32_t res = STATE_SUCCESS;

    while (g_mqtt_recv_thread_running) {
        res = aiot_mqtt_recv(args);
        if (res < STATE_SUCCESS) {
            if (res == STATE_USER_INPUT_EXEC_DISABLED) {
                break;
            }
            sleep(1);
        }
    }
    return NULL;
}
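Both thread functions loop while a "running" flag is set, so a clean shutdown needs to clear the flag and then wait for the thread to exit. The sketch below shows that pattern in isolation and is not taken from the sample file: all demo_* names are hypothetical, a counter stands in for the aiot_mqtt_process or aiot_mqtt_recv call, and an atomic flag is used in place of a plain global.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>
#include <time.h>

static atomic_int g_demo_thread_running;  /* 1 while the worker loop should keep going */
static atomic_int g_demo_iterations;      /* counts loop passes, for illustration only */
static pthread_t  g_demo_thread;

static void *demo_worker_thread(void *args)
{
    const struct timespec pause = {0, 10 * 1000 * 1000};  /* 10 ms */

    (void)args;
    while (atomic_load(&g_demo_thread_running)) {
        /* Stand-in for the SDK call that the real thread function would make. */
        atomic_fetch_add(&g_demo_iterations, 1);
        nanosleep(&pause, NULL);
    }
    return NULL;
}

/* Sets the flag, then starts the thread. Returns 0 or a positive error number. */
int demo_thread_start(void)
{
    int res;

    /* Starting twice would overwrite the handle of the first thread. */
    assert(atomic_load(&g_demo_thread_running) == 0);

    atomic_store(&g_demo_thread_running, 1);
    res = pthread_create(&g_demo_thread, NULL, demo_worker_thread, NULL);
    if (res != 0) {
        atomic_store(&g_demo_thread_running, 0);
    }
    return res;
}

/* Clears the flag, then waits for the loop to exit. Returns 0 or a positive error number. */
int demo_thread_stop(void)
{
    atomic_store(&g_demo_thread_running, 0);
    return pthread_join(g_demo_thread, NULL);
}
```

Joining the thread before the client instance is destroyed avoids a loop iteration that uses a handle that has already been released.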
Step 6: Subscribe to a topic
Call the aiot_mqtt_sub operation to subscribe to a specified topic.
Sample code
{
    char *sub_topic = "/a18wP******/LightSwitch/user/get";

    res = aiot_mqtt_sub(mqtt_handle, sub_topic, NULL, 1, NULL);
    if (res < 0) {
        printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res);
        return -1;
    }
}
Note: After you configure the sample code, remove the comment delimiters that enclose the code.
Parameters:
Parameter | Example | Description
--- | --- | ---
sub_topic | /a18wP******/LightSwitch/user/get | The topic that has the Subscribe permission. a18wP****** indicates the ProductKey of the device. LightSwitch indicates the DeviceName of the device. In this example, the default custom topic is used. The device can receive messages from IoT Platform by using this topic. For more information about topics, see Topics.
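The example topics follow the pattern /ProductKey/DeviceName/user/suffix. The sketch below builds such a topic string instead of hard-coding it. The function name demo_build_user_topic is hypothetical and not part of Link SDK.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not an SDK function).
 * Writes "/<ProductKey>/<DeviceName>/user/<suffix>" into out.
 * Returns 0 on success, or -1 if an input is empty or the result does not fit. */
int demo_build_user_topic(const char *product_key, const char *device_name,
                          const char *suffix, char *out, size_t out_len)
{
    assert(product_key != NULL && device_name != NULL && suffix != NULL && out != NULL);

    if (strlen(product_key) == 0 || strlen(device_name) == 0 || strlen(suffix) == 0) {
        return -1;
    }

    int n = snprintf(out, out_len, "/%s/%s/user/%s", product_key, device_name, suffix);

    /* Treat truncation as an error so that a partial topic is never used. */
    return (n < 0 || (size_t)n >= out_len) ? -1 : 0;
}
```

With the suffix get, the helper produces the subscription topic of this example. With the suffix update, it produces the topic that the example publishes to.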
Step 7: Send a message
Call the aiot_mqtt_pub operation to send a message to a specified topic.
Sample code
{
    char *pub_topic = "/a18wP******/LightSwitch/user/update";
    char *pub_payload = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}";

    res = aiot_mqtt_pub(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)strlen(pub_payload), 0);
    if (res < 0) {
        printf("aiot_mqtt_pub failed, res: -0x%04X\n", -res);
        return -1;
    }
}
Note: After you configure the sample code, remove the comment delimiters that enclose the code.
Parameters:
Parameter | Example | Description
--- | --- | ---
pub_topic | /a18wP******/LightSwitch/user/update | The topic that has the Publish permission. a18wP****** indicates the ProductKey of the device. LightSwitch indicates the DeviceName of the device. The device can send messages to IoT Platform by using this topic. For more information about topics, see Topics.
pub_payload | {\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}} | The message that is sent to IoT Platform. In this example, a custom topic is used. Therefore, you can customize the message format. For more information about message formats, see Data formats.
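The payload in this example is a small JSON string, so it can be generated at run time instead of being written as an escaped literal. The following sketch is illustrative only: the function name demo_build_switch_payload is hypothetical, and the field layout simply mirrors the example payload above.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper (not an SDK function).
 * Writes {"id":"<id>","version":"1.0","params":{"LightSwitch":<0|1>}} into out.
 * Returns the payload length in bytes, or -1 if the buffer is too small. */
int demo_build_switch_payload(unsigned id, int light_switch, char *out, size_t out_len)
{
    assert(out != NULL);

    int n = snprintf(out, out_len,
                     "{\"id\":\"%u\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":%d}}",
                     id, light_switch ? 1 : 0);

    return (n < 0 || (size_t)n >= out_len) ? -1 : n;
}
```

The returned length can be passed as the payload length argument of aiot_mqtt_pub, which replaces the strlen call in the sample code.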
After an MQTT connection between the device and IoT Platform is established, make sure that the number of messages does not exceed the threshold.
For more information about limits, see the Connections and communications section of the Limits topic in the IoT Platform documentation.
If the number of messages exceeds the threshold, log on to the IoT Platform console to view accumulated messages. For more information, see View and monitor consumer groups.
Step 8: Disconnect the device from IoT Platform
MQTT connections are intended for devices that stay connected for long periods. You can also disconnect a device from IoT Platform manually.
In this example, the main thread sets the parameters and establishes the connection. After the connection is established, you can put the main thread to sleep.
Call the aiot_mqtt_disconnect operation to disconnect the device from IoT Platform.
res = aiot_mqtt_disconnect(mqtt_handle);
if (res < STATE_SUCCESS) {
    aiot_mqtt_deinit(&mqtt_handle);
    printf("aiot_mqtt_disconnect failed: -0x%04X\n", -res);
    return -1;
}
Step 9: Exit the program
Call the aiot_mqtt_deinit operation to destroy the MQTT client instance and release resources.
res = aiot_mqtt_deinit(&mqtt_handle);
if (res < STATE_SUCCESS) {
    printf("aiot_mqtt_deinit failed: -0x%04X\n", -res);
    return -1;
}
What to do next
After you configure the sample code file, compile the file to generate an executable file. In this example, the ./output/mqtt-basic-demo executable file is generated. For more information, see Compilation and running.
For more information about running results, see View logs.