下载 FreeRTOS
 

出色的 RTOS & 嵌入式软件

为什么要编写另一个 MQTT 库?

库记录了一套关于世界应如何运作的决定。 如果您运气好,您可以在库中找到符合您需求和限制条件的模型。 如果您运气不好,您要么会浪费时间,要么需要修改,或者您必须搜索其他更好的库。 任何不幸的结果都有代价,您要么需要进行更多的开发工作,要么最终需要更多的物料,要么需要在产品出货后处理严重缺陷。 如果我们这些库开发人员不再将自己的决定打包到库中,而是将决定权交给您的话,库更有可能以最小的投入或最少的浪费来满足您的需求。

FreeRTOS LTS 版本包含为满足物联网和嵌入式设备不断变化的需求而打造的库。其中最主要的是 coreMQTT ,它为其余的核心库设定标准,并将决定权交给您。

举一个具体的例子,直接依赖基于软件的 TLS 的 MQTT 库可能与具有固有 TCP 和 TLS 功能的蜂窝模块不匹配。 随着设备连接到云端的方式越来越多,供应商提供的模块种类也越来越多,有从普通到特别的各种选择,包括 802.11 Wi-Fi、802.15.4 6LoWPAN、LTE-M、NB-IoT 和 LoRa。 其中许多模块卸载了联网功能,并提供了用于控制的 AT 命令,这些命令可能由套接字库封装 。 将联网功能从 coreMQTT 解耦意味着无论基础传输如何,它都可能同样有用。

CoreMQTT 针对所有 QoS 等级均采用 MQTT 3.1.1客户端,该客户端适用 MIT 开源许可证,并符合 ISO C90 和 MISRA C: 201。该图书馆强调占地面积小、不受依赖性影响和可构成性。因为库不使用堆内存,因此可容纳静态应用程序,并提供内存安全性证明。

读写接口

CoreMQTT 库通过读取函数和写入函数与网络交互。 您向库提供这些函数。 您可以使用或调整随演示提供的示例,这些示例涵盖常见场景,诸如使用 mbedTLS 的基于 TLS 的双向认证。 或者如 FreeRTOS 蜂窝演示中那样,封装卸载模块提供的函数。 您也可以采用一种新颖的方法,例如与智能手机配对的蓝牙低功耗代理。 您甚至可在同一应用程序中使用多种连接。

为了获得这种灵活性,您可以为每个 MQTT 连接提供用于在结构中读写的函数指针以及表示网络环境的不透明指针。 我们称之为传输接口。 这与传统的平台抽象层方法不同,在传统的平台抽象层方法中, 库需要一组(通常很多)固定函数和数据类型,您必须实现这些函数和数据类型,且每个函数和数据类型都必须无差别使用。 coreMQTT 的前提是,小接口比大接口更有用,可以解决更多的问题,并且可以更广泛地共享。 我们与 coreHTTP 库共享传输接口。 下方的文档字符串说明了构成传输接口的类型。


/**

* NetworkContext_t is the incomplete type struct NetworkContext.

* The implemented struct NetworkContext must contain all of the information

* that is needed to receive and send data with the TransportRecv_t

* and the TransportSend_t implementations.

* In the case of TLS over TCP, struct NetworkContext is typically implemented

* with the TCP socket context and a TLS context.

*

* Example code:

*

* struct NetworkContext

* {

* struct MyTCPSocketContext tcpSocketContext;

* struct MyTLSContext tlsContext;

* };

*/


/**

* @brief Transport interface for receiving data on the network.

*

* This function is expected to populate a buffer, with bytes received from the

* transport, and return the number of bytes placed in the buffer.

* In the case of TLS over TCP, TransportRecv_t is typically implemented by

* calling the TLS layer function to receive data. In case of plaintext TCP

* without TLS, it is typically implemented by calling the TCP layer receive

* function. TransportRecv_t may be invoked multiple times by the protocol

* library, if fewer bytes than were requested to receive are returned.

*

* @param[in] pNetworkContext Implementation-defined network context.

* @param[out] pBuffer Buffer to receive the data into.

* @param[in] bytesToRecv Number of bytes requested from the network.

*

* @return The number of bytes received or a negative value to indicate

* error.

*/

