加入星计划,您可以享受以下权益:

  • 创作内容快速变现
  • 行业影响力扩散
  • 作品版权保护
  • 300W+ 专业用户
  • 1.5W+ 优质创作者
  • 5000+ 长期合作伙伴
立即加入

Python-PyQt6制作的MQTT桌面应用解析Json数据并显示(阿里云)

6小时前
236
服务支持:
技术交流群

完成交易后在“购买成功”页面扫码入群,即可与技术大咖们分享疑惑和经验、收获成长和认同、领取优惠和红包等。

虚拟商品不可退

当前内容为数字版权作品,购买后不支持退换且无法转移使用。

加入交流群
扫码加入
获取工程师必备礼包
参与热点资讯讨论
放大
实物图
相关方案
  • 方案介绍
    • 一 前言
    •  二 环境安装
    •  三 代码编写
    • 四 效果
    • 五 结束
  • 相关文件
  • 相关推荐
  • 电子产业图谱
申请入驻 产业图谱

一 前言

继续写,这篇文章实现了桌面应用的MQTT通信,以及我添加了一个界面,实现json数据的解析,这步主要时为了方便我们的调试和学习一下python的json数据解析功能,结果还是符合预期

本篇效果如下

上篇连接

Python -- PyQt6+paho.mqtt 制作的MQTT桌面收发器(阿里云示范)icon-default.png?t=O83Ahttps://blog.csdn.net/herui_2/article/details/144512439?spm=1001.2014.3001.5501

 二 环境安装

前面我们以及实现了桌面开发和mqtt库函数的使用,这边我们就直接引用一下

1. 编译器

可以查看这篇文章

Python -- PyQt6 制作简易的桌面应用(安装-入门)icon-default.png?t=O83Ahttps://herui.blog.csdn.net/article/details/144501509?spm=1001.2014.3001.5502

2. 环境库下载

可以查看这篇文章

Python -- paho.mqtt 库制作简易的MQTT通信(阿里云)icon-default.png?t=O83Ahttps://herui.blog.csdn.net/article/details/144508263?spm=1001.2014.3001.5502

 三 代码编写

首先我们需要在阿里云里面建立我们的产品和设备,并且获取到相关的mqtt连接参数

可以参考这个文章里面的云平台部分

ESP32 -- 使用MQTT协议连接云平台(带图文说明)https://herui.blog.csdn.net/article/details/135317019?spm=1001.2014.3001.5502

获取到两个设备的mqtt参数就好了

​​

1. Python部分

这个代码实现Mqtt连接,并且实现设备的参数的自定义,实现了设备的重连功能以及自定义发送的主题和内容,实现了按键发送

把我们平台上面获取的内容修改进去

​​

 也可以直接运行之后修改上面的内容

代码如下

import sys
import json
from PyQt6.QtWidgets import QApplication, QWidget, QVBoxLayout, QLineEdit, QLabel, QPushButton, QGridLayout, QTextEdit, 
    QCheckBox, QTabWidget
import paho.mqtt.client as mqtt
import threading

# MQTT 服务器设置(初始值)
MQTT_BROKER_IP = "iot-06z00axdhgfk24n.mqtt.iothub.aliyuncs.com"
MQTT_BROKER_PORT = 1883
MQTT_CLIENT_ID = "h9sjD6ci5EI.smartdevice|securemode=2,signmethod=hmacsha256,timestamp=1734329040945|"
MQTT_TOPIC_PUBLISH = "/broadcast/h9sjD6ci5EI/test1"
MQTT_TOPIC_SUBSCRIBE = "/broadcast/h9sjD6ci5EI/test2"
MQTT_USERNAME = "smartdevice&h9sjD6ci5EI"
MQTT_PASSWORD = "4d1a97eaee5c0c8bd5fdad2292f5a83239c2a21bcb280eb8bec8a28741549a9e"

