下载 FreeRTOS
 

出色的 RTOS & 嵌入式软件

最新资讯
简化任何设备的身份验证云连接。
利用 CoAP 设计节能型云连接 IoT 解决方案。
11.0.0 版 FreeRTOS 内核简介:
FreeRTOS 路线图和代码贡献流程。
使用 FreeRTOS 实现 OPC-UA over TSN。

coreMQTT 存活演示

注意:我们建议在构建任何物联网 (IoT) 应用程序时始终使用双向身份验证。 此页面上的演示在引入加密和身份验证之前演示了 MQTT 通信,仅用于教育目的, 不适用于生产。

简介

存活 MQTT 演示项目使用 FreeRTOSWindows 端口,因此您 因此您可以用 Windows 上免费的 Visual Studio 社区版来构建和评估该演示,而无需任何特殊 MCU 硬件。本项目提供了一种替代发送存活数据包的方法, 以便在给定的存活间隔内不发送控制数据包的情况下保持与 MQTT 代理的连接。

存活 MQTT 演示展示了如何建立一个与 MQTT 代理的明文 TCP 连接, 如果连接失败,则采用指数退避逻辑。在建立 TCP 连接后,客户端也会发送一个 MQTT 连接数据包, 其中包括关于所述代理存活间隔的信息。如果代理在 此给定间隔的 1.5 倍时间内未收到控制数据包,代理将关闭连接。为了避免这种情况,使用自动重新加载软件定时器 在间隔到期之前向代理发送 ping 请求。每当执行定时器以发送 ping 请求时, 另一个定时器会被启动,等待来自代理的 ping 响应。 接下来,客户端会订阅单个主题过滤器,然后等待足够长的时间来执行定时器。 之后,客户端以 QoS 1 级别发布到该主题,并反复调用 "MQTT_ReceiveLoop" 以接收 来自代理的发布确认。向 "MQTT_ReceiveLoop" 传递一个其值为 0 的超时,以只运行一次迭代, 任务在每次迭代之间延迟。请注意,如果在两次迭代中未收到发布确认,则将执行 ping 请求定时器。 整个周期无限期地重复。 请记住,使用 "MQTT_ProcessLoop" 代替 "MQTT_ReceiveLoop" 也能实现相同效果。但是,"MQTT_ProcessLoop" 需要一个定时器查询函数来返回当前时间(以毫秒为单位)。

下面提供的说明将演示如何连接到在互联网上托管的 Mosquitto 测试代理, 或在主机上本地运行的服务器。

本演示作为学习练习使用。本演示创建 安全连接,但可以轻松修改以使用 TLS 连接。但是,发送的所有 MQTT 消息都是 未加密的明文。请勿从您的 IoT 设备向 MQTT 代理发送任何机密信息。此 MQTT 代理由不隶属于 FreeRTOS 的第三方公开托管。此 MQTT 代理可能在任何时候变得不可用, 并且它不是由 FreeRTOS 维护的。生产 IoT 设备应使用相互身份验证和加密的网络连接, 正如 MQTT TLS 演示 中所展示的那样。

注意:Mosquitto 是一个开源 MQTT 消息代理。更多详细信息 请点击此处

源代码组织

用于存活 MQTT 演示的 Visual Studio 解决方案称为 mqtt_keep_alive_demo.sln,可在 FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive 目录中找到, 而该目录又位于主 FreeRTOS 下载中。

构建演示项目

此演示项目使用 Visual Studio 的社区免费版。要构建演示,请执行如下操作:
  1. 从 Visual Studio IDE 中打开“mqtt_keep_alive_demo.sln” Visual Studio 解决方案文件。
  2. 在 IDE 的 "Build" 菜单中选择 "Build Solution"。
注意:如果您使用的是 Microsoft Visual Studio 2017 或更早版本, 则必须选择与您的版本兼容的“平台工具集”:“项目-> RTOS 演示属性-> 平台工具集

配置演示项目

演示使用 FreeRTOS-Plus-TCP TCP/IP 堆栈, 因此请遵循为 TCP/IP Starter 项目提供的说明,以确保您:

  1. 安装了必要的 组件(如 WinPCap)。
  2. 设置静态或 动态 IP 地址、网关地址和网络掩码(可选)。
  3. 设置一个 MAC 地址 (可选)。
  4. 在您的主机上选择 以太网网络接口
  5. 重要步骤: 在尝试运行 MQTT 演示之前测试您的网络 连接