typedef int32_t ( * TransportRecv_t )( NetworkContext_t * pNetworkContext,
void * pBuffer,
size_t bytesToRecv );

/**

* @brief Transport interface for sending data over the network.

*

* This function is expected to send the bytes in the given buffer over the

* transport, and return the number of bytes sent.

* In the case of TLS over TCP, TransportSend_t is typically implemented by

* calling the TLS layer function to send data. In case of plaintext TCP

* without TLS, it is typically implemented by calling the TCP layer send

* function. TransportSend_t may be invoked multiple times by the protocol

* library, if fewer bytes than were requested to send are returned.

*

* @param[in] pNetworkContext Implementation-defined network context.

* @param[in] pBuffer Buffer containing the bytes to send over the network stack.

* @param[in] bytesToSend Number of bytes to send over the network.

*

* @return The number of bytes sent or a negative value to indicate error.

*/

typedef int32_t ( * TransportSend_t )( NetworkContext_t * pNetworkContext,
const void * pBuffer,
size_t bytesToSend );

typedef struct TransportInterface
{
TransportRecv_t recv; /**< Transport receive interface. */
TransportSend_t send; /**< Transport send interface. */
NetworkContext_t * pNetworkContext; /**< Implementation-defined network context. */
} TransportInterface_t;

使用退避函数进行连接

其创建连接的方式由您和您的应用程序自行决定。 这有助于保持简单的库和较小的接口。 请注意连接重试时可能会出现问题。 一个大的设备机群进行初始重试实际上可能是一种拒绝服务攻击,或者可能由于服务限制而出现意外的故障模式。 为了降低这种风险, FreeRTOS 提供了 backoffAlgorithm 库 以基于带抖动的上限指数值来计算重试之间的延迟。 此演示代码展示了如何采用 OpenSSL 和 backoffAlgorithm 库建立连接。 请注意, BackoffAlgorithm_GetNextBackoff() 本身并不调用任何睡眠函数。 您直接使用返回的值调用睡眠函数。

/* Initialize reconnect attempts and interval. */
BackoffAlgorithm_InitializeParams( &reconnectParams,
CONNECTION_RETRY_BACKOFF_BASE_MS,
CONNECTION_RETRY_MAX_BACKOFF_DELAY_MS,
CONNECTION_RETRY_MAX_ATTEMPTS );

/* Attempt to connect to MQTT broker. If connection fails, retry after

* a timeout until maximum attempts are reached.

*/

do
{
LogInfo( ( "Establishing a TLS session to %.*s:%d.",
BROKER_ENDPOINT_LENGTH,
BROKER_ENDPOINT,
BROKER_PORT ) );
opensslStatus = Openssl_Connect( pNetworkContext,
&serverInfo,
&opensslCredentials,
TRANSPORT_SEND_RECV_TIMEOUT_MS,
TRANSPORT_SEND_RECV_TIMEOUT_MS );

if( opensslStatus != OPENSSL_SUCCESS )
{
/* Generate a random number and get back-off value (in milliseconds) for the next connection retry. */
backoffAlgStatus = BackoffAlgorithm_GetNextBackoff( &reconnectParams, generateRandomNumber(), &nextRetryBackOff );

if( backoffAlgStatus == BackoffAlgorithmRetriesExhausted )
{
LogError( ( "Connection to the broker failed, all attempts exhausted." ) );
returnStatus = EXIT_FAILURE;
}
else if( backoffAlgStatus == BackoffAlgorithmSuccess )
{
LogWarn( ( "Connection to the broker failed. Retrying connection "
"after %hu ms backoff.",
( unsigned short ) nextRetryBackOff ) );
Clock_SleepMs( nextRetryBackOff );
}
}
} while( ( opensslStatus != OPENSSL_SUCCESS ) && ( backoffAlgStatus == BackoffAlgorithmSuccess ) );