# 全局变量来存储更新的 MQTT 参数
mqtt_params = {
    "broker_ip": MQTT_BROKER_IP,
    "broker_port": MQTT_BROKER_PORT,
    "client_id": MQTT_CLIENT_ID,
    "username": MQTT_USERNAME,
    "password": MQTT_PASSWORD,
    "publish_topic": MQTT_TOPIC_PUBLISH,
    "subscribe_topic": MQTT_TOPIC_SUBSCRIBE,
}


# MQTT 回调函数:连接成功时触发
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe(mqtt_params["subscribe_topic"])  # 订阅消息主题


# MQTT 回调函数:接收到消息时触发
def on_message(client, userdata, msg):
    print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")

    # 检查消息是否是自己发送的
    if app_window.sent_message is not None and msg.payload.decode() == app_window.sent_message:
        return  # 如果是自己发送的消息,跳过处理

    # 继续进行JSON解析和显示
    try:
        # 假设接收到的消息是 JSON 格式
        data = json.loads(msg.payload.decode())
        if "humi" in data and "tmep" in data:
            humidity = data["humi"]
            temperature = data["tmep"]
            # 将解析后的数据传递给显示界面
            app_window.update_data(humidity, temperature)
            app_window.log(f"Received - Humidity: {humidity}, Temperature: {temperature}")
    except json.JSONDecodeError:
        print("Failed to decode JSON message.")


# MQTT 回调函数:断开连接时触发
def on_disconnect(client, userdata, rc):
    print(f"Disconnected with result code {rc}")


