用 PyQt5 写一个 TCP/UDP 检测工具

发布于:2025-05-07 ⋅ 阅读:(17) ⋅ 点赞:(0)

         先将电脑连接到目标 Wi-Fi,再运行以下代码(依赖 PyQt5)。

import sys
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtNetwork import *


class NetTestTool(QWidget):
    """Small GUI for probing and exchanging data with a TCP or UDP peer.

    The window offers a protocol selector, address/port inputs, a send area,
    a receive log, a "check connection" button with a status light and a
    status bar.

    * TCP check: (re)connects to the target and reports success on the
      socket's ``connected`` signal, failure on ``errorOccurred`` or when the
      attempt is still pending after ``TCP_TIMEOUT_MS``.
    * UDP check: sends ``b"PING"`` and reports success when a ``PONG``
      datagram arrives, failure when none arrived within ``UDP_TIMEOUT_MS``.
    """

    # Timeouts (milliseconds) applied to a single connection check.
    TCP_TIMEOUT_MS = 5000
    UDP_TIMEOUT_MS = 3000

    def __init__(self):
        """Build the UI, create both sockets and wire their signals once."""
        super().__init__()
        self.init_ui()
        self.tcp_socket = QTcpSocket()
        self.udp_socket = QUdpSocket()

        # (ip, port) the TCP socket was last pointed at; lets send_data()
        # reuse a live connection instead of reconnecting on every click.
        self._tcp_target = None
        # True between sending a UDP PING and receiving PONG / timing out.
        self._udp_probe_pending = False
        # Incremented on every check so timers of older checks become no-ops.
        self._check_seq = 0

        # Connect readyRead exactly once; connecting it per send would make
        # the slot run several times for each incoming chunk.
        self.tcp_socket.readyRead.connect(self.tcp_receive)

        # Bind to an ephemeral local port so that replies to the datagrams we
        # send are delivered through readyRead.
        if not self.udp_socket.bind():
            self.status.showMessage(f"错误: {self.udp_socket.errorString()}")
        self.udp_socket.readyRead.connect(self.udp_receive)

        self.add_connection_check()

    def add_connection_check(self):
        """Insert the check button and status light, and bind TCP signals."""
        check_btn = QPushButton("检测连接", clicked=self.check_connection)
        self.status_light = QLabel("◌")
        self.layout().insertWidget(1, check_btn)
        self.layout().insertWidget(2, self.status_light)

        # TCP outcome signals. ``errorOccurred`` requires Qt 5.15 or newer.
        self.tcp_socket.connected.connect(self._on_tcp_connected)
        self.tcp_socket.errorOccurred.connect(self._on_tcp_error)

    def _on_tcp_connected(self):
        """Slot: the TCP connection attempt succeeded."""
        self.update_status(True)

    def _on_tcp_error(self, *_args):
        """Slot: the TCP socket reported an error (argument is ignored)."""
        self.update_status(False)

    def _read_target(self):
        """Return ``(ip, port)`` from the input fields.

        Shows an error in the status bar and returns ``None`` when the
        address is empty or the port is not an integer in 1..65535, so the
        calling slot can bail out instead of raising.
        """
        ip = self.ip_input.text().strip()
        try:
            port = int(self.port_input.text().strip())
        except ValueError:
            port = -1
        if not ip or not 0 < port <= 65535:
            self.status.showMessage("错误: 地址或端口无效")
            return None
        return ip, port

    def _next_check_seq(self):
        """Start a new check generation and return its sequence number."""
        self._check_seq += 1
        return self._check_seq

    def check_connection(self):
        """Probe the configured target using the selected protocol."""
        target = self._read_target()
        if target is None:
            return
        ip, port = target
        seq = self._next_check_seq()

        if self.protocol.currentText() == "TCP":
            self._udp_probe_pending = False
            self.tcp_socket.abort()  # drop any previous connection
            self._tcp_target = (ip, port)
            self.tcp_socket.connectToHost(ip, port, QIODevice.ReadWrite)
            QTimer.singleShot(
                self.TCP_TIMEOUT_MS, lambda: self._on_tcp_timeout(seq)
            )
        else:  # UDP probe
            self._udp_probe_pending = True
            sent = self.udp_socket.writeDatagram(
                b"PING", QHostAddress(ip), port
            )
            if sent == -1:
                # The datagram never left; no point waiting for a reply.
                self._udp_probe_pending = False
                self.update_status(False)
                return
            QTimer.singleShot(
                self.UDP_TIMEOUT_MS, lambda: self._on_udp_timeout(seq)
            )

    def _on_tcp_timeout(self, seq):
        """Fail the TCP check *seq* if it is still the current one and pending."""
        if seq != self._check_seq:
            return  # a newer check superseded this timer
        pending_states = (
            QAbstractSocket.HostLookupState,
            QAbstractSocket.ConnectingState,
        )
        if self.tcp_socket.state() in pending_states:
            self.tcp_socket.abort()  # stop the attempt that timed out
            self.update_status(False)

    def _on_udp_timeout(self, seq):
        """Fail the UDP check *seq* if no PONG arrived for it."""
        if seq != self._check_seq or not self._udp_probe_pending:
            return
        self._udp_probe_pending = False
        self.update_status(False)

    def update_status(self, is_connected):
        """Reflect the check result in the status light and status bar."""
        color = "green" if is_connected else "red"
        text = "连接正常" if is_connected else "连接失败"
        self.status_light.setStyleSheet(f"color:{color}; font-size:24px;")
        self.status_light.setText("●" if is_connected else "◌")
        self.status.showMessage(text)

    def init_ui(self):
        """Create all widgets and assemble the vertical layout."""
        self.setWindowTitle("网络调试工具")
        layout = QVBoxLayout()

        # Protocol selector
        self.protocol = QComboBox()
        self.protocol.addItems(["TCP", "UDP"])

        # Address / port inputs
        ip_layout = QHBoxLayout()
        self.ip_input = QLineEdit("192.168.4.2")
        self.port_input = QLineEdit("8080")
        ip_layout.addWidget(QLabel("地址:"))
        ip_layout.addWidget(self.ip_input)
        ip_layout.addWidget(QLabel("端口:"))
        ip_layout.addWidget(self.port_input)

        # Send area
        send_layout = QHBoxLayout()
        self.send_text = QTextEdit()
        self.send_btn = QPushButton("发送")
        self.send_btn.clicked.connect(self.send_data)
        send_layout.addWidget(self.send_text)
        send_layout.addWidget(self.send_btn)

        # Receive log
        self.recv_text = QTextBrowser()
        self.clear_btn = QPushButton("清空")
        self.clear_btn.clicked.connect(self.recv_text.clear)

        # Status bar
        self.status = QStatusBar()

        # Layout assembly
        layout.addWidget(self.protocol)
        layout.addLayout(ip_layout)
        layout.addLayout(send_layout)
        layout.addWidget(QLabel("接收数据:"))
        layout.addWidget(self.recv_text)
        layout.addWidget(self.clear_btn)
        layout.addWidget(self.status)
        self.setLayout(layout)

    def _ensure_tcp_connection(self, ip, port):
        """Point the TCP socket at ``(ip, port)``, reusing a live connection.

        A new connection is started only when the socket is not already
        connecting/connected to that exact target; calling connectToHost()
        on a busy socket is reported by Qt as an error.
        """
        reusable_states = (
            QAbstractSocket.HostLookupState,
            QAbstractSocket.ConnectingState,
            QAbstractSocket.ConnectedState,
        )
        if (
            self.tcp_socket.state() in reusable_states
            and self._tcp_target == (ip, port)
        ):
            return
        self.tcp_socket.abort()
        self._tcp_target = (ip, port)
        self.tcp_socket.connectToHost(ip, port, QIODevice.ReadWrite)

    def send_data(self):
        """Send the text in the send area to the target over TCP or UDP."""
        target = self._read_target()
        if target is None:
            return
        ip, port = target
        data = self.send_text.toPlainText().encode()

        if self.protocol.currentText() == "TCP":
            self._ensure_tcp_connection(ip, port)
            written = self.tcp_socket.write(data)
            error_text = self.tcp_socket.errorString()
        else:
            written = self.udp_socket.writeDatagram(
                data, QHostAddress(ip), port
            )
            error_text = self.udp_socket.errorString()

        # Qt signals failure with a -1 return value, not a Python exception.
        if written == -1:
            self.status.showMessage(f"错误: {error_text}")
        else:
            self.status.showMessage("发送成功")

    def tcp_receive(self):
        """Slot: append everything currently readable on the TCP socket."""
        # errors="replace" keeps non-UTF-8 payloads from raising in the slot.
        data = self.tcp_socket.readAll().data().decode(errors="replace")
        self.recv_text.append(f"[TCP接收] {data}")

    def udp_receive(self):
        """Slot: log every pending datagram and recognise PONG replies."""
        while self.udp_socket.hasPendingDatagrams():
            datagram = self.udp_socket.receiveDatagram()
            data = datagram.data().data().decode(errors="replace")
            self.recv_text.append(f"[UDP接收] {data}")

            # The device is expected to answer a PING with PONG; surrounding
            # whitespace (e.g. a trailing newline) is tolerated.
            if data.strip() == "PONG":
                self._udp_probe_pending = False
                self.recv_text.append(f"[UDP响应] {data}")
                self.update_status(True)


if __name__ == "__main__":
    # Script entry point: create the Qt application, show the tool window at
    # 600x400 and hand the event loop's return code back to the shell.
    qt_app = QApplication(sys.argv)

    main_window = NetTestTool()
    main_window.resize(600, 400)
    main_window.show()

    exit_code = qt_app.exec_()
    sys.exit(exit_code)

测试结果

        连接正常

         连接失败


网站公告

今日签到

点亮在社区的每一天
去签到