文章导读:本文全面解析了一套基于STM32F103RCT6微控制器的智能家居浴室系统解决方案。方案整合了Zigbee传感网络、WiFi云端互联和本地人机交互(HMI)三大核心模块,既适合物联网开发者与嵌入式工程师进行技术参考,也为智能家居爱好者提供了实用学习案例。
目录导航
项目背景
项目概述
项目设计了一套完整的智能浴室解决方案,基于STM32F103RCT6微控制器核心,实现了"环境感知 + 本地联动 + 远程可视化 + 云端连接"的一体化智能控制系统。
核心特色
特色功能 |
技术实现 |
应用价值 |
离线可用 |
本地传感采集、场景联动、手动控制 |
云端断连时系统正常工作 |
在线增值 |
云端/上位机远程控制、历史数据分析 |
提供远程监控和数据分析能力 |
安全可靠 |
TLS加密、身份鉴权、CRC校验、看门狗 |
保障系统稳定性和数据安全 |
技术栈
用户界面层:4.3寸USART HMI触摸屏 + Python上位机
通信层:ESP8266(WiFi) + CC2530(Zigbee协调器)
控制层:STM32F103RCT6 + FreeRTOS多任务系统
数据层:SQLite本地存储 + MQTT云端同步
设备层:智能传感器 + 执行器设备
系统架构设计
整体架构概览
本系统采用分层架构设计,实现设备层、控制层、通信层、应用层的结合:
)
核心接口
接口类型 |
连接方式 |
协议规范 |
主要功能 |
性能指标 |
WiFi通信 |
STM32 ↔ ESP8266 |
AT指令(115200bps) |
云端MQTT连接 |
延迟<100ms |
Zigbee网络 |
STM32 ↔ CC2530 |
自定义帧协议 |
传感器数据采集 |
响应<500ms |
人机界面 |
STM32 ↔ HMI屏 |
USART协议 |
本地交互控制 |
刷新率20fps |
上位机 |
Python ↔ ESP8266 |
TCP/UDP |
远程监控管理 |
并发连接>10 |
硬件选型配置
主控制器选型分析
STM32F103RCT6 核心参数
技术指标 |
参数值 |
应用优势 |
资源分配 |
CPU频率 |
72MHz |
满足多任务实时处理需求 |
FreeRTOS调度开销<5% |
Flash容量 |
256KB |
存储应用代码+配置参数 |
代码占用约180KB |
SRAM容量 |
48KB |
支持多任务栈空间 |
任务栈总计约16KB |
工作电压 |
3.3V |
兼容大部分传感器模块 |
低功耗设计 |
外设接口分配表
)
无线通信模块
WiFi模块对比选择
模块型号 |
优势特点 |
技术规格 |
选型理由 |
ESP8266-01S |
成本低,AT指令简单 |
802.11 b/g/n, 80MHz |
推荐:性价比最优 |
ESP32-C3 |
性能强,蓝牙支持 |
双核160MHz, BLE5.0 |
过度设计,成本高 |
RTL8710BN |
功耗极低 |
100MHz ARM Cortex-M4 |
生态支持不足 |
Zigbee协调器方案
CC2530 核心特性:
├── 技术参数
│ ├── CPU: 8051内核 @ 32MHz
│ ├── Flash: 256KB
│ ├── RAM: 8KB
│ └── RF: 2.4GHz IEEE 802.15.4
├── 通信性能
│ ├── 传输距离: 室内30m, 室外100m
│ ├── 网络容量: 支持65000个节点
│ └── 功耗控制: 睡眠<1μA, 接收23mA
└── 接口设计
├── UART: 115200bps 与STM32通信
├── SPI: 可选高速数据传输
└── GPIO: 状态指示与控制
人机交互设备
HMI触摸屏规格
技术参数 |
规格值 |
应用说明 |
屏幕尺寸 |
4.3英寸 |
适合浴室环境,视觉效果佳 |
分辨率 |
480×272像素 |
支持丰富的界面元素显示 |
触摸技术 |
电阻式触摸 |
防水性好,适合湿润环境 |
通信接口 |
UART串口 |
指令简单,实时性好 |
传感器设备
环境监测传感器
传感器类型 |
型号推荐 |
测量范围 |
精度指标 |
安装位置 |
温湿度 |
SHT30/DHT22 |
-40~80℃, 0~100%RH |
±0.3℃, ±2%RH |
浴室中央,避开热源 |
人体存在 |
PIR+毫米波 |
3-5米检测范围 |
99%检测准确率 |
门口或马桶上方 |
水浸检测 |
电极式 |
0~3mm水深 |
1mm检测精度 |
地漏周围,洗手台下 |
气体检测 |
MQ-135 |
CO2: 400-2000ppm |
±50ppm |
通风口附近 |
光照检测 |
BH1750 |
0-65535 lux |
±20% |
窗户或镜前 |
智能执行器设备
软件架构实现
FreeRTOS 多任务系统设计
任务优先级与资源分配
任务名称 |
优先级 |
堆栈大小 |
周期 |
核心职责 |
CPU占用率 |
WiFiTask |
4 (最高) |
512 Words |
事件驱动 |
云端通信管理 |
~15% |
ZigbeeTask |
3 (高) |
256 Words |
50ms |
Zigbee网络管理 |
~20% |
ControlTask |
3 (高) |
256 Words |
100ms |
设备控制逻辑 |
~10% |
SensorTask |
2 (中) |
256 Words |
200ms |
传感器数据采集 |
~25% |
DisplayTask |
2 (中) |
384 Words |
50ms |
HMI界面更新 |
~20% |
IdleTask |
0 (最低) |
128 Words |
- |
系统空闲处理 |
~10% |
系统启动与初始化流程
void StartDefaultTask(void *argument) {
HAL_Init();
SystemClock_Config();
MX_GPIO_Init();
MX_USART1_UART_Init();
MX_USART2_UART_Init();
MX_USART3_UART_Init();
MX_USB_DEVICE_Init();
MX_TIM2_Init();
MX_ADC1_Init();
MX_IWDG_Init();
CreateQueuesAndMutexes();
xTaskCreate(WiFiTask, "WiFi_Task", 512, NULL, 4, &wifiTaskHandle);
xTaskCreate(ZigbeeTask, "Zigbee_Task", 256, NULL, 3, &zigbeeTaskHandle);
xTaskCreate(ControlTask, "Control_Task", 256, NULL, 3, &controlTaskHandle);
xTaskCreate(SensorTask, "Sensor_Task", 256, NULL, 2, &sensorTaskHandle);
xTaskCreate(DisplayTask, "Display_Task", 384, NULL, 2, &displayTaskHandle);
vTaskStartScheduler();
while(1) {
HAL_Delay(1000);
}
}
static void CreateQueuesAndMutexes(void) {
sensorDataQueue = xQueueCreate(10, sizeof(SensorData_t));
controlCmdQueue = xQueueCreate(5, sizeof(ControlCmd_t));
displayEventQueue = xQueueCreate(8, sizeof(DisplayEvent_t));
uartMutex = xSemaphoreCreateMutex();
configMutex = xSemaphoreCreateMutex();
systemEventGroup = xEventGroupCreate();
configASSERT(sensorDataQueue && controlCmdQueue && displayEventQueue);
configASSERT(uartMutex && configMutex && systemEventGroup);
}
核心任务详细设计
1 WiFi通信任务 (WiFiTask)
void WiFiTask(void *argument) {
TickType_t lastWakeTime = xTaskGetTickCount();
ESP8266_Status_t wifiStatus = ESP_IDLE;
uint32_t reconnectDelay = 1000;
for(;;) {
switch(wifiStatus) {
case ESP_IDLE:
if(ESP8266_Init() == ESP_OK) {
wifiStatus = ESP_READY;
LogInfo("WiFi模块初始化成功");
}
break;
case ESP_READY:
if(ESP8266_ConnectAP(WIFI_SSID, WIFI_PASSWORD) == ESP_OK) {
wifiStatus = ESP_CONNECTED;
reconnectDelay = 1000;
xEventGroupSetBits(systemEventGroup, WIFI_CONNECTED_BIT);
}
break;
case ESP_CONNECTED:
if(MQTT_IsConnected()) {
ProcessMQTTMessages();
PublishSensorData();
xEventGroupSetBits(systemEventGroup, MQTT_CONNECTED_BIT);
} else {
MQTT_Connect();
}
break;
case ESP_ERROR:
vTaskDelay(pdMS_TO_TICKS(reconnectDelay));
reconnectDelay = MIN(reconnectDelay * 2, 60000);
wifiStatus = ESP_IDLE;
break;
}
vTaskDelayUntil(&lastWakeTime, pdMS_TO_TICKS(100));
}
}
2 Zigbee网络任务 (ZigbeeTask)
void ZigbeeTask(void *argument) {
uint8_t rxBuffer[ZIGBEE_RX_BUFFER_SIZE];
ZigbeeFrame_t frame;
TickType_t lastHeartbeat = xTaskGetTickCount();
if(Zigbee_StartCoordinator() != ZB_OK) {
LogError("Zigbee协调器启动失败");
vTaskSuspend(NULL);
}
for(;;) {
if(Zigbee_ReceiveFrame(&frame, pdMS_TO_TICKS(50)) == ZB_OK) {
switch(frame.cmd) {
case ZB_SENSOR_REPORT:
ProcessSensorReport(&frame);
break;
case ZB_DEVICE_JOIN:
ProcessDeviceJoin(&frame);
break;
case ZB_DEVICE_LEAVE:
ProcessDeviceLeave(&frame);
break;
case ZB_NETWORK_STATUS:
UpdateNetworkStatus(&frame);
break;
default:
LogWarning("未知Zigbee命令: 0x%02X", frame.cmd);
break;
}
}
if(xTaskGetTickCount() - lastHeartbeat > pdMS_TO_TICKS(30000)) {
Zigbee_SendHeartbeat();
lastHeartbeat = xTaskGetTickCount();
}
CheckOfflineDevices();
}
}
3 传感器数据任务 (SensorTask)
void SensorTask(void *argument) {
SensorData_t currentData = {0};
SensorData_t filteredData = {0};
AlarmStatus_t alarmStatus = {0};
for(;;) {
if(xQueueReceive(sensorDataQueue, ¤tData, pdMS_TO_TICKS(200)) == pdTRUE) {
ApplyDigitalFilter(¤tData, &filteredData);
CheckAlarmConditions(&filteredData, &alarmStatus);
UpdateSensorHistory(&filteredData);
SendDataToDisplay(&filteredData);
SendDataToControl(&filteredData);
if(xEventGroupGetBits(systemEventGroup) & MQTT_CONNECTED_BIT) {
PublishToMQTT(&filteredData);
}
}
HandleTimeoutAlarms();
}
}
static void ApplyDigitalFilter(SensorData_t *raw, SensorData_t *filtered) {
static float temp_history[FILTER_WINDOW_SIZE] = {0};
static float hum_history[FILTER_WINDOW_SIZE] = {0};
static uint8_t index = 0;
temp_history[index] = raw->temperature;
hum_history[index] = raw->humidity;
index = (index + 1) % FILTER_WINDOW_SIZE;
filtered->temperature = MedianFilter(temp_history, FILTER_WINDOW_SIZE);
filtered->humidity = MedianFilter(hum_history, FILTER_WINDOW_SIZE);
static float temp_smooth = 25.0f, hum_smooth = 50.0f;
temp_smooth = 0.3f * filtered->temperature + 0.7f * temp_smooth;
hum_smooth = 0.3f * filtered->humidity + 0.7f * hum_smooth;
filtered->temperature = temp_smooth;
filtered->humidity = hum_smooth;
filtered->occupancy = raw->occupancy;
filtered->water_leak = raw->water_leak;
filtered->co2_level = raw->co2_level;
filtered->lux_level = raw->lux_level;
filtered->timestamp = HAL_GetTick();
}
任务间通信机制
消息队列设计
typedef struct {
float temperature;
float humidity;
uint8_t occupancy;
uint8_t water_leak;
uint16_t co2_level;
uint16_t lux_level;
uint32_t timestamp;
uint8_t device_id;
} SensorData_t;
typedef struct {
uint8_t device_type;
uint8_t device_id;
uint8_t command;
uint16_t param1;
uint16_t param2;
uint8_t source;
} ControlCmd_t;
typedef struct {
uint8_t event_type;
uint8_t page_id;
uint8_t widget_id;
uint32_t param;
} DisplayEvent_t;
通信协议设计
MQTT云平台协议详解
云平台架构选择
云平台 |
技术特点 |
适用场景 |
成本评估 |
阿里云IoT ✅ |
完整生态、设备管理强 |
商业化产品推荐 |
中等 |
腾讯云IoT |
微信生态集成 |
需要微信集成时 |
中等 |
私有EMQX |
完全可控、无云依赖 |
企业内部部署 |
低(部署成本高) |
MQTT主题设计与数据流
Topic 层次结构设计:
├── 上行主题 (设备→云端)
│ ├── home/bathroom01/telemetry # 遥测数据 (QoS1)
│ ├── home/bathroom01/event # 事件告警 (QoS1)
│ ├── home/bathroom01/status # 设备状态 (QoS0)
│ └── home/bathroom01/response # 命令响应 (QoS1)
│
└── 下行主题 (云端→设备)
├── home/bathroom01/command # 控制命令 (QoS1)
├── home/bathroom01/config # 配置下发 (QoS1)
└── home/bathroom01/ota # 固件升级 (QoS1)
MQTT消息格式标准
{
"msgId": "msg_20241215_001",
"deviceId": "bathroom01",
"timestamp": 1702636800,
"data": {
"temperature": 26.5,
"humidity": 65.2,
"occupancy": 1,
"water_leak": 0,
"co2": 450,
"lux": 320,
"devices": {
"fan": 0,
"heater": 1,
"light": 1,
"mirror": 0
}
}
}
{
"msgId": "cmd_20241215_001",
"command": "device_control",
"params": {
"device": "fan",
"action": "turn_on",
"duration": 1800,
"priority": "user"
},
"timestamp": 1702636800,
"expire": 1702636860
}
{
"msgId": "alert_20241215_001",
"deviceId": "bathroom01",
"event": {
"type": "water_leak",
"level": "critical",
"message": "检测到漏水,位置:洗手台下方",
"location": "sink_area",
"actions": ["turn_off_water", "send_notification"]
},
"timestamp": 1702636800
}
HMI串口屏通信协议
指令格式与控件映射
HMI通信帧格式:指令 + 参数 + 结束符
├── STM32 → HMI (控制指令)
│ ├── 文本更新: t[id].txt="内容"
│ ├── 数值更新: n[id].val=数值
│ ├── 图片切换: p[id].pic=图片ID
│ ├── 进度条: j[id].val=百分比
│ ├── 曲线添加: add [id],通道,数值
│ └── 页面跳转: page 页面ID
│
└── HMI → STM32 (事件反馈)
├── 按钮点击: 65 00 07 [页面ID] [控件ID] 01 FF FF FF
├── 滑块变化: 65 00 07 [页面ID] [控件ID] [值H] [值L] FF FF FF
└── 页面切换: 65 00 04 01 [页面ID] FF FF FF
页面布局与控件设计
页面 |
控件ID |
控件类型 |
显示内容 |
交互功能 |
P0 主页 |
t0 |
文本 |
温度显示 |
只读 |
|
t1 |
文本 |
湿度显示 |
只读 |
|
t2 |
文本 |
CO2浓度 |
只读 |
|
ico0 |
图标 |
人体存在 |
状态指示 |
|
ico1 |
图标 |
漏水告警 |
状态指示 |
P1 控制 |
b0 |
按钮 |
排风扇 |
开关控制 |
|
b1 |
按钮 |
浴霸 |
开关控制 |
|
b2 |
按钮 |
照明 |
开关控制 |
|
h0 |
滑块 |
亮度调节 |
PWM输出 |
P2 历史 |
waveform0 |
波形 |
温湿度曲线 |
数据展示 |
|
t10 |
文本 |
最近告警 |
日志显示 |
)
)
HMI数据刷新策略
typedef struct {
uint8_t page_id;
uint32_t last_update;
uint8_t need_refresh;
float temp_cache;
float hum_cache;
} HMI_Manager_t;
void HMI_SmartUpdate(SensorData_t *data, HMI_Manager_t *hmi) {
uint32_t current_time = HAL_GetTick();
if(fabs(data->temperature - hmi->temp_cache) > 0.1f ||
(current_time - hmi->last_update) > 2000) {
HMI_SendCommand("t0.txt=\"%.1f°C\"", data->temperature);
hmi->temp_cache = data->temperature;
}
if(fabs(data->humidity - hmi->hum_cache) > 1.0f ||
(current_time - hmi->last_update) > 2000) {
HMI_SendCommand("t1.txt=\"%.0f%%\"", data->humidity);
hmi->hum_cache = data->humidity;
}
if(data->water_leak) {
HMI_SendCommand("ico1.pic=2");
HMI_SendCommand("page 3");
}
hmi->last_update = current_time;
}
Python上位机通信协议
网络层设计
"""
TCP/UDP通信协议实现
支持命令-响应模式和实时数据推送
"""
import socket, json, struct, threading
from typing import Dict, Callable, Optional
class BathroomClient:
def __init__(self, host: str = "192.168.1.100", port: int = 8888):
self.host = host
self.port = port
self.socket = None
self.connected = False
self.callbacks: Dict[str, Callable] = {}
def connect(self) -> bool:
"""建立TCP连接"""
try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(5.0)
self.socket.connect((self.host, self.port))
self.connected = True
threading.Thread(target=self._receive_loop, daemon=True).start()
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def send_command(self, command: str, params: dict = None) -> Optional[dict]:
"""发送命令并等待响应"""
if not self.connected:
return None
request = {
"id": int(time.time() * 1000) % 10000,
"command": command,
"params": params or {},
"timestamp": int(time.time())
}
data = json.dumps(request, ensure_ascii=False).encode('utf-8')
length_header = struct.pack('!H', len(data))
try:
self.socket.sendall(length_header + data)
return self._wait_response(request["id"], timeout=3.0)
except Exception as e:
print(f"发送失败: {e}")
return None
def _receive_loop(self):
"""接收数据循环"""
buffer = bytearray()
while self.connected:
try:
chunk = self.socket.recv(4096)
if not chunk:
break
buffer.extend(chunk)
while len(buffer) >= 2:
length = struct.unpack('!H', buffer[:2])[0]
if len(buffer) < 2 + length:
break
frame_data = bytes(buffer[2:2+length])
del buffer[:2+length]
try:
message = json.loads(frame_data.decode('utf-8'))
self._handle_message(message)
except Exception as e:
print(f"解析消息失败: {e}")
except Exception as e:
print(f"接收错误: {e}")
break
self.connected = False
if __name__ == "__main__":
client = BathroomClient("192.168.1.100", 8888)
if client.connect():
sensors = client.send_command("get_sensors")
print(f"传感器数据: {sensors}")
result = client.send_command("control_device", {
"device": "fan",
"action": "on",
"duration": 1800
})
print(f"控制结果: {result}")
命令集定义
命令类型 |
命令名称 |
参数格式 |
响应格式 |
用途说明 |
查询类 |
get_sensors |
{} |
{temp, hum, co2, ...} |
获取实时传感器数据 |
|
get_devices |
{} |
{fan, light, heater, ...} |
获取设备状态 |
|
get_history |
{start, end, type} |
[{time, value}, ...] |
获取历史数据 |
控制类 |
control_device |
{device, action, params} |
{success, message} |
设备控制 |
|
set_scene |
{scene_name, params} |
{success, message} |
场景控制 |
|
set_config |
{key, value} |
{success, message} |
参数配置 |
订阅类 |
subscribe |
{types: []} |
{success} |
订阅实时数据推送 |
|
unsubscribe |
{types: []} |
{success} |
取消订阅 |
上位机开发
系统架构设计
)
技术栈选择对比
框架/库 |
优势特点 |
适用场景 |
推荐指数 |
PyQt5 ✅ |
界面美观、功能丰富、跨平台 |
桌面应用程序 |
⭐⭐⭐⭐⭐ |
Tkinter |
Python内置、简单易用 |
简单工具 |
⭐⭐⭐ |
Kivy |
触摸友好、移动端 |
移动应用 |
⭐⭐⭐ |
SQLite ✅ |
轻量级、无服务器、高性能 |
本地数据存储 |
⭐⭐⭐⭐⭐ |
PyQtGraph ✅ |
实时绘图、性能优秀 |
数据可视化 |
⭐⭐⭐⭐⭐ |
核心模块实现
1主界面设计 (MainWindow)
"""
智能浴室监控系统主界面
功能:实时数据展示、设备控制、历史数据查看
"""
import sys
import time
import json
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *
from PyQt5.QtGui import *
import pyqtgraph as pg
import sqlite3
from datetime import datetime, timedelta
class BathroomMonitor(QMainWindow):
def __init__(self):
super().__init__()
self.setWindowTitle("智能浴室监控系统 v1.0")
self.setGeometry(100, 100, 1200, 800)
self.setStyleSheet("""
QMainWindow {
background-color: #f0f0f0;
}
QGroupBox {
font: bold 14px;
border: 2px solid #cccccc;
border-radius: 5px;
margin-top: 10px;
padding-top: 10px;
}
QGroupBox::title {
subcontrol-origin: margin;
left: 10px;
padding: 0 10px 0 10px;
}
""")
self.client = BathroomClient()
self.connected = False
self.db_manager = DatabaseManager()
self.init_ui()
self.update_timer = QTimer()
self.update_timer.timeout.connect(self.update_data)
self.update_timer.start(1000)
def init_ui(self):
"""初始化用户界面"""
central_widget = QWidget()
self.setCentralWidget(central_widget)
main_layout = QHBoxLayout(central_widget)
left_panel = self.create_left_panel()
main_layout.addWidget(left_panel, 1)
right_panel = self.create_right_panel()
main_layout.addWidget(right_panel, 2)
self.statusBar().showMessage("就绪")
self.create_menu_bar()
def create_left_panel(self):
"""创建左侧控制面板"""
panel = QWidget()
layout = QVBoxLayout(panel)
conn_group = QGroupBox("连接状态")
conn_layout = QHBoxLayout(conn_group)
self.status_indicator = QLabel("●")
self.status_indicator.setStyleSheet("color: red; font-size: 20px;")
self.status_label = QLabel("离线")
self.connect_btn = QPushButton("连接")
self.connect_btn.clicked.connect(self.toggle_connection)
conn_layout.addWidget(self.status_indicator)
conn_layout.addWidget(self.status_label)
conn_layout.addStretch()
conn_layout.addWidget(self.connect_btn)
sensor_group = QGroupBox("环境数据")
sensor_layout = QFormLayout(sensor_group)
self.temp_label = self.create_value_label("--", "°C", "red")
self.hum_label = self.create_value_label("--", "%", "blue")
self.co2_label = self.create_value_label("--", "ppm", "orange")
self.lux_label = self.create_value_label("--", "lux", "green")
sensor_layout.addRow("🌡️ 温度:", self.temp_label)
sensor_layout.addRow("💧 湿度:", self.hum_label)
sensor_layout.addRow("🌪️ CO2:", self.co2_label)
sensor_layout.addRow("💡 光照:", self.lux_label)
status_group = QGroupBox("状态指示")
status_layout = QFormLayout(status_group)
self.occupancy_led = self.create_led_indicator()
self.leak_led = self.create_led_indicator()
status_layout.addRow("👤 人体存在:", self.occupancy_led)
status_layout.addRow("💧 漏水告警:", self.leak_led)
control_group = QGroupBox("设备控制")
control_layout = QGridLayout(control_group)
self.fan_btn = self.create_device_button("🌪️ 排风扇", "fan")
self.heater_btn = self.create_device_button("🔥 浴霸", "heater")
self.light_btn = self.create_device_button("💡 照明", "light")
self.mirror_btn = self.create_device_button("🪞 除雾镜", "mirror")
control_layout.addWidget(self.fan_btn, 0, 0)
control_layout.addWidget(self.heater_btn, 0, 1)
control_layout.addWidget(self.light_btn, 1, 0)
control_layout.addWidget(self.mirror_btn, 1, 1)
scene_group = QGroupBox("场景模式")
scene_layout = QVBoxLayout(scene_group)
shower_btn = QPushButton("🚿 淋浴模式")
shower_btn.clicked.connect(lambda: self.set_scene("shower"))
toilet_btn = QPushButton("🚽 如厕模式")
toilet_btn.clicked.connect(lambda: self.set_scene("toilet"))
clean_btn = QPushButton("🧹 清洁模式")
clean_btn.clicked.connect(lambda: self.set_scene("clean"))
scene_layout.addWidget(shower_btn)
scene_layout.addWidget(toilet_btn)
scene_layout.addWidget(clean_btn)
layout.addWidget(conn_group)
layout.addWidget(sensor_group)
layout.addWidget(status_group)
layout.addWidget(control_group)
layout.addWidget(scene_group)
layout.addStretch()
return panel
def create_right_panel(self):
"""创建右侧数据面板"""
panel = QWidget()
layout = QVBoxLayout(panel)
chart_group = QGroupBox("历史数据")
chart_layout = QVBoxLayout(chart_group)
chart_controls = QHBoxLayout()
self.chart_type = QComboBox()
self.chart_type.addItems(["温湿度", "CO2浓度", "设备状态"])
self.chart_type.currentTextChanged.connect(self.update_chart)
self.time_range = QComboBox()
self.time_range.addItems(["最近1小时", "最近6小时", "最近24小时", "最近7天"])
self.time_range.currentTextChanged.connect(self.update_chart)
export_btn = QPushButton("导出数据")
export_btn.clicked.connect(self.export_data)
chart_controls.addWidget(QLabel("显示类型:"))
chart_controls.addWidget(self.chart_type)
chart_controls.addWidget(QLabel("时间范围:"))
chart_controls.addWidget(self.time_range)
chart_controls.addStretch()
chart_controls.addWidget(export_btn)
self.plot_widget = pg.PlotWidget()
self.plot_widget.setBackground('w')
self.plot_widget.setLabel('left', '数值')
self.plot_widget.setLabel('bottom', '时间')
self.plot_widget.showGrid(x=True, y=True)
chart_layout.addLayout(chart_controls)
chart_layout.addWidget(self.plot_widget)
log_group = QGroupBox("告警日志")
log_layout = QVBoxLayout(log_group)
self.log_text = QTextEdit()
self.log_text.setMaximumHeight(200)
self.log_text.setReadOnly(True)
clear_log_btn = QPushButton("清空日志")
clear_log_btn.clicked.connect(self.log_text.clear)
log_layout.addWidget(self.log_text)
log_layout.addWidget(clear_log_btn)
layout.addWidget(chart_group)
layout.addWidget(log_group)
return panel
def create_value_label(self, value, unit, color):
"""创建数值显示标签"""
label = QLabel(f"<span style='color: {color}; font-size: 18px; font-weight: bold;'>{value}</span> {unit}")
return label
def create_led_indicator(self):
"""创建LED状态指示器"""
indicator = QLabel("●")
indicator.setStyleSheet("color: gray; font-size: 16px;")
return indicator
def create_device_button(self, text, device_name):
"""创建设备控制按钮"""
btn = QPushButton(text)
btn.setCheckable(True)
btn.clicked.connect(lambda checked: self.control_device(device_name, checked))
btn.setStyleSheet("""
QPushButton {
min-height: 50px;
font-size: 14px;
border: 2px solid #ddd;
border-radius: 5px;
}
QPushButton:checked {
background-color: #4CAF50;
color: white;
border-color: #45a049;
}
""")
return btn
2 数据库管理模块
class DatabaseManager:
"""数据库管理类 - 负责SQLite数据的存储与查询"""
def __init__(self, db_path="bathroom_data.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""初始化数据库表结构"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sensor_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
temperature REAL,
humidity REAL,
co2 INTEGER,
lux INTEGER,
occupancy INTEGER,
water_leak INTEGER
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS device_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
device_name TEXT NOT NULL,
status INTEGER NOT NULL,
action TEXT
)
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS alert_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp INTEGER NOT NULL,
event_type TEXT NOT NULL,
level TEXT NOT NULL,
message TEXT,
resolved INTEGER DEFAULT 0
)
""")
conn.commit()
conn.close()
def insert_sensor_data(self, data):
"""插入传感器数据"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
INSERT INTO sensor_data
(timestamp, temperature, humidity, co2, lux, occupancy, water_leak)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
int(time.time()),
data.get('temperature'),
data.get('humidity'),
data.get('co2'),
data.get('lux'),
data.get('occupancy', 0),
data.get('water_leak', 0)
))
conn.commit()
conn.close()
def get_history_data(self, hours=24, data_type='all'):
"""获取历史数据"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
start_time = int(time.time()) - (hours * 3600)
cursor.execute("""
SELECT timestamp, temperature, humidity, co2, lux
FROM sensor_data
WHERE timestamp > ?
ORDER BY timestamp
""", (start_time,))
results = cursor.fetchall()
conn.close()
return results
界面效果展示

技术学习价值
本项目作为完整的IoT系统实现,具有极高的学习和参考价值:
知识体系覆盖
- 嵌入式开发:STM32 + FreeRTOS多任务编程
- 通信协议:UART/SPI/I2C/Zigbee/WiFi/MQTT全栈
- 上位机开发:Python GUI编程与数据库操作
- 系统集成:硬件选型、软件架构、安全设计
- 产品思维:用户体验、成本控制、可维护性
🛠️ 实践技能提升
📖 学习路线建议:
- 基础:单片机 → FreeRTOS → 通信协议
- 进阶:系统架构 → 安全设计 → 性能优化
- 应用:产品化 → 市场分析 → 商业模式
🔧 开发能力培养:
- 代码规范与文档编写
- 版本控制与团队协作
- 测试验证与质量保证
- 问题诊断与故障排除
结语
智能家居是物联网技术的重要应用场景,正处于快速发展阶段。本项目提供完整的技术方案和详细的实现指南,为开发者打造了一个优质的学习范本和参考模板。