class MQTTApp(QWidget):
    def __init__(self):
        super().__init__()
        self.initUI()
        self.client = mqtt.Client(mqtt_params["client_id"])  # 创建 MQTT 客户端
        self.setup_mqtt()  # 设置 MQTT 回调函数
        self.connected = False  # 跟踪连接状态
        self.sent_message = None  # 用于记录当前发送的消息,防止打印自己的消息

    def initUI(self):
        """初始化界面"""
        self.setWindowTitle('MQTT Client with PyQt6')  # 设置窗口标题
        self.setGeometry(400, 400, 600, 600)  # 设置窗口大小和位置

        # 创建选项卡(Tab)布局
        self.tabs = QTabWidget()
        self.tab1 = QWidget()
        self.tab2 = QWidget()

        # 第一个Tab界面:用于连接和发送数据
        layout1 = QVBoxLayout()
        grid = QGridLayout()
        self.broker_ip_input = QLineEdit(mqtt_params["broker_ip"])  # 服务器地址输入框
        self.broker_port_input = QLineEdit(str(mqtt_params["broker_port"]))  # 服务器端口输入框
        self.client_id_input = QLineEdit(mqtt_params["client_id"])  # 客户端ID输入框
        self.username_input = QLineEdit(mqtt_params["username"])  # 用户名输入框
        self.password_input = QLineEdit(mqtt_params["password"])  # 密码输入框
        self.publish_topic_input = QLineEdit(mqtt_params["publish_topic"])  # 发布主题输入框
        self.subscribe_topic_input = QLineEdit(mqtt_params["subscribe_topic"])  # 订阅主题输入框

        grid.addWidget(QLabel('Broker IP:'), 0, 0)
        grid.addWidget(self.broker_ip_input, 0, 1)
        grid.addWidget(QLabel('Broker Port:'), 1, 0)
        grid.addWidget(self.broker_port_input, 1, 1)
        grid.addWidget(QLabel('Client ID:'), 2, 0)
        grid.addWidget(self.client_id_input, 2, 1)
        grid.addWidget(QLabel('Username:'), 3, 0)
        grid.addWidget(self.username_input, 3, 1)
        grid.addWidget(QLabel('Password:'), 4, 0)
        grid.addWidget(self.password_input, 4, 1)
        grid.addWidget(QLabel('Publish Topic:'), 5, 0)
        grid.addWidget(self.publish_topic_input, 5, 1)
        grid.addWidget(QLabel('Subscribe Topic:'), 6, 0)
        grid.addWidget(self.subscribe_topic_input, 6, 1)
        layout1.addLayout(grid)

        self.message_input = QLineEdit("Hello MQTT")  # 消息输入框
        layout1.addWidget(self.message_input)

        self.connect_button = QPushButton('连接')  # 连接按钮
        self.connect_button.clicked.connect(self.connect_mqtt)
        layout1.addWidget(self.connect_button)

        self.disconnect_button = QPushButton('断开')  # 断开按钮
        self.disconnect_button.clicked.connect(self.disconnect_mqtt)
        layout1.addWidget(self.disconnect_button)

        self.send_button = QPushButton('发送数据')  # 发送按钮
        self.send_button.clicked.connect(self.send_message)
        self.send_button.setEnabled(False)
        layout1.addWidget(self.send_button)

        self.log_display = QTextEdit()
        self.log_display.setReadOnly(True)  # 设置为只读
        layout1.addWidget(self.log_display)

        self.auto_wrap_checkbox = QCheckBox('Auto-Wrap')
        self.auto_wrap_checkbox.setChecked(True)
        layout1.addWidget(self.auto_wrap_checkbox)

        self.tab1.setLayout(layout1)
        # 第二个Tab界面:用于显示解析后的消息
        layout2 = QVBoxLayout()  # 主布局

        # 创建一个 QGridLayout,用于放置湿度和温度标签
        grid2 = QGridLayout()

        # 使用 QLineEdit 显示湿度和温度,并设置为只读
        self.humidity_label = QLineEdit("xx")
        self.humidity_label.setReadOnly(True)  # 设置 QLineEdit 为只读
        self.humidity_label.setStyleSheet("font-size: 18px; font-weight: bold;")  # 设置标题样式

        self.temperature_label = QLineEdit("xx")
        self.temperature_label.setReadOnly(True)  # 设置 QLineEdit 为只读
        self.temperature_label.setStyleSheet("font-size: 18px; font-weight: bold;")  # 设置标题样式

        # 添加控件到 grid 布局
        grid2.addWidget(QLabel("Humidity:"), 1, 0)
        grid2.addWidget(self.humidity_label, 1, 1)
        grid2.addWidget(QLabel("Temperature:"), 2, 0)
        grid2.addWidget(self.temperature_label, 2, 1)

        # 将 grid2 布局添加到 layout2 中
        layout2.addLayout(grid2)

        # 设置 tab2 的布局
        self.tab2.setLayout(layout2)

        # 将两个Tab添加到Tab控件
        self.tabs.addTab(self.tab1, "MQTT参数设置")
        self.tabs.addTab(self.tab2, "MQTT消息解析")

        main_layout = QVBoxLayout()
        main_layout.addWidget(self.tabs)
        self.setLayout(main_layout)

    def setup_mqtt(self):
        """设置 MQTT 客户端的回调函数"""
        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.on_disconnect = on_disconnect

    def update_data(self, humidity, temperature):
        """更新第二个界面上的湿度和温度数据"""
        self.humidity_label.setText(f"{humidity}")
        self.temperature_label.setText(f"{temperature}")

    def update_mqtt_params(self):

        """更新 MQTT 参数"""
        mqtt_params["broker_ip"] = self.broker_ip_input.text()
        mqtt_params["broker_port"] = int(self.broker_port_input.text())
        mqtt_params["client_id"] = self.client_id_input.text()
        mqtt_params["username"] = self.username_input.text()
        mqtt_params["password"] = self.password_input.text()
        mqtt_params["publish_topic"] = self.publish_topic_input.text()
        mqtt_params["subscribe_topic"] = self.subscribe_topic_input.text()

        # 打印当前参数,以确认是否正确
        print(f"Broker IP: {mqtt_params['broker_ip']}")
        print(f"Broker Port: {mqtt_params['broker_port']}")
        print(f"Client ID: {mqtt_params['client_id']}")
        print(f"Username: {mqtt_params['username']}")
        print(f"Password: {mqtt_params['password']}")
        print(f"Publish Topic: {mqtt_params['publish_topic']}")
        print(f"Subscribe Topic: {mqtt_params['subscribe_topic']}")

    def connect_mqtt(self):
        """连接到 MQTT 服务器"""
        if not self.connected:
            self.update_mqtt_params()  # 更新 MQTT 参数
            self.client = mqtt.Client(mqtt_params["client_id"])  # 创建 MQTT 客户端
            self.setup_mqtt()  # 设置 MQTT 回调函数
            self.client.username_pw_set(mqtt_params['username'],mqtt_params['password'])

            try:
                print(f"Connecting to {mqtt_params['broker_ip']}:{mqtt_params['broker_port']}")
                self.client.connect(mqtt_params['broker_ip'], int(mqtt_params['broker_port']), 60)
                self.client_thread = threading.Thread(target=self.client.loop_forever)
                self.client_thread.daemon = True
                self.client_thread.start()
                self.connected = True
                self.connect_button.setEnabled(False)
                self.disconnect_button.setEnabled(True)
                self.send_button.setEnabled(True)
                self.log("Connected to MQTT broker.")
            except Exception as e:
                self.log(f"Failed to connect: {e}")
                print(f"Failed to connect: {e}")

    def disconnect_mqtt(self):
        """断开 MQTT 连接"""
        if self.connected:
            self.client.loop_stop()
            self.client.disconnect()
            self.connected = False
            self.connect_button.setEnabled(True)
            self.disconnect_button.setEnabled(False)
            self.send_button.setEnabled(False)
            self.log("Disconnected from MQTT broker.")

    def send_message(self):
        """发送消息"""
        if self.connected:
            message = self.message_input.text()
            self.sent_message = message  # 记录发送的消息
            qos = 2
            self.client.publish(mqtt_params["publish_topic"], message.encode(), qos=qos)
            print(f"Sent message: {message} with QoS {qos} to topic {mqtt_params['publish_topic']}")
            self.log(f"Sent: {message}")

    def log(self, message):
        """更新日志显示框"""
        if self.sent_message is None or message != f"Sent: {self.sent_message}":
            self.log_display.append(message)  # 仅显示接收的数据

    def closeEvent(self, event):
        """关闭窗口时断开 MQTT 连接"""
        self.disconnect_mqtt()
        event.accept()


