基于 YOLOv8 实时目标检测 + UNet 图像分割,并将处理结果通过 MQTT 发送到 Frigate,用于报警和录像标记。
🧠 总体流程图
摄像头(RTSP / USB / 视频文件)
↓
[帧获取模块:OpenCV]
↓
┌──────────────────────────┐
│ YOLOv8:检测感兴趣区域(涌水)│
└──────────────────────────┘
↓
┌──────────────────────────┐
│ UNet:图像分割提取边界掩码 │
└──────────────────────────┘
↓
检测结果发布 → MQTT → Frigate
📦 所需组件
ultralytics
(YOLOv8)paho-mqtt
(MQTT 发布)cv2
(摄像头/视频读取)torch
+segmentation_models_pytorch
(或你已有 UNet 实现)- 你的 UNet 模型权重(如
.pth
)
✅ 实时检测 + 分割 + MQTT 推送脚本
以下是一个完整可运行的结构模板,你只需补充自己的 UNet 模型定义和权重路径即可:
import cv2
import torch
import numpy as np
import uuid
import json
import paho.mqtt.publish as publish
from ultralytics import YOLO
from torchvision import transforms
# ---------- 配置区 ----------
RTSP_URL = "rtsp://192.168.1.100:8554/stream" # 或者 0 表示USB摄像头
MQTT_HOST = '127.0.0.1'
MQTT_PORT = 1883
FRIGATE_TOPIC = 'frigate/events'
CAMERA_NAME = 'usb_cam'
CONF_THRESHOLD = 0.5
YOLO_MODEL_PATH = "yolov8.pt"
UNET_MODEL_PATH = "unet_model.pth"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# ----------------------------
# 加载 YOLOv8 模型
yolo = YOLO(YOLO_MODEL_PATH)
# 加载 UNet 模型(你需要替换为你自己的定义)
class UNet(torch.nn.Module):
def __init__(self):
super().__init__()
# ... 你自己的 UNet 定义 ...
def forward(self, x):
# ... forward 逻辑 ...
return x
unet = UNet().to(DEVICE)
unet.load_state_dict(torch.load(UNET_MODEL_PATH, map_location=DEVICE))
unet.eval()
# 图像预处理(你根据 UNet 模型训练时写的来)
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Resize((256, 256)) # 修改为你模型输入
])
# 打开视频流
cap = cv2.VideoCapture(RTSP_URL)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# YOLOv8 检测
results = yolo(frame)
for r in results:
boxes = r.boxes.xyxy.cpu().numpy()
scores = r.boxes.conf.cpu().numpy()
classes = r.boxes.cls.cpu().numpy().astype(int)
for i in range(len(boxes)):
if scores[i] < CONF_THRESHOLD:
continue
x1, y1, x2, y2 = map(int, boxes[i])
label = yolo.model.names[classes[i]]
crop = frame[y1:y2, x1:x2]
# 图像分割(UNet)
input_tensor = transform(crop).unsqueeze(0).to(DEVICE)
with torch.no_grad():
mask = unet(input_tensor)
mask = torch.sigmoid(mask)
mask_np = mask.squeeze().cpu().numpy()
# 简单阈值处理(根据你训练策略调整)
if np.mean(mask_np) > 0.2: # 表示有明显掩码
payload = {
"type": "new",
"after": {
"id": str(uuid.uuid4()),
"camera": CAMERA_NAME,
"label": label,
"top_score": float(scores[i]),
"box": {
"x_min": x1,
"y_min": y1,
"x_max": x2,
"y_max": y2
}
}
}
publish.single(
topic=FRIGATE_TOPIC,
payload=json.dumps(payload),
hostname=MQTT_HOST,
port=MQTT_PORT
)
print(f"检测到:{label},已发送 MQTT")
cap.release()
🧪 可视化调试建议
你可以在 for
循环中加入:
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.imshow("Detection", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
🛠️ 你需要做的补充
项目 | 描述 |
---|---|
UNet() |
替换为你自己的网络定义类 |
UNET_MODEL_PATH |
替换为你训练好的 .pth 模型路径 |
transform |
替换为你的 UNet 训练时使用的图像预处理 |
mask 阈值 |
根据你的分割效果设定合理的掩码判断标准 |
label |
如果想统一为 flood_water 等,可以直接硬编码 |
📌 最终实现效果
- 每当摄像头画面中检测到符合条件的目标 + 分割确认,通过 MQTT 报警;
- Frigate 接收到事件后会标记录像、触发通知;
- 系统实现了“YOLOv8 + UNet 联合监测”!