当前位置: 首页 > news >正文

Python的子线程与主线程之间的通信并通知主线程更新UI

新建PLC类 PLC.py

import json
import time
from threading import Thread

from HslCommunication import SiemensS7Net, SiemensPLCS
from PySide6.QtCore import QThread, Signal, QObject

from tdm.MsgType import MSG_TYPE_LOG, MSG_TYPE_MSGBOX


# 自定义信号类,用于与主线程进行交互
class WorkSingle(QObject):
    msg = Signal(str)


class PLC(QThread):
    def __init__(self, address, port):
        super().__init__()
        self.siemens = None  # 西门子对象
        self.connected_plc = None  # 是否连接上了PLC
        self.signal = WorkSingle() # 信道,与主线程通信
        self.address = address # PLC IP地址
        self.port = port # PLC 连接端口

    # 发送日志消息
    def send_log_msg(self, msg):
        data = {
            "type": MSG_TYPE_LOG,
            "msg": msg,
        }
        self.signal.msg.emit(json.dumps(data))

    # 发送弹窗类消息
    def send_dialog_msg(self, msg):
        data = {
            "type": MSG_TYPE_MSGBOX,
            "msg": msg,
        }
        self.signal.msg.emit(json.dumps(data))

    def init_plc_connect(self):
        self.siemens = SiemensS7Net(SiemensPLCS.S1200, self.address)
        self.siemens.port = int(self.port)
        connect = self.siemens.ConnectServer()
        if not connect.IsSuccess:
            self.connected_plc = False
            # print('初始化 PLC 连接失败: ' + connect.ToMessageShowString())
            self.send_log_msg('初始化 PLC 连接失败: ' + connect.ToMessageShowString())
        else:
            self.connected_plc = True
            print("初始化 PLC 连接成功!")
            self.send_log_msg("初始化 PLC 连接成功!")

    def run(self):
        self.init_plc_connect()  # 初始化PLC连接
        if not self.connected_plc:
            print("PLC连接失败,不能连续读取")
            self.send_log_msg("PLC连接失败,不能连续读取")
            return
        print("TODO 开始连续读取PLC值")
        self.send_dialog_msg("AAAAA")
        Thread(target=self.read_plc_value).start()
        print("即将执行耗时操作,读取完毕1")
        self.send_log_msg("即将执行耗时操作,开启新的线程去执行")
        self.send_dialog_msg("CCCCC")

    def read_plc_value(self):
        time.sleep(10)
        print(self.port)
        self.send_dialog_msg("BBBBBBBB")
        print("耗时操作,读取完毕")
        self.send_log_msg("耗时操作,读取完毕")

创建 TDMSocketClient.py

import asyncio
import json
import time
import traceback
from threading import Thread

import websockets
from PyQt6.QtCore import QThread
from PySide6.QtCore import QObject, Signal

from tdm.MsgType import MSG_TYPE_LOG


# 自定义信号类,用于与主线程进行交互
class WorkSingle(QObject):
    msg = Signal(str)


class TDMSocketClient(QThread):
    def __init__(self, ip, port):
        QThread.__init__(self)
        self.ip = ip
        self.port = port
        self.signal = WorkSingle()

    def run(self):
        # 启动 WebSocket 客户端连接服务器,用于接受服务器发来的试验通知
        Thread(target=self.runWSClient).start()

    def send_log_msg(self, msg):
        data = {
            "type": MSG_TYPE_LOG,
            "msg": msg,
        }
        self.signal.msg.emit(json.dumps(data))

    def runWSClient(self):
        # 下面的语句替换上面这一句,就不会报错啦,成功率高
        new_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(new_loop)
        loop = asyncio.get_event_loop()
        task = asyncio.ensure_future(self.ws_client())
        loop.run_until_complete(asyncio.wait([task]))
        st = task.result()

    async def ws_client(self):
        try:
            uri = 'ws://' + self.ip + ':' + self.port + '/tdm/websocket/JZ076'
            async with websockets.connect(uri) as websocket:
                # localClient.setSuccessStatus("websocket 连接成功!")
                print('Connected to ' + self.ip + ':' + self.port + "success")
                self.send_log_msg('Connected to ' + self.ip + ':' + self.port + "success")
                while True:
                    result = await websocket.recv()
                    msg = json.loads(result)
                    try:
                        if msg["cmd"] == "startTest":
                            print("开始试验")
                            # 开始试验
                            # localClient.startTest(msg['testInsId'])
                        elif msg["cmd"] == "pauseTest":
                            # 暂停试验
                            # localClient.testPause()
                            print("暂停试验")
                        elif msg["cmd"] == "stopTest":
                            print("停止试验")
                            # 停止试验
                            # localClient.testStop()
                    except:
                        # 报错的话自动重连
                        traceback.print_exc()

        except:
            # 报错的话自动重连
            traceback.print_exc()
            time.sleep(1)
            await self.ws_client()