所有这些设置都应在 MQTT LTS rc1 演示项目中更改,而不是在上面链接的页面中所提及的 TCP/IP Starter 项目中 更改!交付时,TCP/IP 栈被配置为使用动态 IP 地址。

配置 MQTT 代理连接

备选方案 1:使用公开托管的 Mosquitto MQTT 代理( web 托管):

该演示项目预先配置为在 "test.mosqitto.org" 中与 Mosquitto 的公开托管消息代理进行通信。 如果演示连接到具有 DHCP 服务和互联网接入的网络,则此操作应有效。请注意,FreeRTOSWindows 端口仅适用于有线以太网网络适配器,该适配器可以是虚拟以太网适配器。 您应使用另一个 MQTT 客户端,如 MQTT.fx,测试从主机到公共 MQTT 代理的 MQTT 连接。

备选方案 2:使用本地托管的 Mosquitto MQTT 消息代理(主机):

Mosquitto 代理也可以在本地运行,无论是在您的主机上(用于构建演示应用程序的机器),还是在 您本地网络的另一台计算机上。请按以下步骤操作:
  1. 下载 Mosquitto
  2. 通过运行安装程序将 Mosquitto 安装为一个 Windows 服务。
  3. 启动 Mosquitto 服务。 有关将 Mosquitto 作为 Windows 服务运行的更多详细信息,请参阅其 自述文件窗口 自述文件
  4. 通过以下步骤验证 Mosquitto 服务器是否在本地运行并在端口 1883 上侦听:
    1. 打开 PowerShell。
    2. 键入命令
      netstat -a -p TCP | findstr 1883
      检查是否有 在端口 1883 上侦听的活动连接。
    3. 验证命令是否输出如下内容:
      TCP    0.0.0.0:1883           :0       LISTENING
    4. 如果没有前述步骤所述输出,请参阅上文列出的 Mosquitto 文档, 检查您的设置是否正确。
  5. 确保允许 Mosquitto 代理通过 Windows 防火墙进行通信。按照 Microsoft 的 指示,允许应用程序通过 Windows 10 Defender 防火墙进行通信。 运行此 MQTT 示例后,最好通过 Windows 防火墙禁用 Mosquitto 代理通信, 以避免不必要的网络流量进入您的机器。
  6. 验证 Mosquitto 代理运行成功后,更新配置 democonfigMQTT_BROKER_ENDPOINT 到 Windows 主机本地 IP 地址。请注意, "localhost" 或地址 "127.0.0.1" 将不起作用,因为此示例在 Windows 模拟器上运行,而不是在 Windows 主机本机上运行。另请注意,如果 Windows 主机使用虚拟专用网络 (VPN), 与 Mosquitto 代理的连接可能无法工作。
使用单独的 MQTT 客户端(如 MQTT.fx) 来测试从您的主机到本地 MQTT 代理的 MQTT 连接。 注意:端口号 1883 是未加密的 MQTT 的默认端口号。如果您无法使用该端口 (例如,如果它被您的 IT 安全策略阻止),请把 Mosquitto 使用的端口更改为更高的端口号 (例如,50000 到 55000 范围内的端口号),并相应地设置 democonfigMQTT_BROKER_PORT。此文件中定义的常量 Mosquitto 使用的端口号是由位于 Mosquitto 安装目录下的 " mosquitto.conf" 中的 "port"参数 设置的。

备选方案 3:您选择的任何其他未加密 MQTT 代理:

支持未加密 TCP/IP 通信的任何 MQTT 代理也可与此演示一起使用。请按以下步骤操作:
  1. 打开您的 /FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Keep_Alive/demo_config.h 本地副本。
  2. 添加下列行,并设置您所选择的代理:
    • #define democonfigMQTT_BROKER_ENDPOINT "your-desired-endpoint"
    • #define democonfigMQTT_BROKER_PORT ( 1883 )

功能

该演示创建了一个单个应用程序任务,该任务通过一系列示例循环,演示如何连接到 代理,使用自动重新加载定时器处理存活,订阅代理上的主题,在代理上发布主题, 最后,断开与代理的连接。演示应用程序订阅一个主题,并向同一个主题发布。每次 当演示向 MQTT 代理发布消息时,代理会向演示应用程序发送相同的消息。该 演示的结构体如下:


static void prvMQTTDemoTask( void * pvParameters )
{
uint32_t ulTopicCount = 0U;
NetworkContext_t xNetworkContext = { 0 };
MQTTContext_t xMQTTContext;
MQTTStatus_t xMQTTStatus;
PlaintextTransportStatus_t xNetworkStatus;
BaseType_t xTimerStatus;

/* Remove compiler warnings about unused parameters. */
( void ) pvParameters;

/* Serialize a PINGREQ packet to send upon invoking the keep-alive timer

* callback. */

xMQTTStatus = MQTT_SerializePingreq( &xPingReqBuffer );
configASSERT( xMQTTStatus == MQTTSuccess );

for( ; ; )
{
/****************************** Connect. ******************************/

/* Attempt to connect to the MQTT broker. If connection fails, retry

* after a timeout. The timeout value will be exponentially increased

* until the maximum number of attempts are reached or the maximum

* timeout value is reached. The function below returns a failure status

* if the TCP connection cannot be established to the broker after

* the configured number of attempts. */

xNetworkStatus = prvConnectToServerWithBackoffRetries( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );

/* Sends an MQTT Connect packet over the already connected TCP socket,

* and waits for connection acknowledgment (CONNACK) packet. */

LogInfo( ( "Creating an MQTT connection to %s.", democonfigMQTT_BROKER_ENDPOINT ) );
prvCreateMQTTConnectionWithBroker( &xMQTTContext, &xNetworkContext );

/* Create timers to handle keep-alive. */
xPingReqTimer = xTimerCreateStatic( "PingReqTimer",
mqttexamplePING_REQUEST_DELAY,
pdTRUE,
( void * ) &xMQTTContext.transportInterface,
prvPingReqTimerCallback,
&xPingReqTimerBuffer );
configASSERT( xPingReqTimer );
xPingRespTimer = xTimerCreateStatic( "PingRespTimer",
mqttexamplePING_RESPONSE_DELAY,
pdFALSE,
NULL,
prvPingRespTimerCallback,
&xPingRespTimerBuffer );
configASSERT( xPingRespTimer );

/* Start the timer to send a PINGREQ. */
xTimerStatus = xTimerStart( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS );

/**************************** Subscribe. ******************************/

/* If the server rejected the subscription request, attempt to resubscribe

* to the topic. Attempts are made according to the exponential backoff retry

* strategy declared in retry_utils.h. */

prvMQTTSubscribeWithBackoffRetries( &xMQTTContext );

/************************ Send PINGREQ packet. ************************/

/* Deliberately delay in order for the auto-reload timer to send a PINGREQ to the broker. */
vTaskDelay( mqttexamplePING_REQUEST_DELAY );

/********************* Publish and Receive Loop. **********************/
/* Publish messages with QOS1, send and process keep-alive messages. */
LogInfo( ( "Publish to the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTPublishToTopic( &xMQTTContext );

/* Process the incoming publish echo. Since the application subscribed to
* the same topic, the broker will send the same publish message back
* to the application. */
LogInfo( ( "Attempt to receive publish message from broker." ) );
while( xReceivedPubAck == pdFALSE )
{
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );

vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );

xMQTTStatus = MQTT_ReceiveLoop( &xMQTTContext, 0U );
configASSERT( xMQTTStatus == MQTTSuccess );
}

/* Reset after loop. */
ulReceiveLoopIterations = 0U;
xReceivedPubAck = pdFALSE;

/******************** Unsubscribe from the topic. *********************/
LogInfo( ( "Unsubscribe from the MQTT topic %s.", mqttexampleTOPIC ) );
prvMQTTUnsubscribeFromTopic( &xMQTTContext );

/* Process an incoming packet from the broker. */
while( xReceivedUnsubAck == pdFALSE )
{
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );

vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );

}

/* Reset after loop. */
ulReceiveLoopIterations = 0U;
xReceivedUnsubAck = pdFALSE;

/**************************** Disconnect. *****************************/

/* Send an MQTT disconnect packet over the connected TCP socket.

* There is no corresponding response for the disconnect packet. After

* sending the disconnect, the client must close the network connection. */

LogInfo( ( "Disconnecting the MQTT connection with %s.",
democonfigMQTT_BROKER_ENDPOINT ) );
xMQTTStatus = MQTT_Disconnect( &xMQTTContext );
configASSERT( xMQTTStatus == MQTTSuccess );