可组合性

可组合性是 coreMQTT 设计的核心原则。 这一设计原则意味着功能性以小件的形式存在,并可组成更丰富的功能。 CoreMQTT 库既具有丰富的功能,也提供了用于实现这些功能的部件。 您可以按原样使用某个功能,或重组这些部件以实现自定义行为,或添加自己的部件以获得更多可能性。

例如,由 MQTT_Publish() 函数执行的序列化分别在 MQTT_GetPublishPacketSize() 函数和MQTT_SerializePublishHeader() 函数中可用。 超轻量 MQTT 客户端演示中还包含这些序列化和反序列化小函数的另一种可能组合。 虽然全功能 MQTT_Publish() 函数与状态引擎交互以支持 QoS 1 和 QoS 2 ,但超轻量演示仅支持 QoS 0 ,无需会话或状态引擎。 超轻量演示中的 publish 函数显示了实际运用的序列化函数。 MQTT_GetPublishPacketSize() 返回序列化消息标头所需的字节数。 如果该数字小于提供的缓冲区的大小,则 MQTT_SerializePublishHeader() 会将标头写入缓冲区。 两次调用传输接口首先发送标头,然后发送有效负载。


static void mqttPublishToTopic( NetworkContext_t * pNetworkContext,
MQTTFixedBuffer_t * pFixedBuffer )
{
MQTTStatus_t result;
MQTTPublishInfo_t mqttPublishInfo;
size_t remainingLength;
size_t packetSize = 0;
size_t headerSize = 0;
int status;

/* Suppress unused variable warnings when asserts are disabled in build. */
( void ) status;
( void ) result;

/***

* For readability, error handling in this function is restricted to the use of

* asserts().

***/


/* Some fields not used by this demo so start with everything as 0. */
memset( ( void * ) &mqttPublishInfo, 0x00, sizeof( mqttPublishInfo ) );

/* This demo uses QOS0 */
mqttPublishInfo.qos = MQTTQoS0;
mqttPublishInfo.retain = false;
mqttPublishInfo.pTopicName = MQTT_EXAMPLE_TOPIC;
mqttPublishInfo.topicNameLength = ( uint16_t ) strlen( MQTT_EXAMPLE_TOPIC );
mqttPublishInfo.pPayload = MQTT_EXAMPLE_MESSAGE;
mqttPublishInfo.payloadLength = strlen( MQTT_EXAMPLE_MESSAGE );

/* Find out length of Publish packet size. */
result = MQTT_GetPublishPacketSize( &mqttPublishInfo, &remainingLength, &packetSize );
assert( result == MQTTSuccess );

/* Make sure the packet size is less than static buffer size. */
assert( packetSize < pFixedBuffer->size );

/* Serialize MQTT Publish packet header. The publish message payload will

* be sent directly in order to avoid copying it into the buffer.

* QOS0 does not make use of packet identifier, therefore value of 0 is used */

result = MQTT_SerializePublishHeader( &mqttPublishInfo,
0,
remainingLength,
pFixedBuffer,
&headerSize );
LogDebug( ( "Serialized PUBLISH header size is %lu.",
( unsigned long ) headerSize ) );
assert( result == MQTTSuccess );
/* Send Publish header to the broker. */
status = Plaintext_Send( pNetworkContext, ( void * ) pFixedBuffer->pBuffer, headerSize );
assert( status == ( int ) headerSize );
/* Send Publish payload to the broker */
status = Plaintext_Send( pNetworkContext, ( void * ) mqttPublishInfo.pPayload, mqttPublishInfo.payloadLength );
assert( status == ( int ) mqttPublishInfo.payloadLength );
}

处理收到的消息

