先用电脑连接到目标 Wi-Fi，再运行以下代码。
import sys
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtNetwork import *
class NetTestTool(QWidget):
    """Small network debugging widget for TCP and UDP.

    The window lets the user pick a protocol, enter a target address and
    port, send the text typed into the send box, and see everything that is
    received.  A "check connection" button probes the target: for TCP it
    attempts a connection, for UDP it sends ``PING`` and waits for the device
    to answer ``PONG``.  The outcome is shown by a coloured indicator and in
    the status bar.
    """

    # How long a TCP connection check may stay pending before it is reported
    # as failed, in milliseconds.
    TCP_TIMEOUT_MS = 5000
    # How long to wait for a UDP "PONG" reply, in milliseconds.
    UDP_TIMEOUT_MS = 3000

    def __init__(self):
        """Build the UI, create both sockets and wire up their signals."""
        super().__init__()
        self.init_ui()
        self.tcp_socket = QTcpSocket()
        self.udp_socket = QUdpSocket()

        # Payload waiting for the TCP connection to be established.
        self._pending_tcp_data = None
        # True while a UDP "PING" has been sent and no "PONG" has arrived.
        self._udp_ping_pending = False
        # Incremented for every new connection attempt so that a timeout
        # scheduled for an older attempt can recognise it is stale.
        self._attempt_id = 0

        # Bind to an OS-chosen local port so replies from the device are
        # delivered to this socket and announced through readyRead.
        if not self.udp_socket.bind():
            self.status.showMessage(f"错误: {self.udp_socket.errorString()}")

        # Connect each readyRead exactly once; connecting inside send_data
        # would add one more handler invocation per click.
        self.tcp_socket.readyRead.connect(self.tcp_receive)
        self.udp_socket.readyRead.connect(self.udp_receive)
        self.add_connection_check()

    def add_connection_check(self):
        """Insert the check button and status indicator, bind TCP signals."""
        check_btn = QPushButton("检测连接", clicked=self.check_connection)
        self.status_light = QLabel("◌")
        self.layout().insertWidget(1, check_btn)
        self.layout().insertWidget(2, self.status_light)

        self.tcp_socket.connected.connect(self._on_tcp_connected)
        # errorOccurred only exists in Qt 5.15+; older releases expose the
        # same notification as the ``error`` signal.
        error_signal = getattr(self.tcp_socket, "errorOccurred", None)
        if error_signal is None:
            error_signal = self.tcp_socket.error
        error_signal.connect(self._on_tcp_error)

    def check_connection(self):
        """Probe the configured target using the selected protocol."""
        target = self._read_target()
        if target is None:
            return
        ip, port = target

        self._attempt_id += 1
        attempt = self._attempt_id

        if self.protocol.currentText() == "TCP":
            # Drop any previous connection (and any send queued for it).
            self._pending_tcp_data = None
            self.tcp_socket.abort()
            self.tcp_socket.connectToHost(ip, port, QIODevice.ReadWrite)
            QTimer.singleShot(
                self.TCP_TIMEOUT_MS, lambda: self._on_tcp_timeout(attempt)
            )
        else:
            self._udp_ping_pending = True
            self.udp_socket.writeDatagram(b"PING", QHostAddress(ip), port)
            QTimer.singleShot(
                self.UDP_TIMEOUT_MS, lambda: self._on_udp_timeout(attempt)
            )

    def update_status(self, is_connected):
        """Show the connection state in the indicator and the status bar.

        Args:
            is_connected: True for a working connection, False for a failure.
        """
        color = "green" if is_connected else "red"
        text = "连接正常" if is_connected else "连接失败"
        self.status_light.setStyleSheet(f"color:{color}; font-size:24px;")
        self.status_light.setText("●" if is_connected else "◌")
        self.status.showMessage(text)

    def init_ui(self):
        """Create all widgets and assemble the vertical layout."""
        self.setWindowTitle("网络调试工具")
        layout = QVBoxLayout()

        # Protocol selector.
        self.protocol = QComboBox()
        self.protocol.addItems(["TCP", "UDP"])

        # Target address and port inputs.
        ip_layout = QHBoxLayout()
        self.ip_input = QLineEdit("192.168.4.2")
        self.port_input = QLineEdit("8080")
        ip_layout.addWidget(QLabel("地址:"))
        ip_layout.addWidget(self.ip_input)
        ip_layout.addWidget(QLabel("端口:"))
        ip_layout.addWidget(self.port_input)

        # Send area.
        send_layout = QHBoxLayout()
        self.send_text = QTextEdit()
        self.send_btn = QPushButton("发送")
        self.send_btn.clicked.connect(self.send_data)
        send_layout.addWidget(self.send_text)
        send_layout.addWidget(self.send_btn)

        # Receive area.
        self.recv_text = QTextBrowser()
        self.clear_btn = QPushButton("清空")
        self.clear_btn.clicked.connect(self.recv_text.clear)

        # Status bar.
        self.status = QStatusBar()

        # Assemble the layout.
        layout.addWidget(self.protocol)
        layout.addLayout(ip_layout)
        layout.addLayout(send_layout)
        layout.addWidget(QLabel("接收数据:"))
        layout.addWidget(self.recv_text)
        layout.addWidget(self.clear_btn)
        layout.addWidget(self.status)
        self.setLayout(layout)

    def send_data(self):
        """Send the contents of the send box to the configured target."""
        target = self._read_target()
        if target is None:
            return
        ip, port = target
        data = self.send_text.toPlainText().encode()

        if self.protocol.currentText() == "TCP":
            self._send_tcp(ip, port, data)
        else:
            self._send_udp(ip, port, data)

    def tcp_receive(self):
        """Append everything currently readable on the TCP socket to the log."""
        # errors="replace" keeps a non-UTF-8 payload from raising in the slot.
        data = self.tcp_socket.readAll().data().decode("utf-8", errors="replace")
        self.recv_text.append(f"[TCP接收] {data}")

    def udp_receive(self):
        """Drain all pending datagrams and log them.

        A datagram whose text is ``PONG`` is treated as the device's answer
        to a connection check: it is logged as a response and the status is
        set to connected.
        """
        while self.udp_socket.hasPendingDatagrams():
            datagram = self.udp_socket.receiveDatagram()
            data = datagram.data().data().decode("utf-8", errors="replace")
            if data.strip() == "PONG":
                self._udp_ping_pending = False
                self.recv_text.append(f"[UDP响应] {data}")
                self.update_status(True)
            else:
                self.recv_text.append(f"[UDP接收] {data}")

    def _read_target(self):
        """Return ``(ip, port)`` from the input fields, or None if invalid.

        An invalid value is reported in the status bar instead of raising,
        because this is called from Qt slots.
        """
        ip = self.ip_input.text().strip()
        if not ip:
            self.status.showMessage("错误: 地址不能为空")
            return None
        try:
            port = int(self.port_input.text())
        except ValueError:
            self.status.showMessage("错误: 端口必须是整数")
            return None
        if not 0 < port <= 65535:
            self.status.showMessage("错误: 端口范围应为 1-65535")
            return None
        return ip, port

    def _send_udp(self, ip, port, data):
        """Send one datagram and report whether the socket accepted it."""
        sent = self.udp_socket.writeDatagram(data, QHostAddress(ip), port)
        if sent == -1:
            self.status.showMessage(f"错误: {self.udp_socket.errorString()}")
        else:
            self.status.showMessage("发送成功")

    def _send_tcp(self, ip, port, data):
        """Send over TCP, connecting to the target first when necessary."""
        sock = self.tcp_socket
        connected_to_target = (
            sock.state() == QAbstractSocket.ConnectedState
            and sock.peerName() == ip
            and sock.peerPort() == port
        )
        if connected_to_target:
            self._write_tcp(data)
            return

        # Not connected to this target yet: queue the payload and let
        # _on_tcp_connected write it once the connection is up.
        self._attempt_id += 1
        self._pending_tcp_data = data
        self.status.showMessage("正在连接...")
        sock.abort()
        sock.connectToHost(ip, port)

    def _write_tcp(self, data):
        """Write to the connected TCP socket and report the result."""
        if self.tcp_socket.write(data) == -1:
            self.status.showMessage(f"错误: {self.tcp_socket.errorString()}")
        else:
            self.status.showMessage("发送成功")

    def _on_tcp_connected(self):
        """Mark the connection as good and flush a queued payload, if any."""
        self.update_status(True)
        pending, self._pending_tcp_data = self._pending_tcp_data, None
        if pending is not None:
            self._write_tcp(pending)

    def _on_tcp_error(self, *_args):
        """Mark the connection as failed and discard any queued payload."""
        self._pending_tcp_data = None
        self.update_status(False)

    def _on_tcp_timeout(self, attempt):
        """Fail the TCP check if the given attempt is still not connected."""
        if attempt != self._attempt_id:
            return  # a newer attempt superseded this one
        still_trying = self.tcp_socket.state() in (
            QAbstractSocket.HostLookupState,
            QAbstractSocket.ConnectingState,
        )
        if still_trying:
            self.tcp_socket.abort()
            self.update_status(False)

    def _on_udp_timeout(self, attempt):
        """Fail the UDP check if no ``PONG`` arrived for the given attempt."""
        if attempt != self._attempt_id:
            return  # a newer attempt superseded this one
        if self._udp_ping_pending:
            self._udp_ping_pending = False
            self.update_status(False)
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the tool window
    # and hand control to the Qt event loop until the window is closed.
    app = QApplication(sys.argv)
    window = NetTestTool()
    window.resize(600, 400)
    window.show()
    sys.exit(app.exec_())
测试结果：
连接正常
连接失败