/* Stop the keep-alive timers for the next iteration. */
xTimerStatus = xTimerStop( xPingReqTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
xTimerStatus = xTimerStop( xPingRespTimer, 0 );
configASSERT( xTimerStatus == pdPASS );

/* Close the network connection. */
xNetworkStatus = Plaintext_FreeRTOS_Disconnect( &xNetworkContext );
configASSERT( xNetworkStatus == PLAINTEXT_TRANSPORT_SUCCESS );

/* Reset the SUBACK status for each topic filter after completion of the

* subscription request cycle. */

for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{
xTopicFilterContext[ ulTopicCount ].xSubAckStatus = MQTTSubAckFailure;
}

/* Wait for some time between two iterations to ensure that we do not

* bombard the broker. */

LogInfo( ( "prvMQTTDemoTask() completed an iteration successfully. "
"Total free heap is %u.",
xPortGetFreeHeapSize() ) );
LogInfo( ( "Demo completed successfully." ) );
LogInfo( ( "Short delay before starting the next iteration.... \r\n" ) );
vTaskDelay( mqttexampleDELAY_BETWEEN_DEMO_ITERATIONS );
}
}

连接到 MQTT 代理

函数 prvConnectToServerWithBackoffRetries() 试图与 MQTT 代理建立 TCP 连接。如果 连接失败,则会在超时后重试。超时值将呈指数增长, 直到达到尝试次数或最大超时值。prvConnectToServerWithBackoffRetries() 会返回 失败状态 - 如果在配置的尝试次数后无法建立与代理的 TCP 连接。

static PlaintextTransportStatus_t prvConnectToServerWithBackoffRetries( NetworkContext_t * pxNetworkContext )
{
PlaintextTransportStatus_t xNetworkStatus;
RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess;
RetryUtilsParams_t xReconnectParams;

/* Initialize reconnect attempts and interval. */
RetryUtils_ParamsReset( &xReconnectParams );
xReconnectParams.maxRetryAttempts = MAX_RETRY_ATTEMPTS;

/* Attempt to connect to MQTT broker. If connection fails, retry after

* a timeout. Timeout value will exponentially increase till maximum

* attempts are reached.

*/

do
{
/* Establish a TCP connection with the MQTT broker. This example connects to

* the MQTT broker as specified in democonfigMQTT_BROKER_ENDPOINT and

* democonfigMQTT_BROKER_PORT at the top of this file. */

LogInfo( ( "Create a TCP connection to %s:%d.",
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT ) );
xNetworkStatus = Plaintext_FreeRTOS_Connect( pxNetworkContext,
democonfigMQTT_BROKER_ENDPOINT,
democonfigMQTT_BROKER_PORT,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS,
mqttexampleTRANSPORT_SEND_RECV_TIMEOUT_MS );

if( xNetworkStatus != PLAINTEXT_TRANSPORT_SUCCESS )
{
LogWarn( ( "Connection to the broker failed. Retrying connection with backoff and jitter." ) );
xRetryUtilsStatus = RetryUtils_BackoffAndSleep( &xReconnectParams );
}

if( xRetryUtilsStatus == RetryUtilsRetriesExhausted )
{
LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
xNetworkStatus = PLAINTEXT_TRANSPORT_CONNECT_FAILURE;
}
} while( ( xNetworkStatus != PLAINTEXT_TRANSPORT_SUCCESS ) && ( xRetryUtilsStatus == RetryUtilsSuccess ) );

return xNetworkStatus;
}
函数 "prvCreateMQTTConnectionWithBroker()" 演示了如何通过 清除会话与 MQTT 代理建立未加密连接。它使用 FreeRTOS-Plus-TCP 传输接口 - 在文件 "FreeRTOS-Plus/Source/Application-Protocols/platform/freertos/transport/src/plaintext_freertos.c'" 中实现。编译在此项目中的 "prvCreateMQTTConnectionWithBroker()" 的定义如下所示。请记住,我们在 “xConnectInfo” 设置代理的存活秒数。 以下函数展示了如何使用 MQTT_Init() 在 MQTT 上下文中设置 FreeRTOS-Plus-TCP 传输接口。 它还展示了如何设置事件回调函数指针 (prvEventCallback)。此回调用于报告 传入消息。