应用程序需做的最大决定之一是如何处理新收到的消息。 您可能有一个简单的工作流程,其中只有一种类型的消息需要处理,或者有一种涉及更广的方法,其中各种消息需要在整个应用程序中进行多路复用。 CoreMQTT 从简单的案例开始,其中 MQTT_Init () 接受单个回调函数,以针对接收到的每个 PUBISH 或 ACK 消息调用。 MQTTEventCallback_t 类型和 MQTT_Init() 函数的文档字符串描述了传递给回调的值,并展示了调用 MQTT_Init() 的示例。

/**

* @ingroup mqtt_callback_types

* @brief Application callback for receiving incoming publishes and incoming

* acks.

*

* @note This callback will be called only if packets are deserialized with a

* result of #MQTTSuccess or #MQTTServerRefused. The latter can be obtained

* when deserializing a SUBACK, indicating a broker's rejection of a subscribe.

*

* @param[in] pContext Initialized MQTT context.

* @param[in] pPacketInfo Information on the type of incoming MQTT packet.

* @param[in] pDeserializedInfo Deserialized information from incoming packet.

*/

typedef void (* MQTTEventCallback_t )( struct MQTTContext * pContext,
struct MQTTPacketInfo * pPacketInfo,
struct MQTTDeserializedInfo * pDeserializedInfo );

/**

* @brief Initialize an MQTT context.

*

* This function must be called on a #MQTTContext_t before any other function.

*

* @note The #MQTTGetCurrentTimeFunc_t function for querying time must be defined. If

* there is no time implementation, it is the responsibility of the application

* to provide a dummy function to always return 0, provide 0 timeouts for

* all calls to #MQTT_Connect, #MQTT_ProcessLoop, and #MQTT_ReceiveLoop and configure

* the #MQTT_RECV_POLLING_TIMEOUT_MS and #MQTT_SEND_RETRY_TIMEOUT_MS configurations

* to be 0. This will result in loop functions running for a single iteration, and

* #MQTT_Connect relying on #MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT to receive the CONNACK packet.

*

* @param[in] pContext The context to initialize.

* @param[in] pTransportInterface The transport interface to use with the context.

* @param[in] getTimeFunction The time utility function to use with the context.

* @param[in] userCallback The user callback to use with the context to

* notify about incoming packet events.

* @param[in] pNetworkBuffer Network buffer provided for the context.

*

* @return #MQTTBadParameter if invalid parameters are passed;

* #MQTTSuccess otherwise.

*

* Example

*

* // Function for obtaining a timestamp.

* uint32_t getTimeStampMs();

* // Callback function for receiving packets.

* void eventCallback(

* MQTTContext_t * pContext,

* MQTTPacketInfo_t * pPacketInfo,

* MQTTDeserializedInfo_t * pDeserializedInfo

* );

* // Network send.

* int32_t networkSend( NetworkContext_t * pContext, const void * pBuffer, size_t bytes );

* // Network receive.

* int32_t networkRecv( NetworkContext_t * pContext, void * pBuffer, size_t bytes );

*

* MQTTContext_t mqttContext;

* TransportInterface_t transport;

* MQTTFixedBuffer_t fixedBuffer;

* uint8_t buffer[ 1024 ];

*

* // Clear context.

* memset( ( void * ) &mqttContext, 0x00, sizeof( MQTTContext_t ) );

*

* // Set transport interface members.

* transport.pNetworkContext = &someTransportContext

* transport.send = networkSend;

* transport.recv = networkRecv;

*

* // Set buffer members.

* fixedBuffer.pBuffer = buffer;

* fixedBuffer.size = 1024;

*

* status = MQTT_Init( &mqttContext, &transport, getTimeStampMs, eventCallback, &fixedBuffer );

*

* if( status == MQTTSuccess )

* {

* // Do something with mqttContext. The transport and fixedBuffer structs were

* // copied into the context, so the original structs do not need to stay in scope.

* }

*/


