一 前言
继续写,这篇文章实现了桌面应用的MQTT通信,以及我添加了一个界面,实现json数据的解析,这步主要时为了方便我们的调试和学习一下python的json数据解析功能,结果还是符合预期
本篇效果如下
上篇连接
Python -- PyQt6+paho.mqtt 制作的MQTT桌面收发器(阿里云示范)https://blog.csdn.net/herui_2/article/details/144512439?spm=1001.2014.3001.5501
二 环境安装
前面我们以及实现了桌面开发和mqtt库函数的使用,这边我们就直接引用一下
1. 编译器
可以查看这篇文章
Python -- PyQt6 制作简易的桌面应用(安装-入门)https://herui.blog.csdn.net/article/details/144501509?spm=1001.2014.3001.5502
2. 环境库下载
可以查看这篇文章
Python -- paho.mqtt 库制作简易的MQTT通信(阿里云)https://herui.blog.csdn.net/article/details/144508263?spm=1001.2014.3001.5502
三 代码编写
首先我们需要在阿里云里面建立我们的产品和设备,并且获取到相关的mqtt连接参数
可以参考这个文章里面的云平台部分
ESP32 -- 使用MQTT协议连接云平台(带图文说明)https://herui.blog.csdn.net/article/details/135317019?spm=1001.2014.3001.5502
获取到两个设备的mqtt参数就好了
1. Python部分
这个代码实现Mqtt连接,并且实现设备的参数的自定义,实现了设备的重连功能以及自定义发送的主题和内容,实现了按键发送
把我们平台上面获取的内容修改进去
也可以直接运行之后修改上面的内容
代码如下
import sys
import json
from PyQt6.QtWidgets import QApplication, QWidget, QVBoxLayout, QLineEdit, QLabel, QPushButton, QGridLayout, QTextEdit,
QCheckBox, QTabWidget
import paho.mqtt.client as mqtt
import threading
# MQTT 服务器设置(初始值)
MQTT_BROKER_IP = "iot-06z00axdhgfk24n.mqtt.iothub.aliyuncs.com"
MQTT_BROKER_PORT = 1883
MQTT_CLIENT_ID = "h9sjD6ci5EI.smartdevice|securemode=2,signmethod=hmacsha256,timestamp=1734329040945|"
MQTT_TOPIC_PUBLISH = "/broadcast/h9sjD6ci5EI/test1"
MQTT_TOPIC_SUBSCRIBE = "/broadcast/h9sjD6ci5EI/test2"
MQTT_USERNAME = "smartdevice&h9sjD6ci5EI"
MQTT_PASSWORD = "4d1a97eaee5c0c8bd5fdad2292f5a83239c2a21bcb280eb8bec8a28741549a9e"
# 全局变量来存储更新的 MQTT 参数
mqtt_params = {
"broker_ip": MQTT_BROKER_IP,
"broker_port": MQTT_BROKER_PORT,
"client_id": MQTT_CLIENT_ID,
"username": MQTT_USERNAME,
"password": MQTT_PASSWORD,
"publish_topic": MQTT_TOPIC_PUBLISH,
"subscribe_topic": MQTT_TOPIC_SUBSCRIBE,
}
# MQTT 回调函数:连接成功时触发
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client.subscribe(mqtt_params["subscribe_topic"]) # 订阅消息主题
# MQTT 回调函数:接收到消息时触发
def on_message(client, userdata, msg):
print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")
# 检查消息是否是自己发送的
if app_window.sent_message is not None and msg.payload.decode() == app_window.sent_message:
return # 如果是自己发送的消息,跳过处理
# 继续进行JSON解析和显示
try:
# 假设接收到的消息是 JSON 格式
data = json.loads(msg.payload.decode())
if "humi" in data and "tmep" in data:
humidity = data["humi"]
temperature = data["tmep"]
# 将解析后的数据传递给显示界面
app_window.update_data(humidity, temperature)
app_window.log(f"Received - Humidity: {humidity}, Temperature: {temperature}")
except json.JSONDecodeError:
print("Failed to decode JSON message.")
# MQTT 回调函数:断开连接时触发
def on_disconnect(client, userdata, rc):
print(f"Disconnected with result code {rc}")
class MQTTApp(QWidget):
def __init__(self):
super().__init__()
self.initUI()
self.client = mqtt.Client(mqtt_params["client_id"]) # 创建 MQTT 客户端
self.setup_mqtt() # 设置 MQTT 回调函数
self.connected = False # 跟踪连接状态
self.sent_message = None # 用于记录当前发送的消息,防止打印自己的消息
def initUI(self):
"""初始化界面"""
self.setWindowTitle('MQTT Client with PyQt6') # 设置窗口标题
self.setGeometry(400, 400, 600, 600) # 设置窗口大小和位置
# 创建选项卡(Tab)布局
self.tabs = QTabWidget()
self.tab1 = QWidget()
self.tab2 = QWidget()
# 第一个Tab界面:用于连接和发送数据
layout1 = QVBoxLayout()
grid = QGridLayout()
self.broker_ip_input = QLineEdit(mqtt_params["broker_ip"]) # 服务器地址输入框
self.broker_port_input = QLineEdit(str(mqtt_params["broker_port"])) # 服务器端口输入框
self.client_id_input = QLineEdit(mqtt_params["client_id"]) # 客户端ID输入框
self.username_input = QLineEdit(mqtt_params["username"]) # 用户名输入框
self.password_input = QLineEdit(mqtt_params["password"]) # 密码输入框
self.publish_topic_input = QLineEdit(mqtt_params["publish_topic"]) # 发布主题输入框
self.subscribe_topic_input = QLineEdit(mqtt_params["subscribe_topic"]) # 订阅主题输入框
grid.addWidget(QLabel('Broker IP:'), 0, 0)
grid.addWidget(self.broker_ip_input, 0, 1)
grid.addWidget(QLabel('Broker Port:'), 1, 0)
grid.addWidget(self.broker_port_input, 1, 1)
grid.addWidget(QLabel('Client ID:'), 2, 0)
grid.addWidget(self.client_id_input, 2, 1)
grid.addWidget(QLabel('Username:'), 3, 0)
grid.addWidget(self.username_input, 3, 1)
grid.addWidget(QLabel('Password:'), 4, 0)
grid.addWidget(self.password_input, 4, 1)
grid.addWidget(QLabel('Publish Topic:'), 5, 0)
grid.addWidget(self.publish_topic_input, 5, 1)
grid.addWidget(QLabel('Subscribe Topic:'), 6, 0)
grid.addWidget(self.subscribe_topic_input, 6, 1)
layout1.addLayout(grid)
self.message_input = QLineEdit("Hello MQTT") # 消息输入框
layout1.addWidget(self.message_input)
self.connect_button = QPushButton('连接') # 连接按钮
self.connect_button.clicked.connect(self.connect_mqtt)
layout1.addWidget(self.connect_button)
self.disconnect_button = QPushButton('断开') # 断开按钮
self.disconnect_button.clicked.connect(self.disconnect_mqtt)
layout1.addWidget(self.disconnect_button)
self.send_button = QPushButton('发送数据') # 发送按钮
self.send_button.clicked.connect(self.send_message)
self.send_button.setEnabled(False)
layout1.addWidget(self.send_button)
self.log_display = QTextEdit()
self.log_display.setReadOnly(True) # 设置为只读
layout1.addWidget(self.log_display)
self.auto_wrap_checkbox = QCheckBox('Auto-Wrap')
self.auto_wrap_checkbox.setChecked(True)
layout1.addWidget(self.auto_wrap_checkbox)
self.tab1.setLayout(layout1)
# 第二个Tab界面:用于显示解析后的消息
layout2 = QVBoxLayout() # 主布局
# 创建一个 QGridLayout,用于放置湿度和温度标签
grid2 = QGridLayout()
# 使用 QLineEdit 显示湿度和温度,并设置为只读
self.humidity_label = QLineEdit("xx")
self.humidity_label.setReadOnly(True) # 设置 QLineEdit 为只读
self.humidity_label.setStyleSheet("font-size: 18px; font-weight: bold;") # 设置标题样式
self.temperature_label = QLineEdit("xx")
self.temperature_label.setReadOnly(True) # 设置 QLineEdit 为只读
self.temperature_label.setStyleSheet("font-size: 18px; font-weight: bold;") # 设置标题样式
# 添加控件到 grid 布局
grid2.addWidget(QLabel("Humidity:"), 1, 0)
grid2.addWidget(self.humidity_label, 1, 1)
grid2.addWidget(QLabel("Temperature:"), 2, 0)
grid2.addWidget(self.temperature_label, 2, 1)
# 将 grid2 布局添加到 layout2 中
layout2.addLayout(grid2)
# 设置 tab2 的布局
self.tab2.setLayout(layout2)
# 将两个Tab添加到Tab控件
self.tabs.addTab(self.tab1, "MQTT参数设置")
self.tabs.addTab(self.tab2, "MQTT消息解析")
main_layout = QVBoxLayout()
main_layout.addWidget(self.tabs)
self.setLayout(main_layout)
def setup_mqtt(self):
"""设置 MQTT 客户端的回调函数"""
self.client.on_connect = on_connect
self.client.on_message = on_message
self.client.on_disconnect = on_disconnect
def update_data(self, humidity, temperature):
"""更新第二个界面上的湿度和温度数据"""
self.humidity_label.setText(f"{humidity}")
self.temperature_label.setText(f"{temperature}")
def update_mqtt_params(self):
"""更新 MQTT 参数"""
mqtt_params["broker_ip"] = self.broker_ip_input.text()
mqtt_params["broker_port"] = int(self.broker_port_input.text())
mqtt_params["client_id"] = self.client_id_input.text()
mqtt_params["username"] = self.username_input.text()
mqtt_params["password"] = self.password_input.text()
mqtt_params["publish_topic"] = self.publish_topic_input.text()
mqtt_params["subscribe_topic"] = self.subscribe_topic_input.text()
# 打印当前参数,以确认是否正确
print(f"Broker IP: {mqtt_params['broker_ip']}")
print(f"Broker Port: {mqtt_params['broker_port']}")
print(f"Client ID: {mqtt_params['client_id']}")
print(f"Username: {mqtt_params['username']}")
print(f"Password: {mqtt_params['password']}")
print(f"Publish Topic: {mqtt_params['publish_topic']}")
print(f"Subscribe Topic: {mqtt_params['subscribe_topic']}")
def connect_mqtt(self):
"""连接到 MQTT 服务器"""
if not self.connected:
self.update_mqtt_params() # 更新 MQTT 参数
self.client = mqtt.Client(mqtt_params["client_id"]) # 创建 MQTT 客户端
self.setup_mqtt() # 设置 MQTT 回调函数
self.client.username_pw_set(mqtt_params['username'],mqtt_params['password'])
try:
print(f"Connecting to {mqtt_params['broker_ip']}:{mqtt_params['broker_port']}")
self.client.connect(mqtt_params['broker_ip'], int(mqtt_params['broker_port']), 60)
self.client_thread = threading.Thread(target=self.client.loop_forever)
self.client_thread.daemon = True
self.client_thread.start()
self.connected = True
self.connect_button.setEnabled(False)
self.disconnect_button.setEnabled(True)
self.send_button.setEnabled(True)
self.log("Connected to MQTT broker.")
except Exception as e:
self.log(f"Failed to connect: {e}")
print(f"Failed to connect: {e}")
def disconnect_mqtt(self):
"""断开 MQTT 连接"""
if self.connected:
self.client.loop_stop()
self.client.disconnect()
self.connected = False
self.connect_button.setEnabled(True)
self.disconnect_button.setEnabled(False)
self.send_button.setEnabled(False)
self.log("Disconnected from MQTT broker.")
def send_message(self):
"""发送消息"""
if self.connected:
message = self.message_input.text()
self.sent_message = message # 记录发送的消息
qos = 2
self.client.publish(mqtt_params["publish_topic"], message.encode(), qos=qos)
print(f"Sent message: {message} with QoS {qos} to topic {mqtt_params['publish_topic']}")
self.log(f"Sent: {message}")
def log(self, message):
"""更新日志显示框"""
if self.sent_message is None or message != f"Sent: {self.sent_message}":
self.log_display.append(message) # 仅显示接收的数据
def closeEvent(self, event):
"""关闭窗口时断开 MQTT 连接"""
self.disconnect_mqtt()
event.accept()
def main():
"""主函数,启动应用程序"""
global app_window
app = QApplication(sys.argv)
app_window = MQTTApp()
app_window.show()
sys.exit(app.exec())
if __name__ == "__main__":
main()
2. MQTTX部分
MQTTX:全功能 MQTT 客户端工具MQTTX 是一款强大的全功能 MQTT 5.0 客户端工具,适用于桌面、命令行和 WebSocket。它使得开发和测试 MQTT 应用更加简单高效。https://mqttx.app/zhhttps://mqttx.app/zhhttps://mqttx.app/zh
连接mqtt
打开软件添加对应的MQTT信息,点击连接即可
订阅主题
填写对应的python发布的Mqtt主题消息的名称,进行连接即可
发布主题
四 效果
点击发送就可以实现两个部分的相互通信了
实现Json数据的接收 解析 展示
云平台
五 结束
到目前为止,Python实现了MQTT通信,并且成功解析和显示JSON数据。后续的工作主要是根据自己的需求调整界面效果,进一步优化和完善功能。整个过程我只是进行了简单的接触和实验,未来可以根据实际需求继续扩展和定制。
联系方式 微信号:13648103287