Windows11家庭版配置frigate 嵌入自研算法-【3】

发布于:2025-07-17 ⋅ 阅读:(14) ⋅ 点赞:(0)

基于 YOLOv8 实时目标检测 + UNet 图像分割,并将处理结果通过 MQTT 发送到 Frigate,用于报警和录像标记。


🧠 总体流程图

       摄像头(RTSP / USB / 视频文件)
                   ↓
            [帧获取模块:OpenCV]
                   ↓
     ┌──────────────────────────┐
     │ YOLOv8:检测感兴趣区域(涌水)│
     └──────────────────────────┘
                   ↓
     ┌──────────────────────────┐
     │ UNet:图像分割提取边界掩码    │
     └──────────────────────────┘
                   ↓
      检测结果发布 → MQTT → Frigate

📦 所需组件

  • ultralytics(YOLOv8)
  • paho-mqtt(MQTT 发布)
  • cv2(摄像头/视频读取)
  • torch + segmentation_models_pytorch(或你已有 UNet 实现)
  • 你的 UNet 模型权重(如 .pth

✅ 实时检测 + 分割 + MQTT 推送脚本

以下是一个完整可运行的结构模板,你只需补充自己的 UNet 模型定义和权重路径即可:

import cv2
import torch
import numpy as np
import uuid
import json
import paho.mqtt.publish as publish
from ultralytics import YOLO
from torchvision import transforms

# ---------- 配置区 ----------
RTSP_URL = "rtsp://192.168.1.100:8554/stream"  # 或者 0 表示USB摄像头
MQTT_HOST = '127.0.0.1'
MQTT_PORT = 1883
FRIGATE_TOPIC = 'frigate/events'
CAMERA_NAME = 'usb_cam'
CONF_THRESHOLD = 0.5
YOLO_MODEL_PATH = "yolov8.pt"
UNET_MODEL_PATH = "unet_model.pth"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# ----------------------------

# 加载 YOLOv8 模型
yolo = YOLO(YOLO_MODEL_PATH)

# 加载 UNet 模型(你需要替换为你自己的定义)
class UNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        # ... 你自己的 UNet 定义 ...

    def forward(self, x):
        # ... forward 逻辑 ...
        return x

unet = UNet().to(DEVICE)
unet.load_state_dict(torch.load(UNET_MODEL_PATH, map_location=DEVICE))
unet.eval()

# 图像预处理(你根据 UNet 模型训练时写的来)
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((256, 256))  # 修改为你模型输入
])

# 打开视频流
cap = cv2.VideoCapture(RTSP_URL)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # YOLOv8 检测
    results = yolo(frame)
    for r in results:
        boxes = r.boxes.xyxy.cpu().numpy()
        scores = r.boxes.conf.cpu().numpy()
        classes = r.boxes.cls.cpu().numpy().astype(int)

        for i in range(len(boxes)):
            if scores[i] < CONF_THRESHOLD:
                continue

            x1, y1, x2, y2 = map(int, boxes[i])
            label = yolo.model.names[classes[i]]
            crop = frame[y1:y2, x1:x2]

            # 图像分割(UNet)
            input_tensor = transform(crop).unsqueeze(0).to(DEVICE)
            with torch.no_grad():
                mask = unet(input_tensor)
                mask = torch.sigmoid(mask)
                mask_np = mask.squeeze().cpu().numpy()

            # 简单阈值处理(根据你训练策略调整)
            if np.mean(mask_np) > 0.2:  # 表示有明显掩码
                payload = {
                    "type": "new",
                    "after": {
                        "id": str(uuid.uuid4()),
                        "camera": CAMERA_NAME,
                        "label": label,
                        "top_score": float(scores[i]),
                        "box": {
                            "x_min": x1,
                            "y_min": y1,
                            "x_max": x2,
                            "y_max": y2
                        }
                    }
                }

                publish.single(
                    topic=FRIGATE_TOPIC,
                    payload=json.dumps(payload),
                    hostname=MQTT_HOST,
                    port=MQTT_PORT
                )
                print(f"检测到:{label},已发送 MQTT")

cap.release()

🧪 可视化调试建议

你可以在 for 循环中加入:

cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.imshow("Detection", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
    break

🛠️ 你需要做的补充

项目 描述
UNet() 替换为你自己的网络定义类
UNET_MODEL_PATH 替换为你训练好的 .pth 模型路径
transform 替换为你的 UNet 训练时使用的图像预处理
mask 阈值 根据你的分割效果设定合理的掩码判断标准
label 如果想统一为 flood_water 等,可以直接硬编码

📌 最终实现效果

  • 每当摄像头画面中检测到符合条件的目标 + 分割确认,通过 MQTT 报警;
  • Frigate 接收到事件后会标记录像、触发通知;
  • 系统实现了“YOLOv8 + UNet 联合监测”!


网站公告

今日签到

点亮在社区的每一天
去签到