coreMQTT 演示(无 TLS)
coreMQTT 是一个 MIT 授权的开源 MQTT C 库,适用于基于微控制器和小型微处理器的 IoT 设备。
注意:我们建议在任何物联网( IoT )应用程序中
使用相互身份验证。此页面上的演示在引入加密和身份验证之前演示了 MQTT 通信,
仅用于教育目的。不适用于生产。
单线程 VS 多线程
coreMQTT 有两种使用模式,单线程和多线程(多任务)。
在多线程应用程序中,仅在一个线程上使用 MQTT 库(如本页记录的演示那样)
等同于单线程用例。单线程用例要求应用程序编写者
对 MQTT 库进行重复的显式调用。多线程用例可以
在后台的代理(或守护进程)任务中执行 MQTT 协议。在代理任务中执行 MQTT 协议
使应用程序编写者无需显式托管任何 MQTT 状态或调用
MQTT_ProcessLoop()
API 函数。使用代理任务还能让多个应用任务共享单个 MQTT 连接,
而无需使用互斥锁等同步原语。
演示简介
共有三个示例项目介绍 “TLS 简介”
页面上描述的概念,此示例项目是其中之一。第一个示例 (本页)演示了未加密的 MQTT 通信。应用程序
第二个示例在第一个示例的基础上引入服务器身份验证
(其中 IoT 客户端对其连接的 MQTT 服务器进行验证)。第三个
示例在第二个示例的基础上引入强力相互身份验证(MQTT 服务器也会对其所连接的 IoT
客户端进行身份验证)。
该系列中的第一个项目仅演示了基本 MQTT 用例,即如何连接到 MQTT 代理
和 MQTT 在 QoS 0 等级的订阅-发布工作流程。在其订阅了一个单一的
主题过滤器后,它会向该主题发布消息,然后等待从服务器上接收同一消息。这种向代理发布消息,
然后又从代理那里接收同一消息的循环会无限重复。由于它使用 QoS 0,
它没有实现发布消息的任何重传机制。
此演示未创建安全连接,因此不适合在生产中使用,
请勿在未加密的网络连接上发送任何机密信息。此演示的确演示了如何
在连接失败的情况下,使用指数退避时间(包括定时抖动)进行连接。指数级增加连接尝试之间的时间间隔,
并加入一些随机的时间抖动,对于大规模 IoT 设备机群而言是最佳实践,因为它可以防止所有 IoT 设备
在同时断开连接时尝试同时重新连接。
此 MQTT 基础演示项目使用 FreeRTOS
Windows 移植,从而可以在 Windows 上使用 Visual Studio 免费社区版构建和评估它,无需任何特定的 MCU 硬件。
源代码组织
演示项目的名称是 mqtt_plain_text_demo.sln,可以在 FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Plain_Text 目录中找到
位于主 FreeRTOS 下载中,(也可在下载页面链接的 Github 中找到)。
配置演示项目
此演示使用 FreeRTOS-Plus-TCP TCP/IP 堆栈,因此请按照
为 TCP/IP 入门项目提供的说明进行操作,
以确保您:
- 安装了必要的
组件(如 WinPCap)。
- 设置了一个静态或动态
IP 地址、网关地址和网络掩码(可选)。
- 设置了 MAC 地址(可选)。
- 在您的主机上选择
以太网网络接口。
- 重要步骤:
在尝试运行 MQTT 演示之前,请测试您的网络连接。
每个演示项目都有自己的配置设置。当你按照网络配置说明进行操作时,
确保应用 MQTT 演示项目中的设置,而不是
TCP/IP 入门项目中的设置。默认情况下,TCP/IP 堆栈被配置为使用动态 IP 地址。
配置 MQTT 代理连接
备选方案 1:使用公共托管的 Mosquitto MQTT 代理(web 托管)
该演示项目可在 "test.mosqitto.org" 中与 Mosquitto 的公共托管消息代理进行通信。如果
演示连接到具有 DHCP 服务和互联网接入的网络,则此操作应有效。请注意,FreeRTOS Windows
端口仅适用于有线以太网网络适配器,该适配器可以是虚拟以太网适配器。您应该使用一个单独的 MQTT
客户端(如 MQTT.fx)来测试
从您的主机到公共 MQTT 代理的 MQTT 连接。要使用托管的 Mosquitto 服务器:
- 打开
/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Plain_Text/demo_config.h
的本地副本
- 增加以下行:
- #define democonfigMQTT_BROKER_ENDPOINT "test.mosquitto.org"
- #define democonfigMQTT_BROKER_PORT ( 1883 )
注意:Mosquitto 是一个开源 MQTT 消息代理,支持 MQTT 5.0、3.1.1 和 3.1 版本。它
是 Eclipse 基金会的一部分,是一个 Eclipse
IoT 项目。test.mosquitto.org MQTT 代理不隶属于 FreeRTOS,也不由其维护,
可能随时无法使用。
备选方案 2:使用本地托管的 Mosquitto MQTT 消息代理(主机)
Mosquitto 代理也可以在本地运行,无论是在您的主机上(用于构建演示应用程序的机器),还是在
您本地网络的另一台计算机上。请按以下步骤操作:
- 请按照
https://mosquitto.org/download/上的说明在本地下载和安装 Mosquitto。
- 打开位于 Mosquitto 安装目录下的 "mosquitto.conf",将 “bind_address”
设置为 Mosquitto 将在您的系统上监听连接的网络。
- 找到您主机的 IP 地址(在 Windows 上运行
ipconfig
命令,或运行 ifconfig
命令
——在 Linux 或MAC OS上)。请注意,FreeRTOS Windows 移植仅适用于有线以太网适配器
(可以是虚拟以太网适配器)。
- 打开
FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Plain_Text/demo_config.h
。
- 添加以下行,将 democonfigMQTT_BROKER_ENDPOINT 设置为运行 Mosquitto 的机器的 IP 地址,
该 IP 地址必须与演示所连接的网络位于同一子网:
#define democonfigMQTT_BROKER_ENDPOINT "w.x.y.z"
#define democonfigMQTT_BROKER_PORT ( 1883 )
应使用单独的 MQTT 客户端(如 MQTT.fx)
来测试从您的主机到已安装 MQTT 代理的 MQTT 连接。
注意:端口号 1883 是未加密 MQTT 的默认端口号。 如果您无法使用该端口
(例如,如果它被您的 IT 安全策略阻止),请把 Mosquitto 使用的端口更改为更高的端口号
(例如,50000 到 55000 范围内的端口号),并相应地设置 mqttexampleMQTT_BROKER_PORT
。
要使用 MQTT.fx 测试 MQTT 连接,请先使用上述链接下载,然后编辑连接配置文件。在
点击“连接”后,确保灯泡符号变为绿色。
备选方案 3:您选择的任何其他未加密 MQTT 代理:
支持未加密 TCP/IP 通信的任何 MQTT 代理也可与此演示一起使用。请按以下步骤操作:
- 打开
/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Plain_Text/demo_config.h
的本地副本
- 添加下列行,并设置您所选择的代理:
- #define democonfigMQTT_BROKER_ENDPOINT "your-desired-endpoint"
- #define democonfigMQTT_BROKER_PORT ( 1883 )
构建演示项目
此演示项目使用的是
Visual Studio 免费社区版。要构建演示,请执行如下操作:
- 从 Visual Studio IDE 中打开
/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Plain_Text/mqt_plain_text_demo.sln
Visual Studio 解决方案文件
- 在 IDE 的 ‘build’ 菜单中选择 ‘build solution’
方案 2 的故障排除:使用本地 Mosquitto 代理
无法连接到服务器:FreeRTOS_Connect 失败。
- 在 mosquitto.conf 中添加 “listener 1883 0.0.0.0” 并重启 mosquitto
连接被拒绝:未授权。
- 在 mosquitto.conf 中添加 “allow_anonymous true” 并重启 mosquitto
功能
该演示创建了一个单个应用程序任务,该任务通过一系列示例循环,演示如何连接到
代理、订阅代理上的主题、在代理上发布主题以及再次断开与代理的连接
。演示应用程序订阅一个主题,并向同一个主题发布。因此,每次
当演示向 MQTT 代理发布消息时,代理会向演示应用程序发送相同的消息。该
演示的结构体如下:
static void prvMQTTDemoTask( void * pvParameters )
{
uint32_t ulPublishCount = 0U, ulTopicCount = 0U;
const uint32_t ulMaxPublishCount = 5UL;
NetworkContext_t xNetworkContext = { 0 };
MQTTContext_t xMQTTContext;
MQTTStatus_t xMQTTStatus;
PlaintextTransportStatus_t xNetworkStatus;
for( ; ; )
{
xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );
prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );
for( ulPublishCount = 0; ulPublishCount < ulMaxPublishCount; ulPublishCount++ )
{
prvMQTTPublishToTopic( &xMQTTContext );
xMQTTStatus = MQTT_ProcessLoop(
&xMQTTContext,
mqttexamplePROCESS_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess );
vTaskDelay( mqttexampleDELAY_BETWEEN_PUBLISHES );
}
prvMQTTUnsubscribeFromTopic( &xMQTTContext );
xMQTTStatus = MQTT_ProcessLoop( &xMQTTContext,
mqttexamplePROCESS_LOOP_TIMEOUT_MS );
configASSERT( xMQTTStatus == MQTTSuccess );
xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
xNetworkStatus = Plaintext_FreeRTOS_Disconnect( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS );
}
}
连接到 MQTT 代理
在上述函数中, prvConnectToServerWithBackoffRetries()
试图与 MQTT 代理建立 TCP 连接
。如果连接失败,则在超时后重试。超时值将呈指数增长,
并包括一些随机抖动,直到达到最大的尝试次数或达到最大的超时值
。生产设备中使用这种类型的退避,可确保同时断开连接的 IoT 设备群不会同时尝试重新连接,
以免服务器不堪重负。如果连接成功,
则在 xNetworkContext< 中返回连接的 TCP 套接字。
函数 prvCreateMQTTConnectionWithBroker()
演示了如何使用
清除会话与 MQTT 代理建立未加密连接。它使用 FreeRTOS-Plus-TCP
传输接口,该接口在
FreeRTOS-Plus/Source/Application-Protocols/platform/freertos/transport/src/plaintext_freertos.c
文件中实现。
prvCreateMQTTConnectionWithBroker()
的定义如下所示。在
xConnectInfo
中设置代理的存活秒数。
以下函数显示了如何使用 MQTT_Init() 在 MQTT 上下文中设置 FreeRTOS-Plus-TCP 传输接口和时间函数,
还显示了如何设置事件回调函数指针 (prvEventCallback
)。此回调用于报告
传入消息。
static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
NetworkContext_t * pxNetworkContext )
{
MQTTStatus_t xResult;
MQTTConnectInfo_t xConnectInfo;
bool xSessionPresent;
TransportInterface_t xTransport;
xTransport.pNetworkContext = pxNetworkContext;
xTransport.send = Plaintext_FreeRTOS_send;
xTransport.recv = Plaintext_FreeRTOS_recv;
xResult = MQTT_Init(
pxMQTTContext,
&xTransport,
prvGetTimeMs,
prvEventCallback,
&xBuffer );
configASSERT( xResult == MQTTSuccess );
( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
xConnectInfo.cleanSession = true;
xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen(
democonfigCLIENT_IDENTIFIER );
xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS;
xResult = MQTT_Connect(
pxMQTTContext,
&xConnectInfo,
NULL,
mqttexampleCONNACK_RECV_TIMEOUT_MS,
&xSessionPresent );
configASSERT( xResult == MQTTSuccess );
}
订阅 MQTT 主题
函数 prvMQTTSubscribeWithBackoffRetries()
演示了如何订阅 MQTT 代理上的主题过滤器
。该示例演示了如何订阅一个主题过滤器,但也可以在同一个 API 调用中传递一个主题过滤器列表,
以订阅一个以上的主题过滤器。此外,如果MQTT
代理拒绝了订阅请求,则订阅将重试 MAX_RETRY_ATTEMPTS 次。该
函数的定义如下:
static const char *const pcExampleTopic = "/example/topic";
static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult = MQTTSuccess;
RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess;
RetryUtilsParams_t xRetryParams;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
bool xFailedSubscribeToTopic = false;
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
xMQTTSubscription[ 0 ].qos = MQTTQoS0;
xMQTTSubscription[ 0 ].pTopicFilter = pcExampleTopic;
xMQTTSubscription[ 0 ].topicFilterLength = strlen( pcExampleTopic );
RetryUtils_ParamsReset( &xRetryParams );
xRetryParams.maxRetryAttempts = MAX_RETRY_ATTEMPTS;
do
{
xResult = MQTT_Subscribe(
pxMQTTContext,
xMQTTSubscription,
1,
usSubscribePacketIdentifier );
configASSERT( xResult == MQTTSuccess );
xResult = MQTT_ProcessLoop( pxMQTTContext, mqttexamplePROCESS_LOOP_TIMEOUT_MS );
configASSERT( xResult == MQTTSuccess );
xFailedSubscribeToTopic = false;
if( xTopicFilterContext.xSubAckStatus == MQTTSubAckFailure )
{
xFailedSubscribeToTopic = true;
xRetryUtilsStatus = RetryUtils_BackoffAndSleep( &xRetryParams );
break;
}
configASSERT( xRetryUtilsStatus != RetryUtilsRetriesExhausted );
} while( ( xFailedSubscribeToTopic == true ) &&
( xRetryUtilsStatus == RetryUtilsSuccess ) );
}
发布到主题
函数 prvMQTTPublishToTopic()
演示了如何在 MQTT 代理上发布主题过滤器。
该函数的定义如下:
static const char *const pcExampleTopic = "/example/topic";
static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult;
MQTTPublishInfo_t xMQTTPublishInfo;
( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof(
xMQTTPublishInfo ) );
xMQTTPublishInfo.qos = MQTTQoS0;
xMQTTPublishInfo.retain = false;
xMQTTPublishInfo.pTopicName = pcExampleTopic;
xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( pcExampleTopic );
xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
xResult = MQTT_Publish(
pxMQTTContext,
&xMQTTPublishInfo, 0U );
configASSERT( xResult == MQTTSuccess );
}
接收传入消息
如前所述,应用程序在连接到代理之前注册事件回调函数。函数
prvMQTTDemoTask()
通过调用 MQTT_ProcessLoop()
来接收传入消息。当接收到传入的 MQTT 消息时,
它会调用应用程序注册的事件回调函数。函数
prvEventCallback()
是这种事件回调函数的示例;它检查传入的数据包类型,
并调用适当的处理程序。在此示例中,函数要么调用 prvMQTTProcessIncomingPublish()
来
处理传入的发布消息,要么调用 prvMQTTProcessResponse()
来处理确认。请注意,有一个单独的演示
展示了如何以线程安全的方式使用 coreMQTT——在这种情况下,MQTT 协议在后台运行,
无需调用 MQTT_ProcessLoop()。
static void prvEventCallback( MQTTContext_t * pxMQTTContext,
MQTTPacketInfo_t * pxPacketInfo,
MQTTDeserializedInfo_t * pxDeserializedInfo )
{
( void ) pxMQTTContext;
if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
{
prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo );
}
else
{
prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier );
}
}
处理传入的 MQTT 发布数据包
函数 prvMQTTProcessIncomingPublish()
演示了如何处理来自 MQTT 代理的 PUBLISH 数据包
。 prvMQTTProcessResonse()
演示了如何处理确认数据包。这些函数
的定义如下:
static const char *const pcExampleTopic = "/example/topic";
static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
{
if( ( pxPublishInfo->topicNameLength == strlen( pcExampleTopic ) ) &&
( 0 == strcmp( pcExampleTopic, pxPublishInfo->pTopicName ) ) )
{
LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
"Incoming Publish Message : %.*s",
pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName,
pxPublishInfo->payloadLength,
pxPublishInfo->pPayload ) );
}
else
{
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.",
pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName ) );
}
}
static void prvMQTTProcessResponse( MQTTPacketInfo_t * pxIncomingPacket,
uint16_t usPacketId )
{
MQTTStatus_t xResult = MQTTSuccess;
uint8_t * pucPayload = NULL;
size_t ulSize = 0;
switch( pxIncomingPacket->type )
{
case MQTT_PACKET_TYPE_SUBACK:
xResult = MQTT_GetSubAckStatusCodes(
pxIncomingPacket,
&pucPayload,
&ulSize );
configASSERT( xResult == MQTTSuccess );
xTopicFilterContext.xSubAckStatus = *pucPayload;
configASSERT( usSubscribePacketIdentifier == usPacketId );
break;
case MQTT_PACKET_TYPE_UNSUBACK:
LogInfo( ( "Unsubscribed from the topic %s.", mqttexampleTOPIC ) );
configASSERT( usUnsubscribePacketIdentifier == usPacketId );
break;
case MQTT_PACKET_TYPE_PINGRESP:
LogWarn( ( "PINGRESP should not be handled by the application "
"callback when using MQTT_ProcessLoop.\n" ) );
break;
default:
LogWarn( ( "prvMQTTProcessResponse() called with unknown packet type:(%02X).",
pxIncomingPacket->type ) );
}
}
取消订阅主题
工作流中的最后一步是取消订阅主题,因此代理不再
从 pcExampleTopic
发送任何发布。该函数的定义如下:
static const char *const pcExampleTopic = "/example/topic";
static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof(
xMQTTSubscription ) );
xMQTTSubscription[ 0 ].qos = MQTTQoS0;
xMQTTSubscription[ 0 ].pTopicFilter = pcExampleTopic;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen(
pcExampleTopic);
usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
xResult = MQTT_Unsubscribe( pxMQTTContext,
xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof(
MQTTSubscribeInfo_t ),
usUnsubscribePacketIdentifier );
configASSERT( xResult == MQTTSuccess );
}
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.