使用 coreMQTT 的 MQTT Agent 及其演示
包括 Over the Air (OTA) 更新
MQTT Agent API 使用示例
MQTT Agent API
简介
coreMQTT 是经 MIT 授权的开源 C MQTT 客户端库,适用于基于微控制器和小型微处理器的 IoT 设备。 该客户端库设计简洁,确保其不依赖任何其他库或操作系统,能更好地执行静态分析,如内存安全证明。 正因为这种简单性和独立于操作系统的特性(coreMQTT 完全不需要多线程),coreMQTT 不会直接在实现过程中构建线程安全。 相反,线程安全必须由更高级别的软件提供。 本页所演示的 coreMQTT 扩展以 MQTT Agent(或 MQTT 守护进程)的形式提供更高级的功能。 虽然此处演示的实现目前专用于 FreeRTOS,但对 FreeRTOS 的依赖项并不多,也就是说可以轻松调整该实现,以适配其他操作系统。
实现情况概览和使用模型
MQTT Agent 是独立的任务(或执行线程), 是获准访问 MQTT 库 API 的唯一任务,因此可轻松实现线程安全。 隔离对单个任务的所有 MQTT API 调用可以实现访问序列化,进而无需信号量或任何其他同步原语。
使用 Agent 时,如果应用程序任务想要执行 MQTT 操作,例如发布消息,其会调用 MQTT Agent 的 MQTTAgent_Publish() API ,而不是调用 coreMQTT 的 MQTT_Publish() API。 MQTTAgent_Publish() 将完成 Publish 操作所需的信息打包到结构体中,然后通过队列将该结构体发送到 MQTT Agent 任务。 MQTT Agent 任务接收该结构体,然后代表应用程序调用基础 MQTT 库的 MQTT_Publish() API。
应用程序编写者可借助向 MQTT Agent 发送命令的 API,选择性指定回调函数以及要传递给回调的参数(称为“回调上下文”),以便 Agent 在生成的 MQTT 操作完成时执行。 如此一来,应用程序任务可以选择进入阻塞状态(因此不会占用任何 CPU 时间)等待回调执行,或者在 MQTT 操作正在进行时继续执行。 请参阅本页末尾的示例。
如果在 MQTT Agent API 调用时队列已满,应用程序编写者还可借助向 MQTT Agent 发送命令的 API,指定调用任务在阻塞状态下等待用于向 MQTT Agent 发送命令的队列中存在可用空间的最长时间。 同样,请参阅本页末尾的示例。
演示项目
序言
注意:MQTT Agent 和相关
示例项目可以正常运行,但尚未完成。 请注意,该 Agent 尚不
符合我们的代码质量标准,且尚未经过全面测试。 API 在正式首次发布之前不太可能发生变化。
功能
main() 会先初始化 TCP/IP 堆栈,然后启动 FreeRTOS 内核调度器。
有网络连接时,网络事件钩子可创建单项 RTOS 任务。 该
任务可选择性创建多项其他任务,每项任务都会连接到
MQTT 代理,然后发送和接收各种不同大小
和不同服务质量 (QoS) 级别的 MQTT 数据包。 原始线程
随后成为 MQTT Agent 线程。
以下
常量定义了创建的演示任务。 描述中的链接指向注释,这些注释位于每个实现源文件的顶部,可提供更多信息。
-
#define democonfigCREATE_LARGE_MESSAGE_SUB_PUB_TASK [1 or 0]
设置为 1 时,创建的任务可运行在
large_message_sub_pub_demo.c 中实现的 MQTT 演示;设置为 0 时,则可从构建中忽略该任务。
-
#define democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE [n]
设置 simple_pub_sub_demo.c 中实现的任务要创建的实例数,该值可以为 0。
-
#define democonfigCREATE_CODE_SIGNING_OTA_DEMO [1 or 0]
设置为 1 时,可包括 Over the Air (OTA) 更新功能;设置为 0 时,则忽略 OTA。 我们提供单独的页面,演示如何使用 OTA。
-
#define democonfigCREATE_DEFENDER_DEMO [1 or 0]
设置为 1 时,可生成 AWS Device Defender 演示;设置为 0 时,则忽略此功能。 有关 Device Defender 演示的说明,请参阅此处。
获取源代码
在您部署修复之前,
coreMQTT-Agent-Demos 存储库(位于 FreeRTOS GitHub 账号中)
演示了如何在 coreMQTT 上使用 Agent, 还演示了如何
将其他 FreeRTOS 库以子模块形式纳入到项目中。 请勿使用 GitHub 中的 "Download Zip" 链接
获取代码,因为 zip 文件并不包含子模块形式的
库。 请改用以下 Git 命令将存储库
及其子模块克隆到本地计算机上:
使用 HTTPS 进行克隆:
git clone https://github.com/FreeRTOS/coreMQTT-Agent-Demos.git --recurse-submodules
使用 SSH:
git clone git@github.com:FreeRTOS/coreMQTT-Agent-Demos.git --recurse-submodules
如果下载存储库时未使用 --recurse-submodules 实参,则需运行:
git submodule update --init --recursive
撰写本文时,项目使用的是
FreeRTOSWindows 端口、FreeRTOS-Plus-TCP
TCP/IP 堆栈,并使用免费
社区版 Visual Studio 进行构建。
然而,其目录结构经过精心编排,旨在
将来针对其他开发工具添加项目。
源代码组织
Git 存储库的组织结构如下:
+-build
| |
| +-VisualStudio Contains the Visual Studio project for this demo
|
+-lib
| |
| +-AWS Contains a sub-module for the OTA library along with an OTA PAL port for Windows.
| +-FreeRTOS Contains sub-modules of the FreeRTOS libraries used by the demo
| +-ThirdParty Contains submodules of third party libraries used by the demo
|
+-source
|
+-configuration-files Contains configuration files for the demo and libraries
+- . Contains source files that implement the various demos
配置 FreeRTOS-Plus-TCP
本演示使用 FreeRTOS-Plus-TCP TCP/IP 堆栈,因此,请遵守 TCP/IP 入门项目的说明,以确保:
- 安装了必备组件(例如 WinPCap)。
- 设置了静态或动态 IP 地址、网关地址和网络掩码(可选)。
- 设置了 MAC 地址(可选)。
- 在您的主机上选择了以太网接口。
- 最重要的是 ,在尝试运行 MQTT 演示之前,测试了网络连接。
每个演示项目都有自己的配置设置。当你按照网络配置说明进行操作时,
确保应用 MQTT 演示项目中的设置,而不是
TCP/IP 入门项目中的设置。默认情况下,TCP/IP 堆栈被配置为使用动态 IP 地址。
配置 MQTT 代理连接
MQTT Agent 可以本地或远程使用 TLS 或纯文本连接到任何 MQTT 代理。 非 Agent
纯文本演示文档页面上的“配置 MQTT 代理”部分详细介绍了一些本地和远程纯文本 MQTT 代理选项。
服务器身份验证和
相互身份验证页面上的“配置 MQTT 代理”部分则详细介绍了一些 TLS 加密的 MQTT 代理选项。
AWS IoT 用户其他注意事项: 如果希望连接到 AWS IoT,则可以创建必要的云端和设备端配置,
具体操作详见记录单独相互身份验证演示所在页面上的“使用 AWS IoT 消息代理”一节
。 本节引用的脚本位于 MQTT Agent 存储库的 /lib/AWS/tools 目录中
。
确定要连接到的代理之后,请参阅要点下方的代码片段,这些片段描述了
配置 MQTT 连接的编译时配置常量。 这些常量位于 /Source/configuration-files/demo_config.h 中。
请将 TLS 连接所需的所有密钥放置在同一文件中。
重要提示:
-
纯文本连接可用于了解 MQTT 的工作原理和调试连接,这是因为
可以在 WireShark 等网络嗅探器中查看 MQTT 对话。
但是,实际 IoT 设备不应使用未经加密的纯文本连接。 切勿
通过纯文本连接发送私人数据,强烈建议所有 IoT 设备使用经过加密和相互身份验证的连接。
-
将密钥
放在头文件中仅为方便演示。 强烈建议
实际设备将密钥存储在安全位置,如 Secure Element 或 Secure Enclave 中。
此外,建议实际 IoT 设备通过不会泄露密钥的 API
(例如 PKCS #11
或 PSA API)访问密钥和其他加密对象。
#define democonfigCLIENT_IDENTIFIER "Thing1"
#define democonfigMQTT_BROKER_ENDPOINT "192.168.0.100"
#define democonfigMQTT_BROKER_PORT ( 8883 )
#define democonfigUSE_TLS 1
MQTT Agent 配置常量
以下代码片段显示了与 MQTT Agent 相关的编译时常量
及其默认值(在未定义的情况下)。
MQTT Agent 没有自己的配置文件,但会使用
core_mqtt_config.h 中定义的任意宏。 该函数中定义的宏将替代默认值。
#ifndef MQTT_AGENT_MAX_OUTSTANDING_ACKS
#define MQTT_AGENT_MAX_OUTSTANDING_ACKS ( 20 )
#endif
#ifndef MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME
#define MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME ( 1000 )
#endif
MQTT Agent API 使用示例
关于以下代码的重要提示:
-
在 MQTT PUBLISH 得到 MQTT 代理的确认之前,
MQTT PUBLISH 负载和主题字符串必须一直有效。 在下方示例中,
主题字符串为静态常量,
因此始终有效。
-
在本示例中,调用
MQTTAgent_Publish()
的任务需等待收到服务器已确认 MQTT PUBLISH 命令的通知
。任务可以选择性传递完成回调(该回调在收到确认后执行),但这一操作很有用,
因为可发出信号,表明用于保存 MQTT 发布信息的缓冲区何时可以重用。
struct CommandContext
{
TaskHandle_t xTaskToNotify;
MQTTStatus_t xReturnStatus;
};
static const char * const pcTopicName = "/my/topic/x";
static MQTTAgentContext_t xAgentContext;
static void prvCommandCallback( void *pxCommandContext,
MQTTStatus_t xReturnStatus )
{
CommandContext_t *pxApplicationDefinedContext = ( CommandContext_t * ) pxCommandContext;
pxApplicationDefinedContext->xReturnStatus = xReturnStatus;
xTaskNotify( pxApplicationDefinedContext->xTaskToNotify,
xReturnStatus,
eSetValueWithOverwrite );
}
void ExampleOfCallingAgentPublish( char *pcPayload, uint16_t usPayloadLength )
{
MQTTPublishInfo_t xPublishInfo;
CommandContext_t xCommandContext;
MQTTStatus_t xCommandAdded;
BaseType_t xReturn;
CommandInfo_t xCommandInformation;
xPublishInfo.qos = MQTTQoS1;
xPublishInfo.pTopicName = pcTopicName;
xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopicName );
xPublishInfo.pPayload = pcPayload;
xPublishInfo.payloadLength = usPayloadLength;
xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
xCommandInformation.cmdCompleteCallback = prvCommandCallback;
xCommandInformation.pCmdCompleteCallbackContext = &xCommandContext
xCommandInformation.blockTimeMs = mqttexampleMAX_COMMAND_SEND_BLOCK_TIME_MS;
xCommandAdded = MQTTAgent_Publish(
&xAgentContext,
&xPublishInfo,
&xCommandInformation );
if( xCommandAdded == MQTTSuccess )
{
xReturn = xTaskNotifyWait( 0,
0,
NULL,
portMAX_DELAY );
if( xReturn != pdFAIL )
{
}
}
}
MQTT Agent API
目前,MQTT Agent API 记录在
mqtt_agent.h 头文件和
MQTT Agent API 引用文件中。
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.