MQTTStatus_t MQTT_Init( MQTTContext_t * pContext,
const TransportInterface_t * pTransportInterface,
MQTTGetCurrentTimeFunc_t getTimeFunction,
MQTTEventCallback_t userCallback,
const MQTTFixedBuffer_t * pNetworkBuffer );

非常简单的回调可能会忽略所有 ACK ,并解析所有 PUBLISH 消息以获得所需值。 如果您需要不同函数根据主题处理的消息,请使用 MQTT_MatchTopic() 函数撰写回调。 下面的 MQTT_MatchTopic() 文档字符串包括一个简单的示例。


/**

* @brief A utility function that determines whether the passed topic filter and

* topic name match according to the MQTT 3.1.1 protocol specification.

*

* @param[in] pTopicName The topic name to check.

* @param[in] topicNameLength Length of the topic name.

* @param[in] pTopicFilter The topic filter to check.

* @param[in] topicFilterLength Length of topic filter.

* @param[out] pIsMatch This is filled with the whether there

* exists a match or not.

*

* @note The API assumes that the passed topic name is valid to meet the

* requirements of the MQTT 3.1.1 specification. Invalid topic names (for example,

* containing wildcard characters) should not be passed to the function.

* Also, the API checks validity of topic filter for wildcard characters ONLY if

* the passed topic name and topic filter do not have an exact string match.

*

* @return Returns one of the following:

* - #MQTTBadParameter, if any of the input parameters is invalid.

* - #MQTTSuccess, if the matching operation was performed.

*

* Example

*

* // Variables used in this example.

* const char * pTopic = "topic/match/1";

* const char * pFilter = "topic/#";

* MQTTStatus_t status = MQTTSuccess;

* bool match = false;

*

* status = MQTT_MatchTopic( pTopic, strlen( pTopic ), pFilter, strlen( pFilter ), &match );

* // Our parameters were valid, so this will return success.

* assert( status == MQTTSuccess );

*

* // For this specific example, we already know this value is true. This

* // check is placed here as an example for use with variable topic names.

* if( match )

* {

* // Application can decide what to do with the matching topic name.

* }

*/

MQTTStatus_t MQTT_MatchTopic( const char * pTopicName,
const uint16_t topicNameLength,
const char * pTopicFilter,
const uint16_t topicFilterLength,
bool * pIsMatch );

订阅管理器演示所示, MQTT_MatchTopic () 可构成功能齐全的订阅管理器。 下面的两个函数和 typedef(与它们的文档字符串一起显示)描述了将回调函数绑定到匹配的主题字符串。


/**

* @brief Callback type to be registered for a topic filter with the subscription manager.

*

* For incoming PUBLISH messages received on topics that match the registered topic filter,

* the callback would be invoked by the subscription manager.

*

* @param[in] pContext The context associated with the MQTT connection.

* @param[in] pPublishInfo The incoming PUBLISH message information.

*/

typedef void (* SubscriptionManagerCallback_t )( MQTTContext_t * pContext,
MQTTPublishInfo_t * pPublishInfo );

/**

* @brief Dispatches the incoming PUBLISH message to the callbacks that have their

* registered topic filters matching the incoming PUBLISH topic name. The dispatch

* handler will invoke all these callbacks with matching topic filters.

*

* @param[in] pContext The context associated with the MQTT connection.

* @param[in] pPublishInfo The incoming PUBLISH message information.

*/

void SubscriptionManager_DispatchHandler( MQTTContext_t * pContext,
MQTTPublishInfo_t * pPublishInfo );

