TA的每日心情 | 开心 13 小时前 |
---|
签到天数: 333 天 连续签到: 23 天 [LV.8]以坛为家I
|
【目的】
移植mqtt实现连接mqtt服务器,实现数据的收发。
【实验步骤】
1、在前的lwip移植的基础之上,添加mqtt的代码,【STM32H735-DK 测评】手工配置LWIP - 板卡试用 - 与非网 (eefocus.com)
2、在Core/inc创建bsp_mqtt.h内空如下:
- <font size="4">/*
- * bsp_mqtt.h
- *
- * Created on: Mar 19, 2024
- * Author: liujianhua
- */
- #ifndef INC_BSP_MQTT_H_
- #define INC_BSP_MQTT_H_
- void bsp_mqtt_init(void);
- #endif /* INC_BSP_MQTT_H_ */
- </font>
复制代码 其实就是声明bsp_mqtt_init函数。
3、在Core/src目录下面创建bsp_mqtt.c。
在野火的教程中,他是从mqtt官网下载的源码进行移植,需要创建两个线程来实现mqtt的订阅与发布,这次是直接什么stmcubemax下的mqtt来实现,所以不需要创建任务,直接以回调的方式来实现。
其内容如下:
- /*
- * bsp_mqtt.c
- *
- * Created on: Mar 19, 2024
- * Author: liujianhua
- */
- /*-----------------------------------------------------------
- * Includes files
- *----------------------------------------------------------*/
- /* lib includes. */
- #include <string.h>
- /* segger rtt includes. */
- #include "main.h"
- //#include "bsp_mqtt.h"
- /* FreeRTOS includes. */
- #include "FreeRTOS.h"
- #include "semphr.h"
- /* lwip includes. */
- #include "lwip/apps/mqtt.h"
- #include "lwip/ip4_addr.h"
- static err_t bsp_mqtt_connect(void);
- #define USE_MQTT_MUTEX //使用发送数据的互斥锁,多个任务有发送才必须
- #ifdef USE_MQTT_MUTEX
- static SemaphoreHandle_t s__mqtt_publish_mutex = NULL;
- #endif /* USE_MQTT_MUTEX */
- static mqtt_client_t *s__mqtt_client_instance = NULL; //mqtt连接句柄,这里一定要设置全局变量,防止 lwip 底层重复申请空间
- //MQTT 数据结构体
- struct mqtt_recv_buffer
- {
- char recv_buffer[1024]; //储存接收的buffer
- uint16_t recv_len; //记录已接收多少个字节的数据,MQTT的数据分包来的
- uint16_t recv_total; //MQTT接收数据的回调函数会有个总的大小
- };
- //结构体初始化
- struct mqtt_recv_buffer s__mqtt_recv_buffer_g = {
- .recv_len = 0,
- .recv_total = 0,
- };
- static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos);
- /* ===========================================
- 接收回调函数
- ============================================== */
- /*!
- * @brief mqtt 接收数据处理函数接口,需要在应用层进行处理
- * 执行条件:mqtt连接成功
- *
- * @param [in1] : 用户提供的回调参数指针
- * @param [in2] : 接收的数据指针
- * @param [in3] : 接收数据长度
- * @retval: 处理的结果
- */
- __weak int mqtt_rec_data_process(void* arg, char *rec_buf, uint64_t buf_len)
- {
- printf("recv_buffer = %s\n", rec_buf);
- return 0;
- }
- /*!
- * @brief MQTT 接收到数据的回调函数
- * 执行条件:MQTT 连接成功
- *
- * @param [in1] : 用户提供的回调参数指针
- * @param [in2] : MQTT 收到的分包数据指针
- * @param [in3] : MQTT 分包数据长度
- * @param [in4] : MQTT 数据包的标志位
- * @retval: None
- */
- static void bsp_mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
- {
- if( (data == NULL) || (len == 0) )
- {
- printf("mqtt_client_incoming_data_cb: condition error @entry\n");
- return;
- }
- if(s__mqtt_recv_buffer_g.recv_len + len < sizeof(s__mqtt_recv_buffer_g.recv_buffer))
- {
- //
- snprintf(&s__mqtt_recv_buffer_g.recv_buffer[s__mqtt_recv_buffer_g.recv_len], len, "%s", data);
- s__mqtt_recv_buffer_g.recv_len += len;
- }
- if ( (flags & MQTT_DATA_FLAG_LAST) == MQTT_DATA_FLAG_LAST )
- {
- //处理数据
- mqtt_rec_data_process(arg , s__mqtt_recv_buffer_g.recv_buffer, s__mqtt_recv_buffer_g.recv_len);
- //已接收字节计数归0
- s__mqtt_recv_buffer_g.recv_len = 0;
- //清空接收buffer
- memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer));
- }
- printf("mqtt_client_incoming_data_cb:reveiving incomming data.\n");
- }
- /*!
- * @brief MQTT 接收到数据的回调函数
- * 执行条件:MQTT 连接成功
- *
- * @param [in] : 用户提供的回调参数指针
- * @param [in] : MQTT 收到数据的topic
- * @param [in] : MQTT 收到数据的总长度
- * @retval: None
- */
- static void bsp_mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
- {
- if( (topic == NULL) || (tot_len == 0) )
- {
- printf("bsp_mqtt_incoming_publish_cb: condition error @entry\n");
- return;
- }
- printf("bsp_mqtt_incoming_publish_cb: topic = %s.\n",topic);
- printf("bsp_mqtt_incoming_publish_cb: tot_len = %d.\n",tot_len);
- s__mqtt_recv_buffer_g.recv_total = tot_len; //需要接收的总字节
- s__mqtt_recv_buffer_g.recv_len = 0; //已接收字节计数归0
- //清空接收buffer
- memset(s__mqtt_recv_buffer_g.recv_buffer, 0, sizeof(s__mqtt_recv_buffer_g.recv_buffer));
- }
- /* ===========================================
- 连接状态回调函数
- ============================================== */
- /*!
- * @brief MQTT 连接成功的处理函数,需要的话在应用层定义
- *
- * @param [in1] : MQTT 连接句柄
- * @param [in2] : MQTT 连接参数指针
- *
- * @retval: None
- */
- __weak void mqtt_conn_suc_proc(mqtt_client_t *client, void *arg)
- {
- char test_sub_topic[] = "/public/TEST/AidenHinGwenWong_sub";
- bsp_mqtt_subscribe(client,test_sub_topic,0);
- }
- /*!
- * @brief MQTT 处理失败调用的函数
- *
- * @param [in1] : MQTT 连接句柄
- * @param [in2] : MQTT 连接参数指针
- *
- * @retval: None
- */
- __weak void mqtt_error_process_callback(mqtt_client_t * client, void *arg)
- {
- }
- /*!
- * @brief MQTT 连接状态的回调函数
- *
- * @param [in] : MQTT 连接句柄
- * @param [in] : 用户提供的回调参数指针
- * @param [in] : MQTT 连接状态
- * @retval: None
- */
- static void bsp_mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
- {
- if( client == NULL )
- {
- printf("bsp_mqtt_connection_cb: condition error @entry\n");
- return;
- }
- if ( status == MQTT_CONNECT_ACCEPTED ) //Successfully connected
- {
- printf("bsp_mqtt_connection_cb: Successfully connected\n");
- // 注册接收数据的回调函数
- mqtt_set_inpub_callback(client, bsp_mqtt_incoming_publish_cb, bsp_mqtt_incoming_data_cb, arg);
- //成功处理函数
- mqtt_conn_suc_proc(client, arg);
- }
- else
- {
- printf("bsp_mqtt_connection_cb: Fail connected, status = %s\n", lwip_strerr(status) );
- //错误处理
- mqtt_error_process_callback(client, arg);
- }
- }
- /*!
- * @brief 连接到 mqtt 服务器
- * 执行条件:无
- *
- * @param [in] : None
- *
- * @retval: 连接状态,如果返回不是 ERR_OK 则需要重新连接
- */
- static err_t bsp_mqtt_connect(void)
- {
- printf("bsp_mqtt_connect: Enter!\n");
- err_t ret;
- struct mqtt_connect_client_info_t mqtt_connect_info = {
- "AidenHinGwenWong_MQTT_Test", /* 这里需要修改,以免在同一个服务器两个相同ID会发生冲突 */
- NULL, /* MQTT 服务器用户名 */
- NULL, /* MQTT 服务器密码 */
- 60, /* 与 MQTT 服务器保持连接时间,时间超过未发送数据会断开 */
- "/public/TEST/AidenHinGwenWong_pub",/* MQTT遗嘱的消息发送topic */
- "Offline_pls_check", /* MQTT遗嘱的消息,断开服务器的时候会发送 */
- 0, /* MQTT遗嘱的消息 Qos */
- 0 /* MQTT遗嘱的消息 Retain */
- };
- ip_addr_t server_ip;
- ip4_addr_set_u32(&server_ip, ipaddr_addr("192.168.3.156")); //MQTT服务器IP
- uint16_t server_port = 1883; //注意这里是 MQTT 的 TCP 连接方式的端口号!!!!
- if (s__mqtt_client_instance == NULL)
- {
- // 句柄==NULL 才申请空间,否则无需重复申请
- s__mqtt_client_instance = mqtt_client_new();
- }
- if (s__mqtt_client_instance == NULL)
- {
- //防止申请失败
- printf("bsp_mqtt_connect: s__mqtt_client_instance malloc fail @@!!!\n");
- return ERR_MEM;
- }
- //进行连接,注意:如果需要带入 arg ,arg必须是全局变量,局部变量指针会被回收,大坑!!!!!
- ret = mqtt_client_connect(s__mqtt_client_instance, &server_ip, server_port, bsp_mqtt_connection_cb, NULL, &mqtt_connect_info);
- /******************
- 小提示:连接错误不需要做任何操作,mqtt_client_connect 中注册的回调函数里面做判断并进行对应的操作
- *****************/
- printf("bsp_mqtt_connect: connect to mqtt %s\n", lwip_strerr(ret));
- return ret;
- }
- /* ===========================================
- 发送接口、回调函数
- ============================================== */
- /*!
- * @brief MQTT 发送数据的回调函数
- * 执行条件:MQTT 连接成功
- *
- * @param [in] : 用户提供的回调参数指针
- * @param [in] : MQTT 发送的结果:成功或者可能的错误
- * @retval: None
- */
- static void mqtt_client_pub_request_cb(void *arg, err_t result)
- {
- mqtt_client_t *client = (mqtt_client_t *)arg;
- if (result != ERR_OK)
- {
- printf("mqtt_client_pub_request_cb: c002: Publish FAIL, result = %s\n", lwip_strerr(result));
- //错误处理
- mqtt_error_process_callback(client, arg);
- }
- else
- {
- printf("mqtt_client_pub_request_cb: c005: Publish complete!\n");
- }
- }
- /*!
- * @brief 发送消息到服务器
- * 执行条件:无
- *
- * @param [in1] : mqtt 连接句柄
- * @param [in2] : mqtt 发送 topic 指针
- * @param [in3] : 发送数据包指针
- * @param [in4] : 数据包长度
- * @param [in5] : qos
- * @param [in6] : retain
- * @retval: 发送状态
- * @note: 有可能发送不成功但是现实返回值是 0 ,需要判断回调函数 mqtt_client_pub_request_cb 是否 result == ERR_OK
- */
- err_t bsp_mqtt_publish(mqtt_client_t *client, char *pub_topic, char *pub_buf, uint16_t data_len, uint8_t qos, uint8_t retain)
- {
- if ( (client == NULL) || (pub_topic == NULL) || (pub_buf == NULL) || (data_len == 0) || (qos > 2) || (retain > 1) )
- {
- printf("bsp_mqtt_publish: input error@@" );
- return ERR_VAL;
- }
- //判断是否连接状态
- if(mqtt_client_is_connected(client) != pdTRUE)
- {
- printf("bsp_mqtt_publish: client is not connected\n");
- return ERR_CONN;
- }
- err_t err;
- #ifdef USE_MQTT_MUTEX
- // 创建 mqtt 发送互斥锁
- if (s__mqtt_publish_mutex == NULL)
- {
- printf("bsp_mqtt_publish: create mqtt mutex ! \n" );
- s__mqtt_publish_mutex = xSemaphoreCreateMutex();
- }
- if (xSemaphoreTake(s__mqtt_publish_mutex, portMAX_DELAY) == pdPASS)
- #endif /* USE_MQTT_MUTEX */
- {
- err = mqtt_publish(client, pub_topic, pub_buf, data_len, qos, retain, mqtt_client_pub_request_cb, (void*)client);
- printf("bsp_mqtt_publish: mqtt_publish err = %s\n", lwip_strerr(err) );
- #ifdef USE_MQTT_MUTEX
- printf("bsp_mqtt_publish: mqtt_publish xSemaphoreTake\n");
- xSemaphoreGive(s__mqtt_publish_mutex);
- #endif /* USE_MQTT_MUTEX */
- }
- return err;
- }
- /* ===========================================
- MQTT 订阅接口函数
- ============================================== */
- /*!
- * @brief MQTT 订阅的回调函数
- * 执行条件:MQTT 连接成功
- *
- * @param [in] : 用户提供的回调参数指针
- * @param [in] : MQTT 订阅结果
- * @retval: None
- */
- static void bsp_mqtt_request_cb(void *arg, err_t err)
- {
- if ( arg == NULL )
- {
- printf("bsp_mqtt_request_cb: input error@@\n");
- return;
- }
- mqtt_client_t *client = (mqtt_client_t *)arg;
- if ( err != ERR_OK )
- {
- printf("bsp_mqtt_request_cb: FAIL sub, sub again, err = %s\n", lwip_strerr(err));
- //错误处理
- mqtt_error_process_callback(client, arg);
- }
- else
- {
- printf("bsp_mqtt_request_cb: sub SUCCESS!\n");
- }
- }
- /*!
- * @brief mqtt 订阅
- * 执行条件:连接成功
- *
- * @param [in1] : mqtt 连接句柄
- * @param [in2] : mqtt 发送 topic 指针
- * @param [in5] : qos
- * @retval: 订阅状态
- */
- static err_t bsp_mqtt_subscribe(mqtt_client_t* mqtt_client, char * sub_topic, uint8_t qos)
- {
- printf("bsp_mqtt_subscribe: Enter\n");
- if( ( mqtt_client == NULL) || ( sub_topic == NULL) || ( qos > 2 ) )
- {
- printf("bsp_mqtt_subscribe: input error@@\n");
- return ERR_VAL;
- }
- if ( mqtt_client_is_connected(mqtt_client) != pdTRUE )
- {
- printf("bsp_mqtt_subscribe: mqtt is not connected, return ERR_CLSD.\n");
- return ERR_CLSD;
- }
- err_t err;
- err = mqtt_subscribe(mqtt_client, sub_topic, qos, bsp_mqtt_request_cb, (void *)mqtt_client); // subscribe and call back.
- if (err != ERR_OK)
- {
- printf("bsp_mqtt_subscribe: mqtt_subscribe Fail, return:%s \n", lwip_strerr(err));
- }
- else
- {
- printf("bsp_mqtt_subscribe: mqtt_subscribe SUCCESS, reason: %s\n", lwip_strerr(err));
- }
- return err;
- }
- /* ===========================================
- 初始化接口函数
- ============================================== */
- /*!
- * @brief 封装 MQTT 初始化接口
- * 执行条件:无
- *
- * @retval: 无
- */
- void bsp_mqtt_init(void)
- {
- printf("Mqtt init...");
- // 连接服务器
- bsp_mqtt_connect();
- // 发送消息到服务器
- char message_test[] = "Hello mqtt server";
- for(int i = 0; i < 10; i++)
- {
- bsp_mqtt_publish(s__mqtt_client_instance,"/public/TEST/AidenHinGwenWong_pub",message_test,sizeof(message_test),1,0);
- vTaskDelay(1000);
- }
- }
复制代码 【代码分析】:
整个代码不长,不到500行,在void bsp_mqtt_init(void)函数中,调用bsp_mqtt_connect函数。在bsp_mqtt_connect中,首先创建一个连接mqtt的结构体 mqtt_connect_client_info_t,在mqtt.h中的定义如下:
- * Client information and connection parameters */
- struct mqtt_connect_client_info_t {
- /** Clinet ID */
- const char *client_id;
- /** 如果服务器需要验证用户,则填写用户名, 如果不需要测为 NULL */
- const char* client_user;
- /** 如果服务器需要验证用户,则填写密码, 如果不需要测为 NULL */
- const char* client_pass;
- /** <span style="background-color: rgb(255, 255, 255); padding-left: 2px;"><span style="color: rgb(0, 0, 0); font-family: "Courier New"; font-size: 10pt; white-space: pre;"><span style="color:#3f7f5f;">与 MQTT 服务器保持连接时间,时间超过未发送数据会断开</span></span></span>, 0 禁用保持活动功能*/
- u16_t keep_alive;
- /** w<span style="background-color: rgb(255, 255, 255); padding-left: 2px;"><span style="color: rgb(0, 0, 0); font-family: "Courier New"; font-size: 10pt; white-space: pre;"><span style="color:#3f7f5f;"> MQTT遗嘱的消息发送topic</span></span></span>, set to NULL if will is not to be used,
- */
- const char* will_topic;
- /** will_msg, see will_topic */
- const char* will_msg;
- /** will_qos, see will_topic */
- u8_t will_qos;
- /** will_retain, see will_topic */
- u8_t will_retain;
- #if LWIP_ALTCP && LWIP_ALTCP_TLS
- /** TLS configuration for secure connections */
- struct altcp_tls_config *tls_config;
- #endif
- };
复制代码 我在上面注释了。接下来的使用ip4_addr_set_u32来设置服务器的IP地址。同时指定mqtt的服务端口。
接下来使用mqtt_client_connect来连接服务器,并注册连接中的回调函数。从而开始mqtt的服务。
接连功成功后,注册了bsp_mqtt_incoming_publish_cb接收回调函数、以及订阅成功的函数mqtt_conn_suc_proc。同时也注册了错误处理的回调函数。这样就不需要再创建线程去调用了。直接使用lwip的线程来处理报文。
最后是接收的数据回调做了简单打印功能。
到此mqtt的代码编写结事,我们把bsp_mqtt_init放到LWIPINIt后面执行,就行了。
- /* USER CODE END Header_StartDefaultTask */
- void StartDefaultTask(void *argument)
- {
- /* init code for LWIP */
- MX_LWIP_Init();
- /* USER CODE BEGIN StartDefaultTask */
- // TCP_Client_Init();
- // TCP_Echo_Init();
- bsp_mqtt_init();
- /* Infinite loop */
- for(;;)
- {
- osDelay(1);
- }
- /* USER CODE END StartDefaultTask */
- }
复制代码 【实验现象】
我们在本地创建了mqtt服务器,端口为1883。启动器后,开发板可以顺利的连接到服务回:
同时我们开启一个mqtt客户端,并打开串口调试助手查看信息:
串口助手中查看到我们开发板接收到的信息:
这样就成功的实现了MQTT的移植,并实现了其功能。
【注意】
在刚刚移植好后,会在连一会后报错。
Assertion “sys_timeout: timeout != NULL, pool MEMP_SYS_TIMEOUT is empty” failed at line 216 in src/core/timers.c然后就需要重启好几次才能连上,后面找到资料,需要修改一下参数:
这样修改后,就不会报错了。
【总结】
这个移植,相比于野火的mqtt移植要简单许多,使用tcpclien、tcpserver都要比他简单。
【感谢】
在移植过程中学习到了两篇文章在此感谢:
《【嵌入式实战】一文拿下 STM32 Lwip MQTT(超详细)》作者:HinGwenWoong
《STM32+LWIP协议栈实现MQTT协议并挂载到EMQ_X_CLOUD平台》作者:爱小羊
|
|