主类 TDMClient.py

'''
TDM 客户端
'''
import json
from typing import Final

from PyQt6.QtWidgets import QMessageBox
from PySide6.QtCore import QFile
from PySide6.QtUiTools import QUiLoader
from PySide6.QtWidgets import QApplication

from tdm.MsgType import MSG_TYPE_LOG, MSG_TYPE_MSGBOX
from tdm.PLC import PLC
from tdm.TDMSocketClient import TDMSocketClient


PLC_ADDRESS: Final = "192.168.1.83"
PLC_PORT: Final = "102"

WS_SERVICE_ADDRESS: Final = "localhost"
WS_SERVICE_PORT: Final = "8080"

# 显示弹窗
def show_msg_box(msg):
    mb = QMessageBox()
    mb.setText(msg)
    mb.exec()


class TDMClient:
    def __init__(self):
        # 获取设计文件
        qf = QFile('ui/TDMLocalClient.ui')
        qf.open(QFile.ReadOnly)
        qf.close()
        # 将设计文件加载为窗口
        self.ui = QUiLoader().load(qf)

        # PLC 读取相关
        self.plc = PLC(PLC_ADDRESS, PLC_PORT)
        self.plc.signal.msg.connect(self.receive_signal)

        self.ws = TDMSocketClient(WS_SERVICE_ADDRESS, WS_SERVICE_PORT)
        self.ws.signal.msg.connect(self.receive_signal)

    def run(self):
        self.ui.show()  # 显示客户端
        self.plc.run()  # 获取试验器的点位列表,并连续读取
        self.ws.run()  # 启动 WebSocket 客户端服务,用于接收服务器发来的 试验开始、试验暂停、试验停止等信息

    # 收到子线程信号是触发(唯一方法入口)
    def receive_signal(self, msg):
        msg_obj = json.loads(msg)
        if msg_obj['type'] == MSG_TYPE_LOG:
            self.ui.retView.appendPlainText(str(msg_obj['msg']))
        elif msg_obj['type'] == MSG_TYPE_MSGBOX:
            show_msg_box(str(msg_obj['msg']))


app = QApplication([])
tdmClient = TDMClient()
tdmClient.run()
app.exec()

相关文章:

  • LabVIEW齿轮箱故障分析系统
  • 基于WOA鲸鱼优化的BiLSTM双向长短期记忆网络序列预测算法matlab仿真,对比BiLSTM和LSTM
  • 一篇docker从入门到精通
  • deepseek本地部署,ragflow,docker
  • 【对话推荐系统】Towards Topic-Guided Conversational Recommender System 论文阅读
  • (五)趣学设计模式 之 建造者模式!
  • TileGenie_v1.3.0.1安装包
  • 【Transformer架构】
  • leetcode 119. 杨辉三角 II
  • 【EB-03】 AUTOSAR builder与EB RTE集成
  • 【框架】参考 Spring Security 安全框架设计出,轻量化高可扩展的身份认证与授权架构
  • MySQL要点总结二
  • LangChain大模型应用开发:构建Agent智能体
  • Ubuntu:wvp-GB28181-pro安装、运行
  • 单入单出队列性能优化(Lock-Free)
  • 异常处理在 Promptic 中怎么实现?
  • 基于Springboot医院预约挂号小程序系统【附源码】
  • 【工作流】Spring Boot 项目与 Camunda 的整合
  • Leecode刷题:LCR 076. 数组中的第 K 个最大元素
  • 笔试-最大利润
  • 网站开发视频教程百度云/免费b2b网站推广渠道
  • 网站怎么更改布局/宁波seo关键词费用
  • 网站新闻不添加关键词超链接对优化有影响吗/最新新闻事件
  • 网站开发可行性分析报告/seo单页快速排名
  • php做网站难吗/网站推广搜索
  • python 做网站/推广app拉人头赚钱