本文以C Link SDK中的Demo文件./demos/mqtt_v5_basic_demo.c
为例,介绍如何调用C Link SDK的API,将MQTT 5.0协议的设备接入物联网平台并进行消息收发。
背景信息
MQTT 5.0接入的更多信息,请参见概述。
步骤一:初始化
添加头文件。
#include "aiot_state_api.h" #include "aiot_sysdep_api.h" #include "aiot_mqtt_api.h"
配置底层依赖和日志输出。
aiot_sysdep_set_portfile(&g_aiot_sysdep_portfile); aiot_state_set_logcb(demo_state_logcb);
调用aiot_mqtt_init,创建MQTT客户端实例,并初始化默认参数。
mqtt_handle = aiot_mqtt_init(); if (mqtt_handle == NULL) { printf("aiot_mqtt_init failed\n"); return -1; }
步骤二:配置功能
调用aiot_mqtt_setopt,配置以下功能。更多功能的配置项,请参见aiot_mqtt_option_t。
配置连接参数。
示例代码:
char *product_key = "a18wP******"; char *device_name = "LightSwitch"; char *device_secret = "uwMTmVAMnGGHaAkqmeDY6cHxxB******"; char *mqtt_host = "iot-06z00ax1o******.mqtt.iothub.aliyuncs.com"; ... ... /* 配置MQTT协议的版本 */ protocol_version = AIOT_MQTT_VERSION_5_0; aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_VERSION, (void *)&protocol_version); /* 配置MQTT服务器地址。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_HOST, (void *)url); /* 配置MQTT服务器端口。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PORT, (void *)&port); /* 配置设备ProductKey。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_PRODUCT_KEY, (void *)product_key); /* 配置设备DeviceName。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_NAME, (void *)device_name); /* 配置设备DeviceSecret。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_DEVICE_SECRET, (void *)device_secret); /* 配置网络连接的安全凭据。 */ aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_NETWORK_CRED, (void *)&cred); /* 如果要使用MQTT 5.0的assigned clientId功能, 需要将use_assigned_clientid置为1 */ uint8_t use_assigned_clientid = 0; aiot_mqtt_setopt(mqtt_handle, AIOT_MQTTOPT_ASSIGNED_CLIENTID, (void *)(&use_assigned_clientid));
相关参数:
参数
示例
说明
mqtt_host
iot-06z00ax1o******.mqtt.iothub.aliyuncs.com
设备接入域名。
企业版实例和新版公共实例:在实例详情页面的开发配置面板,查看接入域名。
旧版公共实例:接入域名格式为
${YourProductKey}.iot-as-mqtt.${YourRegionId}.aliyuncs.com
。
新旧版公共实例和企业版实例、以及接入域名的更多信息,请参见查看实例终端节点。
product_key
a18wP******
设备认证信息。更多信息,请参见获取设备认证信息。
本例程的身份认证方式为一机一密。
device_name
LightSwitch
device_secret
uwMTmVAMnGGHaAkqmeDY6cHxxB******
protocol_version
AIOT_MQTT_VERSION_5_0
配置MQTT协议的版本为5.0。
use_assigned_clientid
1
使用MQTT 5.0的
assigned clientId
功能时,需要将use_assigned_clientid
置为1。MQTT保活说明:
重要设备端在保活时间间隔内,至少需要发送一次报文,包括ping请求。
从物联网平台发送CONNACK响应CONNECT消息时,开始心跳计时。收到PUBLISH、SUBSCRIBE、PING或PUBACK消息时,会重置计时器。物联网平台每隔30秒定时检测一次设备的保活心跳,设备上线时间点距离最新定时检测时间点的时间,是定时检测的等待时间。定义最大超时时间为:
保活心跳时间*1.5+定时检测的等待时间
。超过最大超时时间未收到设备消息,服务器会自动断开连接。
C Link SDK具备保活能力,您可以设置以下配置项,自定义设备连接的保活心跳。如果不配置,则取默认值。
配置项
默认值
说明
AIOT_MQTTOPT_HEARTBEAT_MAX_LOST
2
可容忍的心跳丢失阈值。即:心跳请求报文达到设置的次数后,发起重连。
AIOT_MQTTOPT_HEARTBEAT_INTERVAL_MS
25,000
每次发起重连之间的间隔时间。单位毫秒, 取值范围:1,000~1,200,000。
AIOT_MQTTOPT_KEEPALIVE_SEC
1,200
可容忍的心跳丢失时间阈值。即:失去心跳后,设置的时间内,允许发起重连。单位秒,取值范围:30~1,200。建议取值大于300。
配置状态监控和消息回调。
步骤三:请求连接
调用aiot_mqtt_connect_v5,根据配置连接的参数,向物联网平台,发起连接认证请求。
示例代码中添加的用户属性(MQTT_PROP_ID_USER_PROPERTY
)更多信息,请参见mqtt_property_identify_t。
mqtt_properties_t *conn_props = aiot_mqtt_props_init();
mqtt_property_t user_prop = {
.id = MQTT_PROP_ID_USER_PROPERTY,
.value.str_pair.key.len = strlen("demo_key"),
.value.str_pair.key.value = (uint8_t *)"demo_key",
.value.str_pair.value.len = strlen("demo_value"),
.value.str_pair.value.value = (uint8_t *)"demo_value",
};
aiot_mqtt_props_add(conn_props, &user_prop);
/* 通过MQTT 5.0的方式与服务器建连 */
res = aiot_mqtt_connect_v5(mqtt_handle, NULL, conn_props);
aiot_mqtt_props_deinit(&conn_props);
if (res < STATE_SUCCESS) {
/* 尝试建立连接失败, 销毁MQTT实例, 回收资源 */
aiot_mqtt_deinit(&mqtt_handle);
printf("aiot_mqtt_connect failed: -0x%04X\n\r\n", -res);
printf("please check variables like mqtt_host, produt_key, device_name, device_secret in demo\r\n");
return -1;
}
步骤四:开启保活线程
调用aiot_mqtt_process,向服务器发送心跳报文,使设备保持长连接状态,并重发QoS=1
的未应答报文。
开启保活线程。
res = pthread_create(&g_mqtt_process_thread, NULL, demo_mqtt_process_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_process_thread failed: %d\n", res); return -1; }
设置保活线程处理函数。
void *demo_mqtt_process_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_process_thread_running) { res = aiot_mqtt_process(args); if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } return NULL; }
步骤五:开启接收线程
调用aiot_mqtt_recv,收取服务器下发的MQTT消息,根据消息回调函数,执行对应处理。在断线时自动重连,根据事件回调函数,执行对应处理。
开启接收线程。
res = pthread_create(&g_mqtt_recv_thread, NULL, demo_mqtt_recv_thread, mqtt_handle); if (res < 0) { printf("pthread_create demo_mqtt_recv_thread failed: %d\n", res); return -1; }
设置接收线程处理函数。
void *demo_mqtt_recv_thread(void *args) { int32_t res = STATE_SUCCESS; while (g_mqtt_recv_thread_running) { res = aiot_mqtt_recv(args); if (res < STATE_SUCCESS) { if (res == STATE_USER_INPUT_EXEC_DISABLED) { break; } sleep(1); } } return NULL; }
步骤六:订阅Topic
调用aiot_mqtt_sub_v5,订阅指定Topic。
示例代码:
/* MQTT 订阅topic功能示例, 请根据自己的业务需求进行使用 */ { char *sub_topic = "/sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post_reply"; mqtt_properties_t *sub_props = aiot_mqtt_props_init(); aiot_mqtt_props_add(sub_props, &user_prop); /* 订阅选项 */ sub_options_t opts = { .no_local = 1, .qos = 1, .retain_as_publish = 1, .retain_handling = 1, }; res = aiot_mqtt_sub_v5(mqtt_handle, sub_topic, &opts, NULL, NULL, sub_props); aiot_mqtt_props_deinit(&sub_props); if (res < 0) { printf("aiot_mqtt_sub failed, res: -0x%04X\n", -res); aiot_mqtt_deinit(&mqtt_handle); return -1; } }
说明示例代码中添加的用户属性(
MQTT_PROP_ID_USER_PROPERTY
)更多信息,请参见mqtt_property_identify_t。添加订阅选项的详细内容,请参见sub_options_t。
相关参数:
参数
示例
说明
sub_topic
/a18wP******/LightSwitch/user/get
拥有订阅权限的Topic。其中:
a18wP******
为设备的ProductKey。LightSwitch
为设备的DeviceName。
本示例为默认的自定义Topic。
设备通过该Topic,可接收物联网平台的消息。
关于Topic的更多信息,请参见什么是Topic。
sub_props
aiot_mqtt_props_init()
订阅附加属性。
opts
.no_local = 1,
.qos = 1,
.retain_as_publish = 1,
.retain_handling = 1,
订阅选项。
步骤七:发送消息
调用aiot_mqtt_pub_v5,向指定Topic发送消息。
示例代码:
mqtt_properties_t *pub_props = aiot_mqtt_props_init(); /* MQTT 发布消息功能示例, 请根据自己的业务需求进行使用 */ char *pub_topic = "/sys/${YourProductKey}/${YourDeviceName}/thing/event/property/post"; char *pub_payload = "{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}"; mqtt_property_t response_prop = { .id = MQTT_PROP_ID_RESPONSE_TOPIC, .value.str.len = strlen(pub_topic), .value.str.value = (uint8_t *)pub_topic, }; char *demo_data_str = "12345"; mqtt_property_t correlation_prop = { .id = MQTT_PROP_ID_CORRELATION_DATA, .value.str.len = strlen(demo_data_str), .value.str.value = (uint8_t *)demo_data_str, }; aiot_mqtt_props_add(pub_props, &response_prop); aiot_mqtt_props_add(pub_props, &correlation_prop); res = aiot_mqtt_pub_v5(mqtt_handle, pub_topic, (uint8_t *)pub_payload, (uint32_t)(strlen(pub_payload)), 1,0, pub_props); if (res < 0) { printf("aiot_mqtt pub failed, res: -0x%04X\n", -res); aiot_mqtt_deinit(&mqtt_handle); return -1; }
说明示例代码中添加的用户属性(
MQTT_PROP_ID_USER_PROPERTY
)更多信息,请参见mqtt_property_identify_t。相关参数:
参数
示例
说明
pub_topic
/a18wP******/LightSwitch/user/update
拥有发布权限的Topic。其中:
a18wP******
为设备的ProductKey。LightSwitch
为设备的DeviceName。
设备通过该Topic向物联网平台发送消息。
关于Topic的更多信息,请参见什么是Topic。
pub_payload
{\"id\":\"1\",\"version\":\"1.0\",\"params\":{\"LightSwitch\":0}}
上报至物联网平台的消息内容。
由于示例消息的Topic类型为自定义,因此数据格式可自定义。
关于数据格式的更多信息,请参见数据格式。
pub_props
参考示例代码。
发布消息携带的属性。
步骤八:断开连接
调用aiot_mqtt_disconnect_v5,向物联网平台发送断开连接的报文,然后断开网络连接。
示例代码中添加的用户属性(MQTT_PROP_ID_USER_PROPERTY
)更多信息,请参见mqtt_property_identify_t。
MQTT接入常应用于长连接的设备,程序通常不会运行至此。例程的主线程任务为配置参数并成功建立连接。连接建立后,主线程可进入休眠。
{
mqtt_properties_t *disconn_props = aiot_mqtt_props_init();
/* reason code 0x0 表示Normal disconnection */
int demo_reason_code = 0x0;
char *demo_reason_string = "normal_exit";
mqtt_property_t reason_prop = {.id = MQTT_PROP_ID_REASON_STRING, .value.str.len = strlen(demo_reason_string), .value.str.value = (uint8_t *)demo_reason_string};
aiot_mqtt_props_add(disconn_props, &reason_prop);
res = aiot_mqtt_disconnect_v5(mqtt_handle, demo_reason_code, disconn_props);
aiot_mqtt_props_deinit(&disconn_props);
}
步骤九:退出程序
调用aiot_mqtt_deinit,销毁MQTT客户端实例,释放资源。
res = aiot_mqtt_deinit(&mqtt_handle);
if (res < STATE_SUCCESS) {
printf("aiot_mqtt_deinit failed: -0x%04X\n", -res);
return -1;
}