coreMQTT 存活演示
注意:我们建议在构建任何物联网 (IoT) 应用程序时始终使用双向身份验证。 此页面上的演示在引入加密和身份验证之前演示了 MQTT 通信,仅用于教育目的, 不适用于生产。
简介
存活 MQTT 演示项目使用
FreeRTOSWindows 端口,因此您
因此您可以用 Windows 上
免费的 Visual Studio 社区版来构建和评估该演示,而无需任何特殊 MCU
硬件。本项目提供了一种替代发送存活数据包的方法,
以便在给定的存活间隔内不发送控制数据包的情况下保持与 MQTT 代理的连接。
存活 MQTT 演示展示了如何建立一个与 MQTT 代理的明文 TCP 连接,
如果连接失败,则采用指数退避逻辑。在建立 TCP 连接后,客户端也会发送一个 MQTT 连接数据包,
其中包括关于所述代理存活间隔的信息。如果代理在
此给定间隔的 1.5 倍时间内未收到控制数据包,代理将关闭连接。为了避免这种情况,使用自动重新加载软件定时器
在间隔到期之前向代理发送 ping 请求。每当执行定时器以发送 ping 请求时,
另一个定时器会被启动,等待来自代理的 ping 响应。
接下来,客户端会订阅单个主题过滤器,然后等待足够长的时间来执行定时器。
之后,客户端以 QoS 1 级别发布到该主题,并反复调用
"MQTT_ReceiveLoop
" 以接收
来自代理的发布确认。向 "MQTT_ReceiveLoop
" 传递一个其值为 0 的超时,以只运行一次迭代,
任务在每次迭代之间延迟。请注意,如果在两次迭代中未收到发布确认,则将执行 ping 请求定时器。
整个周期无限期地重复。
请记住,使用 "MQTT_ProcessLoop
"
代替 "MQTT_ReceiveLoop
" 也能实现相同效果。但是,"MQTT_ProcessLoop
" 需要一个定时器查询函数来返回当前时间(以毫秒为单位)。
下面提供的说明将演示如何连接到在互联网上托管的 Mosquitto 测试代理,
或在主机上本地运行的服务器。
本演示仅作为学习练习使用。本演示未创建
安全连接,但可以轻松修改以使用 TLS 连接。但是,发送的所有 MQTT 消息都是
未加密的明文。请勿从您的 IoT 设备向 MQTT 代理发送任何机密信息。此
MQTT 代理由不隶属于 FreeRTOS 的第三方公开托管。此 MQTT 代理可能在任何时候变得不可用,
并且它不是由 FreeRTOS 维护的。生产 IoT 设备应使用相互身份验证和加密的网络连接,
正如 MQTT TLS
演示 中所展示的那样。
注意:Mosquitto 是一个开源 MQTT 消息代理。更多详细信息
请点击
此处。
源代码组织
用于存活 MQTT 演示的 Visual Studio 解决方案称为
mqtt_keep_alive_demo.sln
,可在
FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive 目录中找到,
而该目录又位于主 FreeRTOS 下载中。
构建演示项目
此演示项目使用
Visual Studio 的社区免费版。要构建演示,请执行如下操作:
- 从 Visual Studio IDE 中打开“
mqtt_keep_alive_demo.sln
” Visual Studio 解决方案文件。
- 在 IDE 的 "Build" 菜单中选择 "Build Solution"。
注意:如果您使用的是 Microsoft Visual Studio 2017 或更早版本,
则必须选择与您的版本兼容的“平台工具集”:“
项目-> RTOS 演示属性-> 平台工具集
”
配置演示项目
演示使用 FreeRTOS-Plus-TCP TCP/IP 堆栈,
因此请遵循为 TCP/IP Starter 项目提供的说明,以确保您:
- 安装了必要的
组件(如 WinPCap)。
- 设置静态或
动态 IP 地址、网关地址和网络掩码(可选)。
- 设置一个 MAC
地址 (可选)。
- 在您的主机上选择
以太网网络接口。
- 重要步骤:
在尝试运行 MQTT 演示之前测试您的网络
连接。
所有这些设置都应在 MQTT LTS rc1 演示项目中更改,而不是在上面链接的页面中所提及的 TCP/IP Starter 项目中
更改!交付时,TCP/IP 栈被配置为使用动态 IP 地址。
配置 MQTT 代理连接
备选方案 1:使用公开托管的 Mosquitto MQTT 代理( web 托管):
该演示项目预先配置为在 "test.mosqitto.org" 中与 Mosquitto 的公开托管消息代理进行通信。
如果演示连接到具有 DHCP 服务和互联网接入的网络,则此操作应有效。请注意,FreeRTOSWindows
端口仅适用于有线以太网网络适配器,该适配器可以是虚拟以太网适配器。
您应使用另一个 MQTT 客户端,如
MQTT.fx,测试从主机到公共 MQTT 代理的 MQTT 连接。
备选方案 2:使用本地托管的 Mosquitto MQTT 消息代理(主机):
Mosquitto 代理也可以在本地运行,无论是在您的主机上(用于构建演示应用程序的机器),还是在
您本地网络的另一台计算机上。请按以下步骤操作:
- 下载 Mosquitto
- 通过运行安装程序将 Mosquitto 安装为一个 Windows 服务。
- 启动 Mosquitto 服务。
有关将 Mosquitto 作为 Windows 服务运行的更多详细信息,请参阅其
自述文件窗口和
自述文件。
- 通过以下步骤验证 Mosquitto 服务器是否在本地运行并在端口 1883 上侦听:
- 打开 PowerShell。
- 键入命令
netstat -a -p TCP | findstr 1883
检查是否有
在端口 1883 上侦听的活动连接。
- 验证命令是否输出如下内容:
TCP 0.0.0.0:1883 :0 LISTENING
- 如果没有前述步骤所述输出,请参阅上文列出的 Mosquitto 文档,
检查您的设置是否正确。
- 确保允许 Mosquitto 代理通过 Windows 防火墙进行通信。按照 Microsoft 的
指示,允许应用程序通过 Windows 10 Defender 防火墙进行通信。 运行此 MQTT 示例后,最好通过 Windows 防火墙禁用 Mosquitto 代理通信,
以避免不必要的网络流量进入您的机器。
- 验证 Mosquitto 代理运行成功后,更新配置
democonfigMQTT_BROKER_ENDPOINT
到 Windows 主机本地 IP 地址。请注意,
"localhost" 或地址 "127.0.0.1" 将不起作用,因为此示例在 Windows 模拟器上运行,而不是在
Windows 主机本机上运行。另请注意,如果 Windows 主机使用虚拟专用网络 (VPN),
与 Mosquitto 代理的连接可能无法工作。
使用单独的 MQTT 客户端(如
MQTT.fx)
来测试从您的主机到本地 MQTT 代理的 MQTT 连接。
注意:端口号 1883 是未加密的 MQTT 的默认端口号。如果您无法使用该端口
(例如,如果它被您的 IT 安全策略阻止),请把 Mosquitto 使用的端口更改为更高的端口号
(例如,50000 到 55000 范围内的端口号),并相应地设置
democonfigMQTT_BROKER_PORT
。此文件中定义的常量
Mosquitto 使用的端口号是由位于 Mosquitto 安装目录下的 "
mosquitto.conf
" 中的 "
port
"参数
设置的。
备选方案 3:您选择的任何其他未加密 MQTT 代理:
支持未加密 TCP/IP 通信的任何 MQTT 代理也可与此演示一起使用。请按以下步骤操作:
- 打开您的
/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive/demo_config.h 本地副本。
- 添加下列行,并设置您所选择的代理:
功能
该演示创建了一个单个应用程序任务,该任务通过一系列示例循环,演示如何连接到
代理,使用自动重新加载定时器处理存活,订阅代理上的主题,在代理上发布主题,
最后,断开与代理的连接。演示应用程序订阅一个主题,并向同一个主题发布。每次
当演示向 MQTT 代理发布消息时,代理会向演示应用程序发送相同的消息。该
演示的结构体如下:
static void prvMQTTDemoTask( void * pvParameters )
{
uint32_t ulTopicCount = 0U;
NetworkContext_t xNetworkContext = { 0 };
MQTTContext_t xMQTTContext;
MQTTStatus_t xMQTTStatus;
PlaintextTransportStatus_t xNetworkStatus;
BaseType_t xTimerStatus;
( void ) pvParameters;
xMQTTStatus = MQTT_SerializePingreq( &xPingReqBuffer );
configASSERT( xMQTTStatus == MQTTSuccess );
for( ; ; )
{
xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );
xPingReqTimer = xTimerCreateStatic( "PingReqTimer",
mqttexamplePING_REQUEST_DELAY,
pdTRUE,
( void * ) &xMQTTContext.transportInterface,
prvPingReqTimerCallback,
&xPingReqTimerBuffer );
configASSERT( xPingReqTimer );
xPingRespTimer = xTimerCreateStatic( "PingRespTimer",
mqttexamplePING_RESPONSE_DELAY,
pdFALSE,
NULL,
prvPingRespTimerCallback,
&xPingRespTimerBuffer );
configASSERT( xPingRespTimer );
xTimerStatus = xTimerStart( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );
vTaskDelay( mqttexamplePING_REQUEST_DELAY );
LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTPublishToTopic( &xMQTTContext );
LogInfo( ( "Attempt to receive publish message from broker." ) );
while( xReceivedPubAck == pdFALSE )
{
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );
vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );
xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, 0U );
configASSERT( xMQTTStatus == MQTTSuccess );
}
ulReceiveLoopIterations = 0U;
xReceivedPubAck = pdFALSE;
LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTUnsubscribeFromTopic( &xMQTTContext );
while( xReceivedUnsubAck == pdFALSE )
{
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );
vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );
}
ulReceiveLoopIterations = 0U;
xReceivedUnsubAck = pdFALSE;
LogInfo( ( "Disconnecting the MQTT connection with %s.",
democonfigMQTT_BROKER_ENDPOINT ) );
xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
configASSERT( xMQTTStatus == MQTTSuccess );
xTimerStatus = xTimerStop( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
xTimerStatus = xTimerStop( xPingRespTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
xNetworkStatus = Plaintext_FreeRTOS_Disconnect( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{
xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure;
}
LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. "
"Total free heap is %u.",
xPortGetFreeHeapSize() ) );
LogInfo( ( "Demo completed successfully." ) );
LogInfo( ( "Short delay before starting the next iteration.... \r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS );
}
}
连接到 MQTT 代理
函数
prvConnectToServerWithBackoffRetries()
试图与 MQTT 代理建立 TCP 连接。如果
连接失败,则会在超时后重试。超时值将呈指数增长,
直到达到尝试次数或最大超时值。
prvConnectToServerWithBackoffRetries()
会返回
失败状态 - 如果在配置的尝试次数后无法建立与代理的 TCP 连接。
static PlaintextTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext )
{
PlaintextTransportStatus_t xNetworkStatus;
RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess;
RetryUtilsParams_t xReconnectParams;
RetryUtils_ParamsReset( &xReconnectParams );
xReconnectParams.maxRetryAttempts = MAX_RETRY_ATTEMPTS;
do
{
LogInfo( ( "Create a TCP connection to %s:%d.",
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT ) );
xNetworkStatus = Plaintext_FreeRTOS_Connect( pxNetworkContext,
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS );
if( xNetworkStatus != PLAINTEXT_TRANSPORT_SUCCESS )
{
LogWarn( ( "Connection to the broker failed. Retrying connection with backoff and jitter." ) );
xRetryUtilsStatus = RetryUtils_BackoffAndSleep( &xReconnectParams );
}
if( xRetryUtilsStatus == RetryUtilsRetriesExhausted )
{
LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
xNetworkStatus = PLAINTEXT_TRANSPORT_CONNECT_FAILURE;
}
} while( ( xNetworkStatus != PLAINTEXT_TRANSPORT_SUCCESS ) && ( xRetryUtilsStatus == RetryUtilsSuccess ) );
return xNetworkStatus;
}
函数 "
prvCreateMQTTConnectionWithBroker()
" 演示了如何通过
清除会话与 MQTT 代理建立未加密连接。它使用 FreeRTOS-Plus-TCP 传输接口 - 在文件
"
FreeRTOS-Plus/Source/Application-Protocols/platform/freertos/transport/src/plaintext_freertos.c'
" 中实现。编译在此项目中的
"
prvCreateMQTTConnectionWithBroker()
" 的定义如下所示。请记住,我们在
“
xConnectInfo
” 设置代理的存活秒数。
以下函数展示了如何使用
MQTT_Init()
在 MQTT 上下文中设置 FreeRTOS-Plus-TCP 传输接口。
它还展示了如何设置事件回调函数指针 (
prvEventCallback
)。此回调用于报告
传入消息。
static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
NetworkContext_t * pxNetworkContext )
{
MQTTStatus_t xResult;
MQTTConnectInfo_t xConnectInfo;
bool xSessionPresent;
TransportInterface_t xTransport;
xTransport.pNetworkContext = pxNetworkContext;
xTransport.send = Plaintext_FreeRTOS_send;
xTransport.recv = Plaintext_FreeRTOS_recv;
xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer );
configASSERT( xResult == MQTTSuccess );
( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );
xConnectInfo.cleanSession = true;
xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER );
xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS;
xResult = MQTT_Connect( pxMQTTContext,
&xConnectInfo,
NULL,
mqttexampleCONNACK_RECV_TIMEOUT_MS,
&xSessionPresent );
configASSERT( xResult == MQTTSuccess );
}
"
prvCreateMQTTConnectionWithBroker()
" 演示了如何
通过清除对话与 MQTT 代理建立未加密连接。
使用自动重新加载定时器处理存活
应用程序连接到代理后,它会创建一个自动重新加载定时器,
负责在每次传递 "
mqttexampleKEEP_ALIVE_DELAY
" 后调用回调。此回调使用 coreMQTT 序列化器 API 序列化 ping 请求数据包,
然后将其发送到 MQTT 代理。此回调函数的定义如下所示:
static void prvPingReqTimerCallback( TimerHandle_t pxTimer )
{
TransportInterface_t * pxTransport;
int32_t xTransportStatus;
BaseType_t xTimerStatus;
pxTransport = ( TransportInterface_t * ) pvTimerGetTimerID( pxTimer );
if( xWaitingForPingResp == false )
{
LogInfo( ( "Ping the MQTT broker." ) );
xTransportStatus = pxTransport->send( pxTransport->pNetworkContext,
( void * ) xPingReqBuffer.pBuffer,
xPingReqBuffer.size );
configASSERT( ( size_t ) xTransportStatus == xPingReqBuffer.size );
xWaitingForPingResp = true;
xTimerStatus = xTimerStart( xPingRespTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
}
}
"
prvKeepAliveTimerCallback()
" 演示了如何向 MQTT 代理发送 ping 请求数据包。数据包发送后,
另一个定时器被启动,以预期用如下所示的另一个函数来处理 ping 响应:
static void prvPingRespTimerCallback( TimerHandle_t pxTimer )
{
( void ) pxTimer;
configASSERT( xWaitingForPingResp == false );
}
"
prvKeepAliveTimerCallback()
" 只断言已收到 ping 响应。
订阅 MQTT 主题
函数 "
prvMQTTSubscribeWithBackoffRetries()
" 演示了如何订阅 MQTT 代理上的主题过滤器
。该示例演示了如何订阅一个主题过滤器,但也可以在同一个 API 调用中传递一个主题过滤器列表,
以订阅一个以上的主题过滤器。此外,如果 MQTT 代理拒绝订阅请求,则
订阅将重试 "
MAX_RETRY_ATTEMPTS
"。此函数的定义如下所示:
static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult = MQTTSuccess;
RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess;
RetryUtilsParams_t xRetryParams;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
bool xFailedSubscribeToTopic = false;
uint32_t ulTopicCount = 0U;
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
xMQTTSubscription[ 0 ].qos = MQTTQoS0;
xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
RetryUtils_ParamsReset( &xRetryParams );
xRetryParams.maxRetryAttempts = MAX_RETRY_ATTEMPTS;
do
{
LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) );
xResult = MQTT_Subscribe( pxMQTTContext,
xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
usSubscribePacketIdentifier );
configASSERT( xResult == MQTTSuccess );
LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) );
while( xReceivedSubAck == pdFALSE )
{
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );
vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );
xResult = MQTT_ReceiveLoop( pxMQTTContext, 0U );
configASSERT( xResult == MQTTSuccess );
}
ulReceiveLoopIterations = 0U;
xReceivedSubAck = pdFALSE;
xFailedSubscribeToTopic = false;
for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{
if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure )
{
LogWarn( ( "Server rejected subscription request. Attempting to re-subscribe to topic %s.",
xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) );
xFailedSubscribeToTopic = true;
xRetryUtilsStatus = RetryUtils_BackoffAndSleep( &xRetryParams );
break;
}
}
configASSERT( xRetryUtilsStatus != RetryUtilsRetriesExhausted );
} while( ( xFailedSubscribeToTopic == true ) && ( xRetryUtilsStatus == RetryUtilsSuccess ) );
}
接收传入消息
如前所述,应用程序在连接到代理之前注册事件回调函数。函数
"
'prvMQTTDemoTask()
" 通过调用 "
MQTT_ReceiveLoop()
" 来接收传入消息。当接收到传入的 MQTT 消息时,
它会调用应用程序注册的事件回调函数。函数
"
prvEventCallback()
" 是这种事件回调函数的示例;它检查传入的数据包类型,
并调用适当的处理程序。在此示例中,函数要么调用 "
prvMQTTProcessIncomingPublish()
" 来
处理传入的发布消息,要么调用 "
prvMQTTProcessResponse()
" 来处理确认。
static void prvEventCallback( MQTTContext_t * pxMQTTContext,
MQTTPacketInfo_t * pxPacketInfo,
MQTTDeserializedInfo_t * pxDeserializedInfo )
{
( void ) pxMQTTContext;
if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
{
prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo );
}
else
{
prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier );
}
}
发布到主题
函数 "
prvMQTTPublishToTopic()
" 演示了如何在 MQTT 代理上发布主题过滤器。此函数的
定义如下所示:
static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult;
MQTTPublishInfo_t xMQTTPublishInfo;
BaseType_t xTimerStatus;
( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );
xMQTTPublishInfo.qos = MQTTQoS0;
xMQTTPublishInfo.retain = false;
xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );
xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, 0U );
configASSERT( xResult == MQTTSuccess );
xTimerStatus = prvCheckTimeoutThenResetTimer( xKeepAliveTimer );
configASSERT( xTimerStatus == pdPASS );
}
处理传入的 MQTT 发布数据包
函数 "
prvMQTTProcessIncomingPublish()
" 演示了如何处理来自 MQTT 代理的
PUBLISH
数据包
。此函数的定义如下所示:
static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
{
configASSERT( pxPublishInfo != NULL );
LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) );
if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) &&
( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) )
{
LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
"Incoming Publish Message : %.*s",
pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName,
pxPublishInfo->payloadLength,
pxPublishInfo->pPayload ) );
}
else
{
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.",
pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName ) );
}
}
取消订阅主题
工作流中的最后一步取消订阅主题,因此代理不再发送已在
"
mqttexampleTOPIC
" 上发布的任何消息。此函数的定义如下所示:
static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );
usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
xMQTTSubscription[ 0 ].qos = MQTTQoS0;
xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );
usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );
xResult = MQTT_Unsubscribe( pxMQTTContext,
xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
usUnsubscribePacketIdentifier );
configASSERT( xResult == MQTTSuccess );
}
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.