static void prvCreateMQTTConnectionWithBroker( MQTTContext_t * pxMQTTContext,
NetworkContext_t * pxNetworkContext )
{
MQTTStatus_t xResult;
MQTTConnectInfo_t xConnectInfo;
bool xSessionPresent;
TransportInterface_t xTransport;

/***

* For readability, error handling in this function is restricted to the use of

* asserts().

***/


/* Fill in Transport Interface send and receive function pointers. */
xTransport.pNetworkContext = pxNetworkContext;
xTransport.send = Plaintext_FreeRTOS_send;
xTransport.recv = Plaintext_FreeRTOS_recv;

/* Initialize MQTT library. */
xResult = MQTT_Init( pxMQTTContext, &xTransport, prvGetTimeMs, prvEventCallback, &xBuffer );
configASSERT( xResult == MQTTSuccess );

/* Many fields not used in this demo so start with everything at 0. */
( void ) memset( ( void * ) &xConnectInfo, 0x00, sizeof( xConnectInfo ) );

/* Start with a clean session i.e. direct the MQTT broker to discard any

* previous session data. Also, establishing a connection with a clean session

* will ensure that the broker does not store any data when this client

* gets disconnected. */

xConnectInfo.cleanSession = true;

/* The client identifier is used to uniquely identify this MQTT client to

* the MQTT broker. In a production device, the identifier can be something

* unique, such as a device serial number. */

xConnectInfo.pClientIdentifier = democonfigCLIENT_IDENTIFIER;
xConnectInfo.clientIdentifierLength = ( uint16_t ) strlen( democonfigCLIENT_IDENTIFIER );

/* Set MQTT keep-alive period. It is the responsibility of the application

* to ensure that the interval between control packets being sent does not

* exceed the keep-alive value. In the absence of sending any other control

* packets, the client MUST send a PINGREQ Packet. */

xConnectInfo.keepAliveSeconds = mqttexampleKEEP_ALIVE_TIMEOUT_SECONDS;

/* Send MQTT CONNECT packet to broker. LWT is not used in this demo, so it

* is passed as NULL. */

xResult = MQTT_Connect( pxMQTTContext,
&xConnectInfo,
NULL,
mqttexampleCONNACK_RECV_TIMEOUT_MS,
&xSessionPresent );
configASSERT( xResult == MQTTSuccess );
}
"prvCreateMQTTConnectionWithBroker()" 演示了如何 通过清除对话与 MQTT 代理建立未加密连接。

使用自动重新加载定时器处理存活

应用程序连接到代理后,它会创建一个自动重新加载定时器, 负责在每次传递 "mqttexampleKEEP_ALIVE_DELAY" 后调用回调。此回调使用 coreMQTT 序列化器 API 序列化 ping 请求数据包, 然后将其发送到 MQTT 代理。此回调函数的定义如下所示:

static void prvPingReqTimerCallback( TimerHandle_t pxTimer )
{
TransportInterface_t * pxTransport;
int32_t xTransportStatus;
BaseType_t xTimerStatus;

pxTransport = ( TransportInterface_t * ) pvTimerGetTimerID( pxTimer );

/* Do not resend if waiting on a PINGRESP. */
if( xWaitingForPingResp == false )
{
/* Send PINGREQ to broker */
LogInfo( ( "Ping the MQTT broker." ) );
xTransportStatus = pxTransport->send( pxTransport->pNetworkContext,
( void * ) xPingReqBuffer.pBuffer,
xPingReqBuffer.size );
configASSERT( ( size_t ) xTransportStatus == xPingReqBuffer.size );

xWaitingForPingResp = true;
/* Start the timer to expect a PINGRESP. */
xTimerStatus = xTimerStart( xPingRespTimer, 0 );
configASSERT( xTimerStatus == pdPASS );
}
}
"prvKeepAliveTimerCallback()" 演示了如何向 MQTT 代理发送 ping 请求数据包。数据包发送后, 另一个定时器被启动,以预期用如下所示的另一个函数来处理 ping 响应:

static void prvPingRespTimerCallback( TimerHandle_t pxTimer )
{
( void ) pxTimer;

/* Assert that a pending PINGRESP has been received. */
configASSERT( xWaitingForPingResp == false );
}
"prvKeepAliveTimerCallback()" 只断言已收到 ping 响应。

订阅 MQTT 主题

函数 "prvMQTTSubscribeWithBackoffRetries()" 演示了如何订阅 MQTT 代理上的主题过滤器 。该示例演示了如何订阅一个主题过滤器,但也可以在同一个 API 调用中传递一个主题过滤器列表, 以订阅一个以上的主题过滤器。此外,如果 MQTT 代理拒绝订阅请求,则 订阅将重试 "MAX_RETRY_ATTEMPTS"。此函数的定义如下所示:

