coreMQTT Agent Demo
Single Threaded Vs Multi Threaded
There are two coreMQTT usage models: single threaded and multithreaded (multitasking). The single threaded model uses
the coreMQTT library solely from one thread, and requires you to make repeated, explicit calls into the MQTT library.
Multithreaded use cases can instead run the MQTT protocol in the background within an agent (or daemon) task, as shown
in the demo documented on this page.
When you run the MQTT protocol in an agent task you do not have to explicitly manage any MQTT state or call the
MQTT_ProcessLoop() API function. Also, when you use an agent task, multiple application tasks can share
a single MQTT connection without the need for synchronization primitives such as mutexes.
Demo Introduction
The coreMQTT Agent demo project uses the FreeRTOS Windows port, so it can be built and evaluated with the
free Community version of Visual Studio on Windows, without the need for any MCU hardware.
This demo uses a thread safe queue to hold commands for interacting with the coreMQTT API. There are two tasks to
note in this demo:
- An MQTT agent (main) task processes commands from the command queue, while the other tasks enqueue them. The
agent task runs in a loop and breaks out of the loop when it receives a termination command.
- A demo subpub task creates a subscription to an MQTT topic, then creates publish operations and pushes them to
the command queue. These publish operations are then run by the MQTT agent task. The demo subpub task
waits for the publish to complete, indicated by execution of the command completion callback, then enters a short
delay before it starts the next publish. This task shows examples of how application tasks would use the coreMQTT
Agent API.
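The split between enqueueing tasks and the agent task can be pictured with a small ring-buffer sketch. This is a simplified illustration, not the demo's code: every name prefixed with Sketch or sketch is hypothetical, there is no locking (in the demo a thread safe queue provides the synchronization), and the loop returns when the queue is empty instead of blocking.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_QUEUE_LENGTH    4U

/* Hypothetical command types; the real agent defines its own. */
typedef enum
{
    SKETCH_CMD_SUBSCRIBE,
    SKETCH_CMD_PUBLISH,
    SKETCH_CMD_TERMINATE
} SketchCommandType_t;

typedef struct
{
    SketchCommandType_t type;
    const char * pTopic;
} SketchCommand_t;

/* Fixed-size FIFO ring buffer. No locking here: a real RTOS queue
 * provides the synchronization between producer and consumer tasks. */
typedef struct
{
    SketchCommand_t slots[ SKETCH_QUEUE_LENGTH ];
    size_t head;
    size_t count;
} SketchQueue_t;

static void sketchQueueInit( SketchQueue_t * pQueue )
{
    assert( pQueue != NULL );
    pQueue->head = 0U;
    pQueue->count = 0U;
}

/* Application task side: returns 1 on success, 0 if the queue is full. */
static int sketchEnqueue( SketchQueue_t * pQueue, SketchCommand_t command )
{
    size_t tail;

    assert( pQueue != NULL );

    if( pQueue->count == SKETCH_QUEUE_LENGTH )
    {
        return 0;
    }

    tail = ( pQueue->head + pQueue->count ) % SKETCH_QUEUE_LENGTH;
    pQueue->slots[ tail ] = command;
    pQueue->count++;
    return 1;
}

/* Agent side: returns 1 and copies out the oldest command, or 0 if empty. */
static int sketchDequeue( SketchQueue_t * pQueue, SketchCommand_t * pCommand )
{
    if( pQueue->count == 0U )
    {
        return 0;
    }

    *pCommand = pQueue->slots[ pQueue->head ];
    pQueue->head = ( pQueue->head + 1U ) % SKETCH_QUEUE_LENGTH;
    pQueue->count--;
    return 1;
}

/* Agent loop: processes commands in FIFO order and stops at a terminate
 * command. Returns the number of commands processed before stopping. */
static size_t sketchAgentLoop( SketchQueue_t * pQueue )
{
    SketchCommand_t command;
    size_t processed = 0U;

    while( sketchDequeue( pQueue, &command ) == 1 )
    {
        if( command.type == SKETCH_CMD_TERMINATE )
        {
            break;
        }

        processed++; /* A real agent would call the matching MQTT API here. */
    }

    return processed;
}
```

Commands queued behind a terminate command are left unprocessed in this sketch, which mirrors the idea that the agent stops servicing the queue once it is told to terminate.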
For incoming publish messages, the coreMQTT Agent invokes a single callback function. This demo also includes a
subscription manager to allow tasks to specify a callback to invoke for incoming publish messages on topics to
which they have subscribed. The agent's incoming publish callback in this demo invokes the subscription manager
to fan out publishes to any task that has registered a subscription.
The coreMQTT Agent demo can be configured to use either a TLS connection with mutual authentication, or a plaintext
TCP connection. By default, the demo uses TLS, and we recommend that you use mutual authentication in any
Internet of Things (IoT) application. If the network unexpectedly disconnects during the demo, then the
client will attempt to reconnect with exponential backoff logic.
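As an illustration of the reconnect behaviour mentioned above, the following is a minimal sketch of exponential backoff with jitter. It is not the demo's actual retry code: the type, the function names and the base, maximum and attempt-count values are all assumptions chosen for the example, and the random value is passed in by the caller so that the sketch stays deterministic.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical parameters for this sketch; the demo's real values may differ. */
#define SKETCH_BACKOFF_BASE_MS    500U
#define SKETCH_BACKOFF_MAX_MS     5000U
#define SKETCH_MAX_ATTEMPTS       5U

typedef struct
{
    uint32_t attemptsDone; /* Retries already handed out. */
    uint32_t windowMs;     /* Current upper bound for the random delay. */
} SketchBackoff_t;

static void sketchBackoffInit( SketchBackoff_t * pBackoff )
{
    assert( pBackoff != NULL );
    pBackoff->attemptsDone = 0U;
    pBackoff->windowMs = SKETCH_BACKOFF_BASE_MS;
}

/* Returns 1 and writes a delay in the range 0..window if another retry is
 * allowed, or returns 0 once all attempts are used up. The caller supplies
 * randomValue (for example from a hardware random number generator). */
static int sketchBackoffNext( SketchBackoff_t * pBackoff,
                              uint32_t randomValue,
                              uint32_t * pDelayMs )
{
    assert( pBackoff != NULL );
    assert( pDelayMs != NULL );

    if( pBackoff->attemptsDone >= SKETCH_MAX_ATTEMPTS )
    {
        return 0;
    }

    /* Jitter: pick a delay anywhere inside the current window. */
    *pDelayMs = randomValue % ( pBackoff->windowMs + 1U );
    pBackoff->attemptsDone++;

    /* Double the window for the next attempt, capped at the maximum. */
    if( pBackoff->windowMs > ( SKETCH_BACKOFF_MAX_MS / 2U ) )
    {
        pBackoff->windowMs = SKETCH_BACKOFF_MAX_MS;
    }
    else
    {
        pBackoff->windowMs *= 2U;
    }

    return 1;
}
```

A reconnect loop would call sketchBackoffNext() after each failed connection attempt, delay for the returned time, and give up once the function returns 0.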
Source Code Organization
The Visual Studio solution for the Multithreaded MQTT demo is called mqtt_multitask_demo.sln and is in the
/FreeRTOS-Plus/Demo/coreMQTT_Windows_Simulator/MQTT_Multitask directory of the main FreeRTOS download.
Building the Demo Project
The demo project uses the free community edition of Visual Studio. To build the demo:
- Open the mqtt_multitask_demo.sln Visual Studio solution file from within the Visual Studio IDE.
- Select Build Solution from the IDE's Build menu.
Note: If you use Microsoft Visual Studio 2017 or earlier, then a Platform Toolset compatible with the Visual Studio
version must be selected: Project -> RTOSDemos Properties -> Platform Toolset.
Configuring the Demo Project
The demo uses the FreeRTOS-Plus-TCP TCP/IP stack, so follow the instructions provided for the
TCP/IP starter project:
- Install the pre-requisite components (such as WinPCap).
- Optionally set a static or dynamic IP address, gateway address and netmask.
- Optionally set a MAC address.
- Select an Ethernet network interface on the host machine.
- Most importantly, test the network connection before attempting to run the MQTT demo.
All of these settings should be changed in the MQTT Agent demo project, not the TCP/IP starter project referred
to in the pages linked to above! As delivered, the TCP/IP stack is configured to use a dynamic IP address.
Configuring the MQTT Broker Connection
Option 1: TLS with Mutual Authentication (default)
This demo supports the same configuration options as the
MQTT Mutual Auth demo. Please see that demo's documentation for all of the available options.
Option 2: Plaintext
To enable quick setup with no certificate configuration required, the demo allows plaintext TCP connections to be used
in lieu of mutually authenticated TLS. To disable TLS, set the macro democonfigUSE_TLS to 0 in demo_config.h, or
leave it undefined. The demo may then be used with any unencrypted MQTT broker (for example, Eclipse Mosquitto)
by following the same instructions as the Plaintext demo.
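A plaintext configuration might look like the fragment below. Only democonfigUSE_TLS is taken from this page. The endpoint and port macro names are assumed to follow the naming used by the other coreMQTT demos, and the address is a placeholder for a broker on your own network; port 1883 is the conventional port for unencrypted MQTT.

```c
/* demo_config.h (fragment): a sketch of a plaintext setup. */

/* Disable TLS so the demo connects over plain TCP. */
#define democonfigUSE_TLS                 0

/* Assumed macro names and placeholder values; check the demo_config.h
 * shipped with the demo for the names it actually uses. */
#define democonfigMQTT_BROKER_ENDPOINT    "192.168.0.10"
#define democonfigMQTT_BROKER_PORT        ( 1883 )
```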
Important notes:
- Plaintext connections are useful to learn how MQTT works and to debug connections because the MQTT
conversation can be viewed in a network sniffer such as Wireshark. However,
production IoT devices should not use an unencrypted plaintext connection, as all traffic is exposed to any
device on the network. Never send private data over a plaintext connection. We highly recommend
that you use encrypted and mutually authenticated connections for all IoT devices.
- The demo places keys in a header file for demonstration purposes only. We strongly recommend
that production devices store keys in a secure location, such as a secure element or enclave. Further,
we recommend that production IoT devices access keys and other crypto objects via an API that does not
expose the keys, such as PKCS #11 or PSA APIs.
Once you have chosen a broker, set the compile time configuration constants that configure the MQTT connection in
demo_config.h. Place any keys required for TLS connections in the same file.
coreMQTT Agent configuration constants
The following code snippet shows the compile time constants relevant to the coreMQTT Agent, along with their
default values. The coreMQTT Agent does not have its own configuration file, but it will use any macros defined
in core_mqtt_config.h. Macros defined there will be used in place of these defaults:
core_mqtt_config.h
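The override mechanism described above usually relies on the preprocessor pattern sketched below. MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME is the constant named later on this page; both numeric values are placeholders for illustration and are not the library's documented defaults.

```c
/* In core_mqtt_config.h: the application supplies its own value.
 * The number is a placeholder for this sketch. */
#define MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME    ( 500U )

/* In the library: the default only applies when the application has not
 * defined the macro. The default shown is also a placeholder. */
#ifndef MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME
    #define MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME    ( 1000U )
#endif
```

With both parts in place, the value from core_mqtt_config.h wins because the #ifndef guard skips the library default.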
Functionality
main() initializes the TCP/IP stack before starting the FreeRTOS kernel scheduler. When the network
connects, the network event hook creates a single RTOS task. That task optionally creates multiple other tasks,
each of which sends and receives MQTT packets of various sizes and at various Quality of Service (QoS) levels.
The original thread then becomes the MQTT agent thread.
The following constant defines the created demo tasks. The links in the descriptions go to comments at the top
of each implementing source file that provide more information.
#define democonfigNUM_SIMPLE_SUB_PUB_TASKS_TO_CREATE [n]
Sets the number of instances to create of the task implemented in simple_pub_sub_demo.c.
MQTT Agent Task
The MQTT agent task is responsible for invoking MQTTAgent_CommandLoop(). This task calls the requested
coreMQTT API when a command is posted to its queue.
Commands
When you invoke a coreMQTT Agent API, it creates a command that is sent to the agent task's queue, which is processed
in MQTTAgent_CommandLoop(). At the time the command is created, optional completion callback and context
parameters may be passed. Once the corresponding command is complete, the completion callback will be invoked with
the passed context and any return values as a result of the command. The signature for the completion callback is
as follows:
typedef void (* MQTTAgentCommandCallback_t )( void * pCmdCallbackContext,
MQTTAgentReturnInfo_t * pReturnInfo );
The command completion context is user-defined; for this demo, it is:
struct MQTTAgentCommandContext
Commands are considered completed when:
- Subscribes, unsubscribes, and publishes with QoS > 0: Once the corresponding acknowledgment packet has been
received.
- All other operations: Once the corresponding coreMQTT API has been invoked.
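The completion rule in the list above can be written as a small predicate. The enum and function names are hypothetical and exist only for this sketch; the agent tracks this internally.

```c
#include <assert.h>

/* Hypothetical operation kinds for this sketch. */
typedef enum
{
    SKETCH_OP_SUBSCRIBE,
    SKETCH_OP_UNSUBSCRIBE,
    SKETCH_OP_PUBLISH,
    SKETCH_OP_PING,
    SKETCH_OP_DISCONNECT
} SketchOperation_t;

/* Returns 1 if the command completes only when its acknowledgment packet
 * arrives, or 0 if it completes as soon as the MQTT API has been invoked. */
static int sketchCompletesOnAck( SketchOperation_t operation, int qos )
{
    assert( ( qos >= 0 ) && ( qos <= 2 ) );

    switch( operation )
    {
        case SKETCH_OP_SUBSCRIBE:
        case SKETCH_OP_UNSUBSCRIBE:
            return 1;

        case SKETCH_OP_PUBLISH:
            return ( qos > 0 ) ? 1 : 0;

        default:
            return 0;
    }
}
```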
Any structures used by the command, including publish information, subscription information, and completion contexts,
must stay in scope until the command has completed. A calling task must not reuse any of a command's structures before
the invocation of the completion callback. Note that since the completion callback is invoked by the MQTT Agent, it
will run with the thread context of the agent task, not the task that created the command. Inter-process communication
mechanisms, such as task notifications or queues, can be used to signal the calling task of command completion.
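The lifetime and signalling rules above can be sketched as follows. All types and functions here are stand-ins invented for the example, and a plain flag takes the place of a task notification so that the sketch stays self-contained.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for the agent's return information. */
typedef struct
{
    int returnCode;
} SketchReturnInfo_t;

typedef void ( * SketchCommandCallback_t )( void * pCmdCallbackContext,
                                            SketchReturnInfo_t * pReturnInfo );

/* User-defined context. It must remain valid until the callback has run,
 * so the calling task keeps it alive while it waits. */
typedef struct
{
    volatile int completed; /* Set by the callback in the agent's context. */
    int returnCode;         /* Result copied out of the return information. */
} SketchCommandContext_t;

/* Completion callback: runs in the agent task, not in the calling task. */
static void sketchCommandComplete( void * pCmdCallbackContext,
                                   SketchReturnInfo_t * pReturnInfo )
{
    SketchCommandContext_t * pContext = ( SketchCommandContext_t * ) pCmdCallbackContext;

    assert( pContext != NULL );
    assert( pReturnInfo != NULL );

    pContext->returnCode = pReturnInfo->returnCode;

    /* On FreeRTOS this is where a task notification or queue send would
     * wake the task that created the command. */
    pContext->completed = 1;
}

/* Simulates the agent finishing a command and reporting the result. */
static void sketchAgentFinishCommand( SketchCommandCallback_t callback,
                                      void * pContext,
                                      int returnCode )
{
    SketchReturnInfo_t returnInfo;

    returnInfo.returnCode = returnCode;

    if( callback != NULL )
    {
        callback( pContext, &returnInfo );
    }
}
```

The calling task would create the context, enqueue the command, and then block until completed is set (or the notification arrives) before reusing or releasing any structure that the command refers to.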
Running the Command Loop
Commands are processed continuously in MQTTAgent_CommandLoop(). If there are no commands to be processed,
the loop will wait for a maximum of MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME for one to be added to the queue,
and, if no command is added, run a single iteration of MQTT_ProcessLoop(). This ensures both that MQTT
Keep-Alive is managed, and that any incoming publishes are received even when there are no commands in the queue.
The command loop function will return for the following reasons:
- A command returns any status code besides MQTTSuccess. The error status is returned by the command
loop, so you may decide how to handle it. In this demo, the TCP connection is reestablished,
and a reconnect attempt is made. If there is any error, a reconnection can occur in the background without any
intervention from other tasks using MQTT.
- A disconnect command (from MQTTAgent_Disconnect) is processed. The command loop exits so that
TCP can be disconnected.
- A terminate command (from MQTTAgent_Terminate) is processed. This command also marks any command
still in the queue or awaiting an acknowledgment packet as an error, with a return code of MQTTRecvFailed.
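The wait-then-process behaviour and the three exit conditions can be modelled with a scripted loop. This is a simulation under stated assumptions, not the library's implementation: each array element stands for the outcome of one wait on the command queue, and all names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef enum
{
    SKETCH_SUCCESS = 0,
    SKETCH_SEND_FAILED,
    SKETCH_RECV_FAILED
} SketchStatus_t;

/* What one wait on the command queue can yield in this sketch. */
typedef enum
{
    SKETCH_EVENT_TIMEOUT,        /* No command arrived within the wait time. */
    SKETCH_EVENT_COMMAND_OK,     /* A command ran and returned success. */
    SKETCH_EVENT_COMMAND_FAILED, /* A command ran and returned an error. */
    SKETCH_EVENT_DISCONNECT,
    SKETCH_EVENT_TERMINATE
} SketchEvent_t;

typedef enum
{
    SKETCH_EXIT_ERROR,
    SKETCH_EXIT_DISCONNECT,
    SKETCH_EXIT_TERMINATE,
    SKETCH_EXIT_END_OF_SCRIPT /* Only exists because the script is finite. */
} SketchExit_t;

typedef struct
{
    SketchExit_t reason;
    SketchStatus_t status;
    size_t processLoopRuns; /* Idle iterations that serviced the connection. */
    size_t commandsFailed;  /* Pending commands marked SKETCH_RECV_FAILED. */
} SketchLoopResult_t;

static SketchLoopResult_t sketchCommandLoop( const SketchEvent_t * pEvents,
                                             size_t eventCount )
{
    SketchLoopResult_t result = { SKETCH_EXIT_END_OF_SCRIPT, SKETCH_SUCCESS, 0U, 0U };
    size_t i, j;

    assert( pEvents != NULL );

    for( i = 0U; i < eventCount; i++ )
    {
        switch( pEvents[ i ] )
        {
            case SKETCH_EVENT_TIMEOUT:
                /* Queue stayed empty: run one process-loop iteration so that
                 * keep-alive and incoming publishes are still handled. */
                result.processLoopRuns++;
                break;

            case SKETCH_EVENT_COMMAND_OK:
                break;

            case SKETCH_EVENT_COMMAND_FAILED:
                /* Hand the error to the caller, which may reconnect. */
                result.reason = SKETCH_EXIT_ERROR;
                result.status = SKETCH_SEND_FAILED;
                return result;

            case SKETCH_EVENT_DISCONNECT:
                result.reason = SKETCH_EXIT_DISCONNECT;
                return result;

            case SKETCH_EVENT_TERMINATE:
                /* Every command still queued is failed, not executed. */
                for( j = i + 1U; j < eventCount; j++ )
                {
                    if( pEvents[ j ] != SKETCH_EVENT_TIMEOUT )
                    {
                        result.commandsFailed++;
                    }
                }

                result.reason = SKETCH_EXIT_TERMINATE;
                return result;
        }
    }

    return result;
}
```

The task that calls the loop can then switch on the exit reason: reconnect and call the loop again after an error, close the TCP connection after a disconnect, and stop after a terminate.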
Here's how these cases are handled in the agent task:
prvMQTTAgentTask()
Creating the other Tasks
The MQTT agent task in this demo also creates the subscribe-publish tasks. While this is not necessary for the
implementation of an agent task, it is done here as the agent is the main task in this demo:
prvConnectAndCreateDemoTasks()
Subscription Manager
Since the demo uses multiple topics, a subscription manager is a convenient way to associate subscribed topics with
unique callbacks or tasks. The subscription manager in this demo is single-threaded, so it should not be used by multiple
tasks concurrently. In this demo, subscription manager functions are only called from callback functions that are
passed to the MQTT agent, and run only with the agent task's thread context. The following types and functions
compose the subscription manager:
Callback function
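To make the idea concrete, here is a minimal sketch of a single-threaded subscription table with fan-out. It is not the demo's subscription manager: all names are hypothetical, the table size is arbitrary, and topics are compared with an exact string match, so wildcard topic filters are not handled.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define SKETCH_MAX_SUBSCRIPTIONS    4U

typedef void ( * SketchIncomingCallback_t )( void * pContext,
                                             const char * pTopic,
                                             const char * pPayload );

typedef struct
{
    const char * pTopicFilter; /* NULL marks a free slot. */
    SketchIncomingCallback_t callback;
    void * pContext;
} SketchSubscription_t;

static void sketchSubscriptionsInit( SketchSubscription_t * pTable )
{
    size_t i;

    for( i = 0U; i < SKETCH_MAX_SUBSCRIPTIONS; i++ )
    {
        pTable[ i ].pTopicFilter = NULL;
        pTable[ i ].callback = NULL;
        pTable[ i ].pContext = NULL;
    }
}

/* Returns 1 if the subscription was stored, 0 if the table is full. */
static int sketchAddSubscription( SketchSubscription_t * pTable,
                                  const char * pTopicFilter,
                                  SketchIncomingCallback_t callback,
                                  void * pContext )
{
    size_t i;

    assert( ( pTopicFilter != NULL ) && ( callback != NULL ) );

    for( i = 0U; i < SKETCH_MAX_SUBSCRIPTIONS; i++ )
    {
        if( pTable[ i ].pTopicFilter == NULL )
        {
            pTable[ i ].pTopicFilter = pTopicFilter;
            pTable[ i ].callback = callback;
            pTable[ i ].pContext = pContext;
            return 1;
        }
    }

    return 0;
}

/* Frees every slot registered for the filter; returns how many were freed. */
static size_t sketchRemoveSubscription( SketchSubscription_t * pTable,
                                        const char * pTopicFilter )
{
    size_t i, removed = 0U;

    for( i = 0U; i < SKETCH_MAX_SUBSCRIPTIONS; i++ )
    {
        if( ( pTable[ i ].pTopicFilter != NULL ) &&
            ( strcmp( pTable[ i ].pTopicFilter, pTopicFilter ) == 0 ) )
        {
            pTable[ i ].pTopicFilter = NULL;
            removed++;
        }
    }

    return removed;
}

/* Called from the agent's single incoming publish callback. Invokes every
 * matching registration and returns the number of callbacks invoked. */
static size_t sketchHandleIncomingPublish( const SketchSubscription_t * pTable,
                                           const char * pTopic,
                                           const char * pPayload )
{
    size_t i, invoked = 0U;

    for( i = 0U; i < SKETCH_MAX_SUBSCRIPTIONS; i++ )
    {
        if( ( pTable[ i ].pTopicFilter != NULL ) &&
            ( strcmp( pTable[ i ].pTopicFilter, pTopic ) == 0 ) )
        {
            pTable[ i ].callback( pTable[ i ].pContext, pTopic, pPayload );
            invoked++;
        }
    }

    return invoked;
}

/* Example callback: counts publishes through an int passed as context. */
static void sketchCountingCallback( void * pContext,
                                    const char * pTopic,
                                    const char * pPayload )
{
    ( void ) pTopic;
    ( void ) pPayload;
    ( *( int * ) pContext )++;
}
```

Because nothing in the table is protected by a lock, every function must be called from the same thread context, which matches the restriction described above.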
Simple subscribe-publish Task
Each instance of the subscribe-publish task creates a subscription to an MQTT topic, and creates publish operations
for that topic. To demonstrate multiple publish types, even numbered tasks use QoS 0 (which are complete once the
publish packet is sent) and odd tasks use QoS 1 (which are complete upon receipt of a PUBACK packet). The sub-pub
tasks are defined as follows:
prvSimpleSubscribePublishTask()
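The even and odd split described above could be derived from the task number as in the sketch below. The structure, the function and the topic naming scheme are assumptions made for illustration; the demo's own task may build its topics differently.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

#define SKETCH_TOPIC_BUFFER_SIZE    32U

/* Hypothetical per-task settings for this sketch. */
typedef struct
{
    int qos;                                /* 0 for even tasks, 1 for odd tasks. */
    char topic[ SKETCH_TOPIC_BUFFER_SIZE ]; /* Topic this task subscribes and publishes to. */
} SketchTaskParams_t;

static void sketchInitTaskParams( unsigned int taskNumber,
                                  SketchTaskParams_t * pParams )
{
    int written;

    assert( pParams != NULL );

    /* Even tasks publish at QoS 0 (complete once sent); odd tasks publish at
     * QoS 1 (complete once the PUBACK arrives). */
    pParams->qos = ( ( taskNumber % 2U ) == 0U ) ? 0 : 1;

    /* Give each task its own topic so that it only sees its own publishes. */
    written = snprintf( pParams->topic, sizeof( pParams->topic ),
                        "sketch/task%u", taskNumber );
    assert( ( written > 0 ) && ( ( size_t ) written < sizeof( pParams->topic ) ) );
    ( void ) written;
}
```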
Copyright (C) Amazon Web Services, Inc. or its affiliates. All rights reserved.