/**

* @brief Utility to register a callback for a topic filter in the subscription manager.

*

* The callback will be invoked when an incoming PUBLISH message is received on

* a topic that matches the topic filter, @a pTopicFilter. The subscription manager

* accepts wildcard topic filters.

*

* @param[in] pTopicFilter The topic filter to register the callback for.

* @param[in] topicFilterLength The length of the topic filter string.

* @param[in] callback The callback to be registered for the topic filter.

*

* @note The subscription manager does not allow more than one callback to be registered

* for the same topic filter.

* @note The passed topic filter, @a pTopicFilter, is saved in the registry.

* The application must not free or alter the content of the topic filter memory

* until the callback for the topic filter is removed from the subscription manager.

*

* @return Returns one of the following:

* - #SUBSCRIPTION_MANAGER_SUCCESS if registration of the callback is successful.

* - #SUBSCRIPTION_MANAGER_REGISTRY_FULL if the registration failed due to registry

* being already full.

* - #SUBSCRIPTION_MANAGER_RECORD_EXISTS, if a registered callback already exists for

* the requested topic filter in the subscription manager.

*/

SubscriptionManagerStatus_t SubscriptionManager_RegisterCallback( const char * pTopicFilter,
uint16_t topicFilterLength,
SubscriptionManagerCallback_t pCallback );

演示代码展示了使用 subscribeToTopic() 函数订阅和通过 SubscriptionManager_RegisterCallback() 注册回调的组合。 演示首先注册回调,并在订阅失败时将其删除。 此方法涵盖了这样一种情况,即当 subscribeToTopic() 调用的 MQTT_ProcessLoop() 函数等待 SUBACK 消息时,发布便到达了主题——这十分有可能发生。


static int subscribeToAndRegisterTopicFilter( MQTTContext_t * pContext,
const char * pTopicFilter,
uint16_t topicFilterLength,
SubscriptionManagerCallback_t callback )
{
int returnStatus = EXIT_SUCCESS;
SubscriptionManagerStatus_t managerStatus = 0u;

/* Register the topic filter and its callback with subscription manager.

* On an incoming PUBLISH message whose topic name that matches the topic filter

* being registered, its callback will be invoked. */

managerStatus = SubscriptionManager_RegisterCallback( pTopicFilter,
topicFilterLength,
callback );

if( managerStatus != SUBSCRIPTION_MANAGER_SUCCESS )
{
returnStatus = EXIT_FAILURE;
}
else
{
LogInfo( ( "Subscribing to the MQTT topic %.*s.",
topicFilterLength,
pTopicFilter ) );

returnStatus = subscribeToTopic( pContext,
pTopicFilter,
topicFilterLength );
}

if( returnStatus != EXIT_SUCCESS )
{
/* Remove the registered callback for the temperature topic filter as

* the subscription operation for the topic filter did not succeed. */

( void ) SubscriptionManager_RemoveCallback( pTopicFilter,
topicFilterLength );
}

return returnStatus;
}

并发

您是否应将应用程序组织为一个简单的超级循环,或作为由 RTOS 或调度器管理的一组任务? 任一选择对于 coreMQTT 均可行。 如果您选择并发,则必须确保代码安全。 在为 FreeRTOS 编写时,较为有用的方法是指定一项任务来处理 MQTT 并通过安全 FreeRTOS 队列向该任务传递命令。 FreeRTOS MQTT代理使用此方法。 代理为 FreeRTOS 提供独立守护程序任务,以处理所有 MQTT 交互。 请参阅相关后续文章。

小结

Alan Kay 曾经说过, “简单的事情应该是简单的;复杂的事情应该是可能的。” coreMQTT 提供了在最简单的超级循环中可用的函数库,或可组合到复杂的多任务实时应用程序的函数,并举例说明如何操实现 关键的决策由您自行决定。

作者简介

Dan Good 在 AmazonWeb Services 担任高级软件开发工程师。 在联网领域工作多年后,蓬勃的创客文化激励他深入参与 IoT。 Dan 为 FreeRTOScore* 库和 AWS IoT SDK for embedded C 贡献了自己的力量,帮助客户进行创新。
查看此作者的文章
FreeRTOS 论坛 获得来自专家的行业领先支持,并与全球同行合作。 查看论坛
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.