def main():
    """主函数,启动应用程序"""
    global app_window
    app = QApplication(sys.argv)
    app_window = MQTTApp()
    app_window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()

 2. MQTTX部分

需要安装MQTTX软件,是由EMQX公司提供的,下载安装即可

MQTTX:全功能 MQTT 客户端工具MQTTX 是一款强大的全功能 MQTT 5.0 客户端工具,适用于桌面、命令行和 WebSocket。它使得开发和测试 MQTT 应用更加简单高效。https://mqttx.app/zhhttps://mqttx.app/zhicon-default.png?t=O83Ahttps://mqttx.app/zh

连接mqtt

打开软件添加对应的MQTT信息,点击连接即可

订阅主题

填写对应的python发布的Mqtt主题消息的名称,进行连接即可

发布主题

四 效果

点击发送就可以实现两个部分的相互通信了

实现Json数据的接收 解析 展示

 云平台

五 结束

到目前为止,Python实现了MQTT通信,并且成功解析和显示JSON数据。后续的工作主要是根据自己的需求调整界面效果,进一步优化和完善功能。整个过程我只是进行了简单的接触和实验,未来可以根据实际需求继续扩展和定制。


联系方式 微信号:13648103287

  • 联系方式.docx

相关推荐

电子产业图谱

方案定制,程序设计方案、单片机程序设计与讲解、APP定制开发。本公众号致力于向读者传递关于程序设计和开发的相关知识,并分享一些关于软件开发的最佳实践。如果您有什么问题或建议,请随时联系我们。我们将竭诚为您服务