Design and Implementation of an Intelligent Traffic Intersection Monitoring Platform

Published: 2024-07-25

1. Overview

Pedestrians and vehicles cross busy intersections constantly, especially during the morning and evening rush hours, and traffic accidents occur frequently there. Monitoring vehicle and pedestrian flow at these intersections is therefore essential.

2. Detection Model

The detection model is YOLOX, with weights trained on the VOC dataset, which covers twenty classes. Since the platform targets ground-level intersections, only the three most relevant classes are used as monitoring targets: person, car, and bicycle. The YOLOX network code (adapted from the implementation referenced at the end of this article) is as follows, with a quick shape check after the listing:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Copyright (c) Megvii, Inc. and its affiliates.

import torch
import torch.nn as nn

from .darknet import BaseConv, CSPDarknet, CSPLayer, DWConv


class YOLOXHead(nn.Module):
    def __init__(self, num_classes, width=1.0, in_channels=[256, 512, 1024], act="silu", depthwise=False, ):
        super().__init__()
        Conv = DWConv if depthwise else BaseConv

        self.cls_convs = nn.ModuleList()
        self.reg_convs = nn.ModuleList()
        self.cls_preds = nn.ModuleList()
        self.reg_preds = nn.ModuleList()
        self.obj_preds = nn.ModuleList()
        self.stems = nn.ModuleList()

        for i in range(len(in_channels)):
            self.stems.append(
                BaseConv(in_channels=int(in_channels[i] * width), out_channels=int(256 * width), ksize=1, stride=1,
                         act=act))
            self.cls_convs.append(nn.Sequential(*[
                Conv(in_channels=int(256 * width), out_channels=int(256 * width), ksize=3, stride=1, act=act),
                Conv(in_channels=int(256 * width), out_channels=int(256 * width), ksize=3, stride=1, act=act),
            ]))
            self.cls_preds.append(
                nn.Conv2d(in_channels=int(256 * width), out_channels=num_classes, kernel_size=1, stride=1, padding=0)
            )

            self.reg_convs.append(nn.Sequential(*[
                Conv(in_channels=int(256 * width), out_channels=int(256 * width), ksize=3, stride=1, act=act),
                Conv(in_channels=int(256 * width), out_channels=int(256 * width), ksize=3, stride=1, act=act)
            ]))
            self.reg_preds.append(
                nn.Conv2d(in_channels=int(256 * width), out_channels=4, kernel_size=1, stride=1, padding=0)
            )
            self.obj_preds.append(
                nn.Conv2d(in_channels=int(256 * width), out_channels=1, kernel_size=1, stride=1, padding=0)
            )

    def forward(self, inputs):
        outputs = []
        for k, x in enumerate(inputs):
            x = self.stems[k](x)

            cls_feat = self.cls_convs[k](x)
            cls_output = self.cls_preds[k](cls_feat)

            reg_feat = self.reg_convs[k](x)
            reg_output = self.reg_preds[k](reg_feat)

            obj_output = self.obj_preds[k](reg_feat)

            # per level: concatenate 4 box-regression channels, 1 objectness channel
            # and num_classes class-score channels along dim 1
            output = torch.cat([reg_output, obj_output, cls_output], 1)
            outputs.append(output)
        return outputs


class YOLOPAFPN(nn.Module):
    def __init__(self, depth=1.0, width=1.0, in_features=("dark3", "dark4", "dark5"), in_channels=[256, 512, 1024],
                 depthwise=False, act="silu"):
        super().__init__()
        Conv = DWConv if depthwise else BaseConv
        self.backbone = CSPDarknet(depth, width, depthwise=depthwise, act=act)
        self.in_features = in_features

        self.upsample = nn.Upsample(scale_factor=2, mode="nearest")

        # -------------------------------------------#
        #   20, 20, 1024 -> 20, 20, 512
        # -------------------------------------------#
        self.lateral_conv0 = BaseConv(int(in_channels[2] * width), int(in_channels[1] * width), 1, 1, act=act)

        # -------------------------------------------#
        #   40, 40, 1024 -> 40, 40, 512
        # -------------------------------------------#
        self.C3_p4 = CSPLayer(
            int(2 * in_channels[1] * width),
            int(in_channels[1] * width),
            round(3 * depth),
            False,
            depthwise=depthwise,
            act=act,
        )

        # -------------------------------------------#
        #   40, 40, 512 -> 40, 40, 256
        # -------------------------------------------#
        self.reduce_conv1 = BaseConv(int(in_channels[1] * width), int(in_channels[0] * width), 1, 1, act=act)
        # -------------------------------------------#
        #   80, 80, 512 -> 80, 80, 256
        # -------------------------------------------#
        self.C3_p3 = CSPLayer(
            int(2 * in_channels[0] * width),
            int(in_channels[0] * width),
            round(3 * depth),
            False,
            depthwise=depthwise,
            act=act,
        )

        # -------------------------------------------#
        #   80, 80, 256 -> 40, 40, 256
        # -------------------------------------------#
        self.bu_conv2 = Conv(int(in_channels[0] * width), int(in_channels[0] * width), 3, 2, act=act)
        # -------------------------------------------#
        #   40, 40, 512 -> 40, 40, 512
        # -------------------------------------------#
        self.C3_n3 = CSPLayer(
            int(2 * in_channels[0] * width),
            int(in_channels[1] * width),
            round(3 * depth),
            False,
            depthwise=depthwise,
            act=act,
        )

        # -------------------------------------------#
        #   40, 40, 512 -> 20, 20, 512
        # -------------------------------------------#
        self.bu_conv1 = Conv(int(in_channels[1] * width), int(in_channels[1] * width), 3, 2, act=act)
        # -------------------------------------------#
        #   20, 20, 1024 -> 20, 20, 1024
        # -------------------------------------------#
        self.C3_n4 = CSPLayer(
            int(2 * in_channels[1] * width),
            int(in_channels[2] * width),
            round(3 * depth),
            False,
            depthwise=depthwise,
            act=act,
        )

    def forward(self, input):
        # backbone features for a 640x640 input: feat1 (80x80), feat2 (40x40), feat3 (20x20)
        out_features = self.backbone.forward(input)
        [feat1, feat2, feat3] = [out_features[f] for f in self.in_features]

        # top-down path: reduce channels, upsample, and fuse with the shallower feature map
        P5 = self.lateral_conv0(feat3)
        P5_upsample = self.upsample(P5)
        P5_upsample = torch.cat([P5_upsample, feat2], 1)
        P5_upsample = self.C3_p4(P5_upsample)

        P4 = self.reduce_conv1(P5_upsample)
        P4_upsample = self.upsample(P4)
        P4_upsample = torch.cat([P4_upsample, feat1], 1)
        P3_out = self.C3_p3(P4_upsample)

        # bottom-up path: downsample and fuse back towards the deeper levels
        P3_downsample = self.bu_conv2(P3_out)
        P3_downsample = torch.cat([P3_downsample, P4], 1)
        P4_out = self.C3_n3(P3_downsample)

        P4_downsample = self.bu_conv1(P4_out)
        P4_downsample = torch.cat([P4_downsample, P5], 1)
        P5_out = self.C3_n4(P4_downsample)

        return (P3_out, P4_out, P5_out)


class YoloBody(nn.Module):
    def __init__(self, num_classes, phi):
        super().__init__()
        depth_dict = {'nano': 0.33, 'tiny': 0.33, 's': 0.33, 'm': 0.67, 'l': 1.00, 'x': 1.33, }
        width_dict = {'nano': 0.25, 'tiny': 0.375, 's': 0.50, 'm': 0.75, 'l': 1.00, 'x': 1.25, }
        depth, width = depth_dict[phi], width_dict[phi]
        depthwise = True if phi == 'nano' else False

        self.backbone = YOLOPAFPN(depth, width, depthwise=depthwise)
        self.head = YOLOXHead(num_classes, width, depthwise=depthwise)

    def forward(self, x):
        fpn_outs = self.backbone.forward(x)
        outputs = self.head.forward(fpn_outs)
        return outputs
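
As a quick sanity check, the network can be run on a dummy tensor to confirm the three output scales. This is a minimal sketch, assuming the classes above and their darknet dependencies are importable; it is not part of the original article's code:

import torch

# 20 VOC classes; phi='s' selects depth=0.33, width=0.50
model = YoloBody(num_classes=20, phi='s')
model.eval()

with torch.no_grad():
    outputs = model(torch.randn(1, 3, 640, 640))

# each level carries 4 box + 1 objectness + 20 class channels = 25
for out in outputs:
    print(out.shape)
# torch.Size([1, 25, 80, 80])  -> stride 8
# torch.Size([1, 25, 40, 40])  -> stride 16
# torch.Size([1, 25, 20, 20])  -> stride 32

Of the twenty VOC classes, only person, car, and bicycle are counted by the platform's UI; the remaining classes are simply ignored downstream.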

3. System Visualization

The system uses PyQt5 as the visualization toolkit; the overall interface is shown below:
[Figure: main interface of the monitoring platform]

The system implements both real-time camera monitoring and uploaded-video monitoring; in both cases the video stream is read with the OpenCV library, and each successfully read frame is handed to the YOLOX model for detection.
The camera-handling code is as follows:

    def open_camera_btn(self):
        if not self.timer_camera.isActive():  # timer not running: start monitoring
            flag = self.cap.open(self.CAM_NUM)
            if not flag:
                QtWidgets.QMessageBox.warning(self.window, 'Warning', "Please check that the camera is connected properly",
                                              buttons=QtWidgets.QMessageBox.Ok)
            else:
                self.timer_camera.start(30)  # fire the timeout signal every 30 ms
                self.open_camera.setText('Stop monitoring')

        # timer already running: stop monitoring
        else:
            self.timer_camera.stop()
            self.cap.release()
            self.label_show_camera.clear()  # clear the video display area
            self.open_camera.setText('Start monitoring')

The uploaded-video monitoring code is as follows:

    def video_detect_btn(self):
        fileUrl, _ = QFileDialog.getOpenFileName(self, "Open Video File", QDir.currentPath(),
                                                 "Video Files (*.mp4 *.avi *.mov *.wmv);;")
        # a video file was selected
        if fileUrl:
            self.label_video_url.setText(fileUrl)
            if not self.timer_camera.isActive():  # timer not running
                flag = self.cap.open(fileUrl)
                if not flag:
                    QtWidgets.QMessageBox.warning(self.window, 'Warning', "Please check that the video file is valid",
                                                  buttons=QtWidgets.QMessageBox.Ok)
                else:
                    self.timer_camera.start(30)  # fire the timeout signal every 30 ms

After clicking the upload button, a file dialog opens for selecting the video file to monitor:
[Figure: video file selection dialog]

Once the user has made a selection, frame-by-frame detection begins.
The complete page logic is as follows:

import cv2
from PIL import Image
from PyQt5 import uic, QtCore, QtGui, QtWidgets
import numpy as np
from PyQt5.QtCore import QDir
from PyQt5.QtWidgets import QMainWindow, QFileDialog

from utils.utils import get_class_dic
from yolo import YOLO


class Monitor(QMainWindow):
    def __init__(self):
        super().__init__()  # call the parent-class constructor
        self.CAM_NUM = 0  # default camera index
        self.timer_camera = QtCore.QTimer()  # timer that drives frame grabbing
        self.cap = cv2.VideoCapture()  # video stream

        self.set_ui_init()
        self.slot_init()

        self.yolox = YOLO()  # detection model wrapper

    # initialise the UI layout
    def set_ui_init(self):
        self.window = uic.loadUi('ui/monitor.ui', self)
        self.open_camera = self.window.open_camera
        self.label_show_camera = self.window.label_camera
        self.all_result = self.window.res_array
        self.person_num = self.window.person_label
        self.car_num = self.window.car_label
        self.light_num = self.window.light_label

        self.label_1 = self.window.id_one
        self.label_2 = self.window.id_two
        self.label_3 = self.window.id_three

        self.label_video_url = self.window.label_video_url

    # connect signals to slots
    def slot_init(self):
        self.open_camera.clicked.connect(self.open_camera_btn)
        self.timer_camera.timeout.connect(self.show_camera)  # bind the timer's timeout signal to show_camera

        self.btn_submit_video.clicked.connect(self.video_detect_btn)

    # grab one frame, run detection, and display the result
    def show_camera(self):
        flag, image = self.cap.read()
        if not flag:  # stream ended or the read failed
            self.timer_camera.stop()
            self.cap.release()
            self.label_show_camera.clear()  # clear the video display area
        else:
            show = cv2.resize(image, (620, 500))
            show = Image.fromarray(show)  # note: the frame is still in OpenCV's BGR channel order here
            re_show, pre_label = self.yolox.detect_image(show)
            res_dict = get_class_dic(pre_label)  # count detections per class
            self.show_res_num(res_dict)
            res_dict.clear()
            re_show = np.array(re_show)
            re_show = cv2.cvtColor(re_show, cv2.COLOR_BGR2RGB)  # reorder channels for Qt display
            showImage = QtGui.QImage(re_show.data, re_show.shape[1], re_show.shape[0],
                                     QtGui.QImage.Format_RGB888)
            self.label_show_camera.setPixmap(QtGui.QPixmap.fromImage(showImage))  # render the frame on the label

    # camera button click handler
    def open_camera_btn(self):
        if not self.timer_camera.isActive():  # timer not running: start monitoring
            flag = self.cap.open(self.CAM_NUM)
            if not flag:
                QtWidgets.QMessageBox.warning(self.window, 'Warning', "Please check that the camera is connected properly",
                                              buttons=QtWidgets.QMessageBox.Ok)
            else:
                self.timer_camera.start(30)  # fire the timeout signal every 30 ms
                self.open_camera.setText('Stop monitoring')

        # timer already running: stop monitoring
        else:
            self.timer_camera.stop()
            self.cap.release()
            self.label_show_camera.clear()  # clear the video display area
            self.open_camera.setText('Start monitoring')

    # uploaded-video monitoring
    def video_detect_btn(self):
        fileUrl, _ = QFileDialog.getOpenFileName(self, "Open Video File", QDir.currentPath(),
                                                 "Video Files (*.mp4 *.avi *.mov *.wmv);;")
        # a video file was selected
        if fileUrl:
            self.label_video_url.setText(fileUrl)
            if not self.timer_camera.isActive():  # timer not running
                flag = self.cap.open(fileUrl)
                if not flag:
                    QtWidgets.QMessageBox.warning(self.window, 'Warning', "Please check that the video file is valid",
                                                  buttons=QtWidgets.QMessageBox.Ok)
                else:
                    self.timer_camera.start(30)  # fire the timeout signal every 30 ms

    # update the per-class and total counters on the UI
    def show_res_num(self, res_dict):
        total = 0
        if 'person' in res_dict:
            self.person_num.setText(str(res_dict['person']))
            total += int(res_dict['person'])
        else:
            self.person_num.setText('0')
        if 'car' in res_dict:
            self.car_num.setText(str(res_dict['car']))
            total += int(res_dict['car'])
        else:
            self.car_num.setText('0')
        # note: the widget is named light_label in the .ui file, but it displays the bicycle count
        if 'bicycle' in res_dict:
            self.light_num.setText(str(res_dict['bicycle']))
            total += int(res_dict['bicycle'])
        else:
            self.light_num.setText('0')
        self.all_result.setText(str(total))
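
The helper get_class_dic imported from utils.utils is not listed in the article. A plausible implementation, assuming detect_image returns the predicted class names as a list of strings, is a simple per-class counter (a sketch, not the original code):

from collections import Counter


def get_class_dic(pre_label):
    # count detections per class, e.g. ['person', 'person', 'car'] -> {'person': 2, 'car': 1}
    return dict(Counter(pre_label))

For completeness, the window can be launched with a standard Qt entry point, assuming the Monitor class above lives in the same script:

import sys

from PyQt5.QtWidgets import QApplication

if __name__ == '__main__':
    app = QApplication(sys.argv)
    monitor = Monitor()
    monitor.show()
    sys.exit(app.exec_())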

4. Results

[Figure: live detection results with per-class counts]

5. References

YOLOX PyTorch implementation: https://blog.csdn.net/weixin_44791964/article/details/120476949

