第04章 实现MQTT通信
约 2311 字大约 8 分钟
2026-01-27
4.1 接口层: W5500
① 原理图


② CubeMX 配置


③ 移植W5500驱动
官方库地址:https://github.com/Wiznet/ioLibrary_Driver/tree/master
④ 接口层代码
Int_W5500.h
#ifndef __INT_W5500_H__
#define __INT_W5500_H__
#include "Com_Debug.h"
#include "wizchip_conf.h"
#include "socket.h"
/**
* @brief 初始化W5500
*
*/
void Int_W5500_Init(void);
#endif /* __INT_W5500_H__ */Int_W5500.c
#include "Int_W5500.h"
// 定义IP地址
static uint8_t ip_addr[4] = {192, 168, 17, 120};
// 定义网关地址
static uint8_t gw_addr[4] = {192, 168, 17, 1};
// 定义子网掩码
static uint8_t snm_addr[4] = {255, 255, 255, 0};
// 定义MAC地址
static uint8_t mac_addr[6] = {0x1F, 0x8A, 0xED, 0x12, 0x34, 0x56};
/**
* @brief 初始化W5500
*
*/
void Int_W5500_Init(void)
{
// 1. 复位
// 拉低复位引脚,并持续至少500us, 这里延时100ms
HAL_GPIO_WritePin(W5500_RST_GPIO_Port, W5500_RST_Pin, GPIO_PIN_RESET);
HAL_Delay(100);
// 拉高复位引脚
HAL_GPIO_WritePin(W5500_RST_GPIO_Port, W5500_RST_Pin, GPIO_PIN_SET);
// 延时100ms,实现稳定
HAL_Delay(100);
// 2. 注册需要使用的操作函数
reg_wizchip_all_cbfunc();
// 3. 对W5550进行网络配置
// 3.1 配置IP地址
setSIPR(ip_addr);
// 3.3 配置网关地址
setGAR(gw_addr);
// 3.4 配置子网掩码
setSUBR(snm_addr);
// 3.5 配置MAC地址
setSHAR(mac_addr);
}⑤ 应用层调用接口层代码
App_WQTT.h
#ifndef __APP_MQTT_H__
#define __APP_MQTT_H__
#include "Com_Debug.h"
#include "Int_W5500.h"
/**
* @brief 初始化MQTT
*
*/
void App_MQTT_Init(void);
#endif /* __APP_MQTT_H__ */App_WQTT.c
#include "App_MQTT.h"
/**
* @brief 初始化MQTT
*
*/
void App_MQTT_Init(void)
{
// 1. 初始化W5500
Int_W5500_Init();
}App_Task.h
#ifndef __APP_TASK_H__
#define __APP_TASK_H__
#include "FreeRTOS.h"
#include "task.h"
#include "Com_Debug.h"
#include "App_MQTT.h"
/**
* @brief 启动任务
*
*/
void App_TASK_Start(void);
#endif /* __APP_TASK_H__ */App_Task.c
#include "App_Task.h"
// MQTT任务 ------------------------------------
// MQTT任务函数的原型
void mqtt_task_callback(void *pvParameters);
// MQTT任务名称
#define MQTT_TASK_NAME "mqtt_task"
// MQTT任务堆栈大小
#define MQTT_TASK_STACK_SIZE 1024
// MQTT任务的优先级
#define MQTT_TASK_PRIORITY 1
// MQTT任务的句柄
TaskHandle_t mqtt_task_handle;
/**
* @brief 启动 FreeRTOS 任务管理
*
*/
void App_Task_Start(void)
{
// 进入临界区
taskENTER_CRITICAL();
// 创建显示任务
xTaskCreate(mqtt_task_callback, MQTT_TASK_NAME, MQTT_TASK_STACK_SIZE, NULL, MQTT_TASK_PRIORITY, &mqtt_task_handle) == pdPASS ? printf("MQTT任务创建成功! \n") : printf("MQTT任务创建失败! \n");
taskEXIT_CRITICAL();
// 启动任务调度器 ( vTaskStartScheduler() 后面的代码不会被执行)
printf("任务调度器启动... \n");
vTaskStartScheduler();
}
// MQTT任务函数的实现
void mqtt_task_callback(void *pvParameters)
{
printf("MQTT任务启动... \n");
// 初始化MQTT模块
printf("MQTT应用初始化... \n");
App_MQTT_Init();
// 循环
while (1)
{
vTaskDelay(50);
}
}4.2 应用层:MQTT
① 相关函数
NewNetwork() 新建网络对象, 位于 mqtt_interface.h
ConnectNetwork() 连接网络,建立TCP连接(传输层), 位于 mqtt_interface.h
MQTTClientInit() MQTT客户端初始化,位于 MQTTClient.h
MQTTConnect() MQTT连接(应用层)
MQTTSubscribe() MQTT订阅,位于 MQTTClient.h
MQTTYield() 接收所订阅主题的消息,需要轮询调用,位于 MQTTClient.h
MQTTPublish() 向指定主题发布消息② 数据格式
MQTT服务器到网关再到电机驱动
传输方向: 用户MQTT客户端(手机App、小程序、MQTTX软件) -> MQTT服务器 -> 32网关 -> 电机驱动板{
"connection_type": 1, // 1:Modbus; 2:can 3:lora
"device_id": 3, // 如果是Modbus就是ModbusId,范围1~247
"motor_status": 0, // 0:关闭; 1:开启
"target_angle": 3600, // 要旋转的目标角度,360表示1圈
"target_speed": 200 // 目标速度,范围20~200
}电机驱动到网关再到MQTT服务器
传输方向:电机驱动板 -> 32网关 -> MQTT服务器 -> 用户MQTT客户端(手机App、小程序、MQTTX软件){
"device_id": 1, // 如果是Modbus就是ModbusId,范围1~247
"motor_status": 0, // 0:关闭; 1:开启
"current_angle": 3600, // 实时角度,360表示1圈
"current_speed": 200 // 实时速度,范围20~200
}③ 代码
需将cJSON移植进工程Com_Global.h
#ifndef __COM_GLOBAL_H__
#define __COM_GLOBAL_H__
#include <stdint.h>
// 定义结构体类型:表示MQTT接收到的订阅消息
typedef struct
{
uint8_t connection_type; // 1:Modbus; 2:can 3:lora
uint8_t device_id; // 如果是Modbus就是ModbusId,范围1~247
uint8_t motor_status; // 0:关闭; 1:开启
float target_angle; // 要旋转的目标角度,360表示1圈
float target_speed; // 目标速度,范围20~200
} MQTT_ReceiveMsg_Ttypedef;
// 定义结构体类型:表示发送的MQTT消息
typedef struct
{
uint8_t device_id; // 如果是Modbus就是ModbusId,范围1~247
uint8_t motor_status; // 0:关闭; 1:开启
float current_angle; // 当前旋转角度,360表示1圈
float current_speed; // 当前速度,范围20~200
} MQTT_SendMsg_Ttypedef;
#endif /* __COM_GLOBAL_H__ */Com_Global.c
#include "Com_Global.h"App_MQTT.h
#ifndef __APP_MQTT_H__
#define __APP_MQTT_H__
#include <string.h>
#include <stdlib.h>
#include "cJSON.h"
#include "Com_Global.h"
#include "Com_Debug.h"
#include "Int_W5500.h"
// 宏定义:订阅的主题名称
#define MQTT_SUBSCRIBE_TOPIC "StepperMotorGateway/receive"
// 宏定义:发布的主题名称
#define MQTT_PUBLISH_TOPIC "StepperMotorGateway/send"
/**
* @brief MQTT应用初始化
*
*/
void App_MQTT_Init(void);
/**
* @brief MQTT应用重新连接
*
*/
void App_MQTT_Reconnect(void);
/**
* @brief 发送MQTT消息
*
*/
void App_MQTT_Send(void);
/**
* @brief MQTT应用接收消息
*
*/
void App_MQTT_Receive(void);
#endif /* __APP_MQTT_H__ */App_MQTT.c
#include "App_MQTT.h"
// 定义MQTT服务器IP
uint8_t mqtt_server_ip[4] = {192, 168, 16, 24};
// 定义MQTT服务器端口
uint16_t mqtt_server_port = 1883;
// 创建Network对象
Network network;
// 定义socket ID
uint8_t socket_id = 0;
// 创建MQTT的发送缓冲区
uint8_t mqtt_send_buf[256];
// 创建MQTT的接收缓冲区
uint8_t mqtt_read_buf[256];
// 创建 MQTT 客户端对象 (必须定义为全局, 如果在函数中会超出函数栈空间大小)
MQTTClient mqtt_client;
// 创建 MQTT 连接数据对象 (必须定义为全局,如果在函数中会超出函数栈空间大小)
MQTTPacket_connectData connect_data;
// 声明MQTT_ReceiveMsg_Ttypedef结构体变量
MQTT_ReceiveMsg_Ttypedef mqtt_receive_msg;
// 声明MQTT_SendMsg_Ttypedef结构体变量
MQTT_SendMsg_Ttypedef mqtt_send_msg;
// 声明要发布的消息对象
MQTTMessage mqtt_publish_msg = {QOS0, 0, 0, 0, NULL, 0};
// 静态函数:定义订阅消息的回调函数
static void App_MQTT_SubscribeCallback(MessageData *msg)
{
// 打印订阅消息
// printf("MQTT: 消息内容: %s, 消息长度: %d \n", (char *)msg->message->payload, msg->message->payloadlen);
// 解析JSON字符串
cJSON *root = cJSON_Parse((char *)msg->message->payload);
if (root == NULL)
{
// JSON 格式错误
printf("MQTT: JSON 格式错误\r\n");
return;
}
// 提取connection_type字段
cJSON *connection_type_json = cJSON_GetObjectItem(root, "connection_type");
if (!cJSON_IsNumber(connection_type_json))
{
// connection_type字段错误
printf("MQTT: 必须包含 connection_type 字段! \n");
return;
}
mqtt_receive_msg.connection_type = connection_type_json->valueint;
// 提取 device_id 字段
cJSON *device_id_json = cJSON_GetObjectItem(root, "device_id");
if (!cJSON_IsNumber(device_id_json))
{
// device_id字段错误
printf("MQTT: 必须包含 device_id 字段! \n");
return;
}
mqtt_receive_msg.device_id = device_id_json->valueint;
// 提取 motor_status 字段
cJSON *motor_status_json = cJSON_GetObjectItem(root, "motor_status");
if (!cJSON_IsNumber(motor_status_json))
{
// motor_status字段错误
printf("MQTT: 必须包含 motor_status 字段! \n");
return;
}
mqtt_receive_msg.motor_status = motor_status_json->valueint;
// 提取 target_angle 字段
cJSON *target_angle_json = cJSON_GetObjectItem(root, "target_angle");
if (!cJSON_IsNumber(target_angle_json))
{
// target_angle字段错误
printf("MQTT: 必须包含 target_angle 字段! \n");
return;
}
mqtt_receive_msg.target_angle = target_angle_json->valueint;
// 提取 target_speed 字段
cJSON *target_speed_json = cJSON_GetObjectItem(root, "target_speed");
if (!cJSON_IsNumber(target_speed_json))
{
// target_speed字段错误
printf("MQTT: 必须包含 target_speed 字段! \n");
return;
}
mqtt_receive_msg.target_speed = target_speed_json->valueint;
// 释放JSON根节点内存
cJSON_Delete(root);
// 打印提取到字段
printf("connection_type: %d \n", mqtt_receive_msg.connection_type);
printf("device_id: %d \n", mqtt_receive_msg.device_id);
printf("motor_status: %d \n", mqtt_receive_msg.motor_status);
printf("target_angle: %.2f \n", mqtt_receive_msg.target_angle);
printf("target_speed: %.2f \n\n\n", mqtt_receive_msg.target_speed);
}
// 静态函数:连接MQTT服务器并订阅
static void App_MQTT_ConnectAndSubscribe(void)
{
// 1. 建立 TCP 连接 (传输层)------------------------------------
// 1.1 创建网络连接对象并制定socket ID
NewNetwork(&network, socket_id);
// 1.2 创建socket客户端、连接服务器
if (ConnectNetwork(&network, mqtt_server_ip, mqtt_server_port) != SOCK_OK)
{
// TCP 连接失败
printf("MQTT: TCP 连接失败\r\n");
return;
}
printf("MQTT: TCP 连接成功\r\n");
// 2.建立MQTT连接 (应用层) --------------------------------------------
// 2.1 初始化MQTT客户端
MQTTClientInit(&mqtt_client, &network, 1000, mqtt_send_buf, sizeof(mqtt_send_buf), mqtt_read_buf, sizeof(mqtt_read_buf));
// 2.2 连接MQTT服务器
// 设置连接参数
connect_data.MQTTVersion = 4; // 4 表示 3.1.1
connect_data.clientID.cstring = "Steperr_Motor_Gateway01";
connect_data.keepAliveInterval = 60; // 60s的1.5内时间内不发送消息也可以保持连接接
connect_data.cleansession = 1; // 清除会话,断开连接后需要重新订阅
connect_data.willFlag = 0; // 关闭遗嘱
// 连接
if (MQTTConnect(&mqtt_client, &connect_data) != SUCCESS)
{
// 连接失败
printf("MQTT: MQTT 连接失败\r\n");
return;
}
printf("MQTT: MQTT 连接成功\r\n");
// 3. 订阅 ------------------------------------------------------------
if (MQTTSubscribe(&mqtt_client, MQTT_SUBSCRIBE_TOPIC, QOS0, App_MQTT_SubscribeCallback) != SUCCESS)
{
// 订阅失败
printf("MQTT: MQTT 订阅失败\r\n");
return;
}
printf("MQTT: MQTT 订阅成功\r\n");
}
/**
* @brief MQTT应用初始化
*
*/
void App_MQTT_Init(void)
{
// 1. 初始化W5500 (完成物理层、IP层)
Int_W5500_Init();
// 2. 延时1s
HAL_Delay(1000);
// 3. 连接MQTT服务器(完成传输层、应用层)并订阅主题
App_MQTT_ConnectAndSubscribe();
}
/**
* @brief MQTT应用重新连接
*
*/
void App_MQTT_Reconnect(void)
{
// 如果连接断开,就重新连接和订阅
if (getSn_SR(socket_id) != SOCK_ESTABLISHED)
{
printf("断开连接,重新连接...");
App_MQTT_ConnectAndSubscribe();
}
}
/**
* @brief 发送MQTT消息
*
*/
void App_MQTT_Send(void)
{
// 1. 设置要发送的内容 -----------------
mqtt_send_msg.device_id = 2;
mqtt_send_msg.motor_status = 0;
mqtt_send_msg.current_angle = 0.0;
mqtt_send_msg.current_speed = 0.0;
// 2. 将发送的内容转为JSON字符串
// 2.1 创建JSON根节点
cJSON *root = cJSON_CreateObject();
// 2.2 添加字段到JSON根节点
cJSON_AddNumberToObject(root, "device_id", mqtt_send_msg.device_id);
cJSON_AddNumberToObject(root, "motor_status", mqtt_send_msg.motor_status);
cJSON_AddNumberToObject(root, "current_angle", mqtt_send_msg.current_angle);
cJSON_AddNumberToObject(root, "current_speed", mqtt_send_msg.current_speed);
// 2.3 将JSON根节点转换为字符串
char *json_str = cJSON_Print(root);
printf("要发送的 JSON 字符串: %s \n", json_str);
// 2.5 MQTTF发布
mqtt_publish_msg.payload = json_str;
mqtt_publish_msg.payloadlen = strlen(json_str);
if (MQTTPublish(&mqtt_client, MQTT_PUBLISH_TOPIC, &mqtt_publish_msg) == SUCCESS)
{
printf("MQTT: 消息发布成功\r\n");
}
else
{
printf("MQTT: 消息发布失败\r\n");
}
// 2.6 释放JSON字符串和JSON根节点内存
free(json_str);
cJSON_Delete(root);
}
/**
* @brief MQTT应用接收消息, 需要被轮询调用
*
*/
void App_MQTT_Receive(void)
{
MQTTYield(&mqtt_client, 1000);
}App_Task.c
修改任务函数:
/* 代码省略 ... */
// MQTT任务函数的实现
void mqtt_task_callback(void *pvParameters)
{
printf("MQTT任务启动... \n");
// 初始化MQTT模块
printf("MQTT应用初始化... \n");
App_MQTT_Init();
// 循环
while (1)
{
// 检测是否需要重新连接
App_MQTT_Reconnect();
// 发送MQTT消息
// App_MQTT_Send();
// 接收MQTT消息
App_MQTT_Receive();
vTaskDelay(50);
}
}④ 注意事项
1. W5500 设置完自身的IP、子网掩码、网关、MAC地址之后至少延时 1s
2. FreeRTOS 修改默认的堆空间大小
FreeRTOSConfig.h
#define configTOTAL_HEAP_SIZE ( ( size_t ) ( 17 * 1024 ) )
改为
#define configTOTAL_HEAP_SIZE ( ( size_t ) ( 8 * 1024 ) )
3. CubeMX配置
Minimum Heap Size 改为 0x800
Minimum Stack Size 改为 0x400
4. 处理完JSON后记得释放对象