static void prvMQTTSubscribeWithBackoffRetries( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult = MQTTSuccess;
RetryUtilsStatus_t xRetryUtilsStatus = RetryUtilsSuccess;
RetryUtilsParams_t xRetryParams;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];
bool xFailedSubscribeToTopic = false;
uint32_t ulTopicCount = 0U;

/* Some fields are not used by this demo so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );

/* Get a unique packet id. */
usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );

/* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to

* only one topic and uses QoS0. */

xMQTTSubscription[ 0 ].qos = MQTTQoS0;
xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );

/* Initialize retry attempts and interval. */
RetryUtils_ParamsReset( &xRetryParams );
xRetryParams.maxRetryAttempts = MAX_RETRY_ATTEMPTS;

do
{
/* The client is now connected to the broker. Subscribe to the topic

* as specified in mqttexampleTOPIC at the top of this file by sending a

* subscribe packet then waiting for a subscribe acknowledgment (SUBACK).

* This client will then publish to the same topic it subscribed to, so it

* will expect all the messages it sends to the broker to be sent back to it

* from the broker. This demo uses QOS0 in Subscribe. Therefore, the publish

* messages received from the broker will have QOS0. */

LogInfo( ( "Attempt to subscribe to the MQTT topic %s.", mqttexampleTOPIC ) );
xResult = MQTT_Subscribe( pxMQTTContext,
xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
usSubscribePacketIdentifier );
configASSERT( xResult == MQTTSuccess );

LogInfo( ( "SUBSCRIBE sent for topic %s to broker.\n\n", mqttexampleTOPIC ) );

/* Process incoming packet from the broker. After sending the subscribe, the

* client may receive a publish before it receives a subscribe ack. Therefore,

* call the generic incoming packet processing function. Since this demo is

* subscribing to the topic to which no one is publishing, probability of

* receiving a publish message before a subscribe ack is zero; but the application

* must be ready to receive any packet. This demo uses the generic packet

* processing function everywhere to highlight this fact. */

while( xReceivedSubAck == pdFALSE )
{
ulReceiveLoopIterations += 1U;
configASSERT( ulReceiveLoopIterations <= mqttexampleMAX_RECEIVE_LOOP_ITERATIONS );

vTaskDelay( mqttexampleRECEIVE_LOOP_ITERATION_DELAY );

xResult = MQTT_ReceiveLoop( pxMQTTContext, 0U );
configASSERT( xResult == MQTTSuccess );
}

/* Reset in case another attempt to subscribe is needed. */
ulReceiveLoopIterations = 0U;
xReceivedSubAck = pdFALSE;

/* Reset flag before checking suback responses. */
xFailedSubscribeToTopic = false;

/* Check if the recent subscription request has been rejected. #xTopicFilterContext

* is updated in the event callback to reflect the status of the SUBACK

* sent by the broker. It represents either the QoS level granted by the

* server upon subscription or acknowledgement of server rejection of the

* subscription request. */

for( ulTopicCount = 0; ulTopicCount < mqttexampleTOPIC_COUNT; ulTopicCount++ )
{
if( xTopicFilterContext[ ulTopicCount ].xSubAckStatus == MQTTSubAckFailure )
{
LogWarn( ( "Server rejected subscription request. Attempting to re-subscribe to topic %s.",
xTopicFilterContext[ ulTopicCount ].pcTopicFilter ) );
xFailedSubscribeToTopic = true;
xRetryUtilsStatus = RetryUtils_BackoffAndSleep( &xRetryParams );
break;
}
}

configASSERT( xRetryUtilsStatus != RetryUtilsRetriesExhausted );
} while( ( xFailedSubscribeToTopic == true ) && ( xRetryUtilsStatus == RetryUtilsSuccess ) );
}

接收传入消息

如前所述,应用程序在连接到代理之前注册事件回调函数。函数 "'prvMQTTDemoTask()" 通过调用 "MQTT_ReceiveLoop()" 来接收传入消息。当接收到传入的 MQTT 消息时, 它会调用应用程序注册的事件回调函数。函数 "prvEventCallback()" 是这种事件回调函数的示例;它检查传入的数据包类型, 并调用适当的处理程序。在此示例中,函数要么调用 "prvMQTTProcessIncomingPublish()" 来 处理传入的发布消息,要么调用 "prvMQTTProcessResponse()" 来处理确认。

static void prvEventCallback( MQTTContext_t * pxMQTTContext,
MQTTPacketInfo_t * pxPacketInfo,
MQTTDeserializedInfo_t * pxDeserializedInfo )
{
/* The MQTT context is not used for this demo. */
( void ) pxMQTTContext;

if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
{
prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo );
}
else
{
prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier );
}
}

发布到主题

函数 "prvMQTTPublishToTopic()" 演示了如何在 MQTT 代理上发布主题过滤器。此函数的 定义如下所示:

static void prvMQTTPublishToTopic( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult;
MQTTPublishInfo_t xMQTTPublishInfo;
BaseType_t xTimerStatus;

/***

* For readability, error handling in this function is restricted to the use of

* asserts().

***/


/* Some fields are not used by this demo so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTPublishInfo, 0x00, sizeof( xMQTTPublishInfo ) );

/* This demo uses QoS0. */
xMQTTPublishInfo.qos = MQTTQoS0;
xMQTTPublishInfo.retain = false;
xMQTTPublishInfo.pTopicName = mqttexampleTOPIC;
xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( mqttexampleTOPIC );
xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );

/* Send a PUBLISH packet. Packet ID is not used for a QoS0 publish. */
xResult = MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, 0U );
configASSERT( xResult == MQTTSuccess );

/* When a PUBLISH packet has been sent, the keep-alive timer can be reset. */
xTimerStatus = prvCheckTimeoutThenResetTimer( xKeepAliveTimer );
configASSERT( xTimerStatus == pdPASS );
}

处理传入的 MQTT 发布数据包

函数 "prvMQTTProcessIncomingPublish()" 演示了如何处理来自 MQTT 代理的 PUBLISH 数据包 。此函数的定义如下所示:

static void prvMQTTProcessIncomingPublish( MQTTPublishInfo_t * pxPublishInfo )
{
configASSERT( pxPublishInfo != NULL );

/* Process incoming Publish. */
LogInfo( ( "Incoming QoS : %d\n", pxPublishInfo->qos ) );

/* Verify the received publish is for the we have subscribed to. */
if( ( pxPublishInfo->topicNameLength == strlen( mqttexampleTOPIC ) ) &&
( 0 == strncmp( mqttexampleTOPIC, pxPublishInfo->pTopicName, pxPublishInfo->topicNameLength ) ) )
{
LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\r\n"
"Incoming Publish Message : %.*s",
pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName,
pxPublishInfo->payloadLength,
pxPublishInfo->pPayload ) );
}
else
{
LogInfo( ( "Incoming Publish Topic Name: %.*s does not match subscribed topic.",
pxPublishInfo->topicNameLength,
pxPublishInfo->pTopicName ) );
}
}

取消订阅主题

工作流中的最后一步取消订阅主题,因此代理不再发送已在 "mqttexampleTOPIC" 上发布的任何消息。此函数的定义如下所示:

static void prvMQTTUnsubscribeFromTopic( MQTTContext_t * pxMQTTContext )
{
MQTTStatus_t xResult;
MQTTSubscribeInfo_t xMQTTSubscription[ mqttexampleTOPIC_COUNT ];

/* Some fields are not used by this demo, so start with everything at 0. */
( void ) memset( ( void * ) &xMQTTSubscription, 0x00, sizeof( xMQTTSubscription ) );

/* Get a unique packet id. */
usSubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );

/* Subscribe to the mqttexampleTOPIC topic filter. This example subscribes to

* only one topic and uses QoS0. */

xMQTTSubscription[ 0 ].qos = MQTTQoS0;
xMQTTSubscription[ 0 ].pTopicFilter = mqttexampleTOPIC;
xMQTTSubscription[ 0 ].topicFilterLength = ( uint16_t ) strlen( mqttexampleTOPIC );

/* Get the next unique packet identifier. */
usUnsubscribePacketIdentifier = MQTT_GetPacketId( pxMQTTContext );

/* Send the UNSUBSCRIBE packet. */
xResult = MQTT_Unsubscribe( pxMQTTContext,
xMQTTSubscription,
sizeof( xMQTTSubscription ) / sizeof( MQTTSubscribeInfo_t ),
usUnsubscribePacketIdentifier );

configASSERT( xResult == MQTTSuccess );
}
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.