1.概述
交通要道的路口人车往来密集,尤其是在上下班高峰时段(如早高峰),交通事故时有发生。因此,对交通路口的车流量和人流量进行监测必不可少。
2.检测模型
本系统使用的检测模型为YOLOX,模型权重由VOC数据集训练得到,共包含二十个类别。由于系统主要面向地面交通路口的监测,我们只选取其中最关键的三类目标作为监测对象,分别为人、汽车和自行车。YOLOX神经网络模型代码如下:
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Copyright (c) Megvii, Inc. and its affiliates.
import torch
import torch.nn as nn
from .darknet import BaseConv, CSPDarknet, CSPLayer, DWConv
class YOLOXHead(nn.Module):
    """Decoupled YOLOX prediction head applied independently to each feature level.

    For every entry of ``in_channels`` the head builds:

    * a 1x1 ``BaseConv`` stem mapping ``int(channels * width)`` input channels
      to ``int(256 * width)`` channels,
    * a classification branch (two 3x3 convs) followed by a 1x1 ``nn.Conv2d``
      with ``num_classes`` output channels,
    * a regression branch (two 3x3 convs) followed by two 1x1 ``nn.Conv2d``
      layers with 4 (``reg_preds``) and 1 (``obj_preds``) output channels.

    Args:
        num_classes: Number of output channels of each classification predictor.
        width: Multiplier applied to every channel count.
        in_channels: Unscaled channel count of each incoming feature map, one
            entry per feature level. Any sequence works; it is only read.
        act: Activation name forwarded to the conv blocks.
        depthwise: Use ``DWConv`` instead of ``BaseConv`` for the 3x3 convs.
    """

    def __init__(self, num_classes, width=1.0, in_channels=(256, 512, 1024), act="silu", depthwise=False, ):
        super().__init__()
        Conv = DWConv if depthwise else BaseConv
        hidden = int(256 * width)  # channel count shared by both branches

        # Attribute names and creation order are kept stable so that
        # state_dict keys (and weight-init order) match existing checkpoints.
        self.cls_convs = nn.ModuleList()
        self.reg_convs = nn.ModuleList()
        self.cls_preds = nn.ModuleList()
        self.reg_preds = nn.ModuleList()
        self.obj_preds = nn.ModuleList()
        self.stems = nn.ModuleList()

        for level_channels in in_channels:
            self.stems.append(
                BaseConv(in_channels=int(level_channels * width), out_channels=hidden, ksize=1, stride=1, act=act)
            )

            self.cls_convs.append(nn.Sequential(
                Conv(in_channels=hidden, out_channels=hidden, ksize=3, stride=1, act=act),
                Conv(in_channels=hidden, out_channels=hidden, ksize=3, stride=1, act=act),
            ))
            self.cls_preds.append(
                nn.Conv2d(in_channels=hidden, out_channels=num_classes, kernel_size=1, stride=1, padding=0)
            )

            self.reg_convs.append(nn.Sequential(
                Conv(in_channels=hidden, out_channels=hidden, ksize=3, stride=1, act=act),
                Conv(in_channels=hidden, out_channels=hidden, ksize=3, stride=1, act=act),
            ))
            self.reg_preds.append(
                nn.Conv2d(in_channels=hidden, out_channels=4, kernel_size=1, stride=1, padding=0)
            )
            self.obj_preds.append(
                nn.Conv2d(in_channels=hidden, out_channels=1, kernel_size=1, stride=1, padding=0)
            )

    def forward(self, inputs):
        """Run the head on each feature map.

        Args:
            inputs: Iterable of feature maps, one per level, in the same order
                as ``in_channels``.

        Returns:
            A list with one tensor per level. Each tensor is the concatenation
            along dim 1 of ``[reg_output, obj_output, cls_output]``, i.e. it
            has ``4 + 1 + num_classes`` channels.
        """
        outputs = []
        for k, x in enumerate(inputs):
            x = self.stems[k](x)

            cls_feat = self.cls_convs[k](x)
            cls_output = self.cls_preds[k](cls_feat)

            # Both the 4-channel and the 1-channel predictor read the
            # regression-branch features.
            reg_feat = self.reg_convs[k](x)
            reg_output = self.reg_preds[k](reg_feat)
            obj_output = self.obj_preds[k](reg_feat)

            outputs.append(torch.cat([reg_output, obj_output, cls_output], 1))
        return outputs
class YOLOPAFPN(nn.Module):
    """CSPDarknet backbone followed by a top-down and a bottom-up fusion path.

    Three backbone feature maps (selected by ``in_features``) are fused: first
    from the deepest level towards the shallowest (upsample + concat), then
    back from the shallowest towards the deepest (strided conv + concat).

    Args:
        depth: Depth multiplier; each ``CSPLayer`` gets ``round(3 * depth)`` blocks.
        width: Multiplier applied to every channel count.
        in_features: Keys of the three backbone outputs to fuse, ordered from
            the shallowest to the deepest level.
        in_channels: Unscaled channel counts of those three feature maps.
            Any length-3 sequence works; it is only read.
        depthwise: Use ``DWConv`` instead of ``BaseConv`` for the strided convs
            (also forwarded to the backbone and the CSP layers).
        act: Activation name forwarded to all conv blocks.
    """

    def __init__(self, depth=1.0, width=1.0, in_features=("dark3", "dark4", "dark5"), in_channels=(256, 512, 1024),
                 depthwise=False, act="silu"):
        super().__init__()
        Conv = DWConv if depthwise else BaseConv
        n_blocks = round(3 * depth)

        self.backbone = CSPDarknet(depth, width, depthwise=depthwise, act=act)
        self.in_features = in_features
        self.upsample = nn.Upsample(scale_factor=2, mode="nearest")

        # ---- top-down path ----
        # 1x1 conv on the deepest map: in_channels[2] -> in_channels[1] (scaled by width).
        self.lateral_conv0 = BaseConv(int(in_channels[2] * width), int(in_channels[1] * width), 1, 1, act=act)
        # Fuses [upsampled P5, feat2]: 2 * in_channels[1] -> in_channels[1].
        self.C3_p4 = CSPLayer(
            int(2 * in_channels[1] * width),
            int(in_channels[1] * width),
            n_blocks,
            False,
            depthwise=depthwise,
            act=act,
        )
        # 1x1 conv: in_channels[1] -> in_channels[0].
        self.reduce_conv1 = BaseConv(int(in_channels[1] * width), int(in_channels[0] * width), 1, 1, act=act)
        # Fuses [upsampled P4, feat1]: 2 * in_channels[0] -> in_channels[0].
        self.C3_p3 = CSPLayer(
            int(2 * in_channels[0] * width),
            int(in_channels[0] * width),
            n_blocks,
            False,
            depthwise=depthwise,
            act=act,
        )

        # ---- bottom-up path ----
        # 3x3 stride-2 conv, channel count unchanged (in_channels[0]).
        self.bu_conv2 = Conv(int(in_channels[0] * width), int(in_channels[0] * width), 3, 2, act=act)
        # Fuses [downsampled P3, P4]: 2 * in_channels[0] -> in_channels[1].
        self.C3_n3 = CSPLayer(
            int(2 * in_channels[0] * width),
            int(in_channels[1] * width),
            n_blocks,
            False,
            depthwise=depthwise,
            act=act,
        )
        # 3x3 stride-2 conv, channel count unchanged (in_channels[1]).
        self.bu_conv1 = Conv(int(in_channels[1] * width), int(in_channels[1] * width), 3, 2, act=act)
        # Fuses [downsampled P4, P5]: 2 * in_channels[1] -> in_channels[2].
        self.C3_n4 = CSPLayer(
            int(2 * in_channels[1] * width),
            int(in_channels[2] * width),
            n_blocks,
            False,
            depthwise=depthwise,
            act=act,
        )

    def forward(self, input):
        """Compute the three fused feature maps.

        Args:
            input: Tensor passed unchanged to the backbone.

        Returns:
            Tuple ``(P3_out, P4_out, P5_out)`` ordered from the shallowest to
            the deepest level.
        """
        # Call the module (not ``.forward``) so registered hooks are honoured.
        out_features = self.backbone(input)
        feat1, feat2, feat3 = [out_features[f] for f in self.in_features]

        # Top-down: deepest level -> shallowest level.
        P5 = self.lateral_conv0(feat3)
        P5_upsample = self.upsample(P5)
        P5_upsample = torch.cat([P5_upsample, feat2], 1)
        P5_upsample = self.C3_p4(P5_upsample)

        P4 = self.reduce_conv1(P5_upsample)
        P4_upsample = self.upsample(P4)
        P4_upsample = torch.cat([P4_upsample, feat1], 1)
        P3_out = self.C3_p3(P4_upsample)

        # Bottom-up: shallowest level -> deepest level.
        P3_downsample = self.bu_conv2(P3_out)
        P3_downsample = torch.cat([P3_downsample, P4], 1)
        P4_out = self.C3_n3(P3_downsample)

        P4_downsample = self.bu_conv1(P4_out)
        P4_downsample = torch.cat([P4_downsample, P5], 1)
        P5_out = self.C3_n4(P4_downsample)

        return (P3_out, P4_out, P5_out)
class YoloBody(nn.Module):
    """Complete YOLOX network: a ``YOLOPAFPN`` neck/backbone plus a ``YOLOXHead``.

    Args:
        num_classes: Number of classes forwarded to the head.
        phi: Model size key; one of ``'nano'``, ``'tiny'``, ``'s'``, ``'m'``,
            ``'l'``, ``'x'``. It selects the depth/width multipliers, and
            ``'nano'`` additionally enables depthwise convolutions.

    Raises:
        KeyError: If ``phi`` is not one of the keys listed above.
    """

    def __init__(self, num_classes, phi):
        super().__init__()
        depth_dict = {'nano': 0.33, 'tiny': 0.33, 's': 0.33, 'm': 0.67, 'l': 1.00, 'x': 1.33, }
        width_dict = {'nano': 0.25, 'tiny': 0.375, 's': 0.50, 'm': 0.75, 'l': 1.00, 'x': 1.25, }
        depth, width = depth_dict[phi], width_dict[phi]
        depthwise = phi == 'nano'

        self.backbone = YOLOPAFPN(depth, width, depthwise=depthwise)
        self.head = YOLOXHead(num_classes, width, depthwise=depthwise)

    def forward(self, x):
        """Return the head outputs computed from the backbone features of ``x``."""
        # Call the submodules (not ``.forward``) so registered hooks are honoured.
        fpn_outs = self.backbone(x)
        return self.head(fpn_outs)
3.系统可视化
系统使用了PyQt5作为可视化工具,整体效果如下:
系统实现了实时摄像头监测和上传视频监测两项功能,视频流均通过OpenCV库读取;读取成功后,每一帧都会交由YOLOX模型进行检测。
摄像头功能代码如下:
def open_camera_btn(self):
    """Toggle live camera monitoring when the camera button is clicked.

    If the frame timer is idle, the camera ``self.CAM_NUM`` is opened and the
    timer is started; a warning dialog is shown when the camera cannot be
    opened. If the timer is already running, monitoring is stopped, the
    capture is released and the display area is cleared.
    """
    if self.timer_camera.isActive():
        # Monitoring is running: this click means "stop".
        self.timer_camera.stop()
        self.cap.release()
        self.label_show_camera.clear()  # clear the video display area
        self.open_camera.setText('开始监测')
        return

    if not self.cap.open(self.CAM_NUM):
        QtWidgets.QMessageBox.warning(self.window, '警告!', "请检查摄像头是否连接正确",
                                      buttons=QtWidgets.QMessageBox.Ok)
        return

    # Fire the timeout slot every 30 ms. A bare start() would use Qt's
    # default interval of 0 ms and fire whenever the event loop is idle.
    self.timer_camera.start(30)
    self.open_camera.setText('关闭监测')
上传视频监测代码如下:
def video_detect_btn(self):
    """Let the user pick a video file and start frame-by-frame detection on it.

    The chosen path is printed and shown in ``self.label_video_url``. The file
    is only opened when the frame timer is idle; if a stream is already
    running it is left untouched. A warning dialog is shown when the file
    cannot be opened.
    """
    fileUrl, _ = QFileDialog.getOpenFileName(self, "Open Video File", QDir.currentPath(),
                                             "Video Files (*.mp4 *.avi *.mov *.wmv);;")
    if not fileUrl:
        # Dialog was cancelled.
        return

    print(fileUrl)
    self.label_video_url.setText(fileUrl)

    if self.timer_camera.isActive():
        return

    if not self.cap.open(fileUrl):
        # The failure concerns the selected file, not the camera, so the
        # warning must say so.
        QtWidgets.QMessageBox.warning(self.window, '警告!', "无法打开视频文件,请检查文件是否有效",
                                      buttons=QtWidgets.QMessageBox.Ok)
        return

    # Fire the timeout slot every 30 ms. A bare start() would use Qt's
    # default interval of 0 ms and fire whenever the event loop is idle.
    self.timer_camera.start(30)
点击上传监测视频后,将打开资源文件窗口进行视频文件选择,效果如下:
用户选择完成后,将开始逐帧检测。
整个页面的逻辑代码如下:
import cv2
from PIL import Image
from PyQt5 import uic, QtCore, QtGui, QtWidgets
import numpy as np
from PyQt5.QtCore import QDir
from PyQt5.QtWidgets import QMainWindow, QFileDialog
from utils.utils import get_class_dic
from yolo import YOLO
class Monitor(QMainWindow):
    """Main window that runs YOLOX detection on camera or video-file frames.

    A ``QTimer`` periodically pulls one frame from a shared
    ``cv2.VideoCapture``, passes it to ``YOLO.detect_image`` and shows the
    annotated frame together with per-class counts (person / car / bicycle).
    """

    FRAME_INTERVAL_MS = 30      # period of the frame-grabbing timer
    DISPLAY_SIZE = (620, 500)   # (width, height) every frame is resized to

    def __init__(self):
        super().__init__()
        self.CAM_NUM = 0  # device index passed to cv2.VideoCapture.open
        self.timer_camera = QtCore.QTimer()
        self.cap = cv2.VideoCapture()  # shared by camera and video-file modes
        self.set_ui_init()
        self.slot_init()
        self.yolox = YOLO()

    def set_ui_init(self):
        """Load the Qt Designer layout and keep references to the widgets used."""
        self.window = uic.loadUi('ui/monitor.ui', self)
        self.open_camera = self.window.open_camera
        self.label_show_camera = self.window.label_camera
        self.all_result = self.window.res_array
        self.person_num = self.window.person_label
        self.car_num = self.window.car_label
        self.light_num = self.window.light_label  # displays the 'bicycle' count
        self.label_1 = self.window.id_one
        self.label_2 = self.window.id_two
        self.label_3 = self.window.id_three
        self.label_video_url = self.window.label_video_url

    def slot_init(self):
        """Connect button clicks and the timer timeout to their handlers."""
        self.open_camera.clicked.connect(self.open_camera_btn)
        self.timer_camera.timeout.connect(self.show_camera)
        # NOTE(review): btn_submit_video is never assigned in this class; it is
        # presumably set on self by uic.loadUi from the .ui file - confirm.
        self.btn_submit_video.clicked.connect(self.video_detect_btn)

    def _stop_stream(self):
        """Stop the timer, release the capture and reset the display and button."""
        self.timer_camera.stop()
        self.cap.release()
        self.label_show_camera.clear()  # clear the video display area
        self.open_camera.setText('开始监测')

    def _warn(self, message):
        """Show a modal warning dialog with a single OK button."""
        QtWidgets.QMessageBox.warning(self.window, '警告!', message,
                                      buttons=QtWidgets.QMessageBox.Ok)

    def show_camera(self):
        """Timer slot: grab one frame, run detection and display the result.

        When no frame can be read (camera failure or end of the video file)
        the stream is stopped and the UI is reset to the idle state.
        """
        ok, frame = self.cap.read()
        if not ok:
            self._stop_stream()
            return

        frame = cv2.resize(frame, self.DISPLAY_SIZE)
        # OpenCV delivers BGR frames while PIL treats a 3-channel array as
        # RGB, so convert before handing the frame to the detector.
        # NOTE(review): assumes YOLO.detect_image expects and returns RGB PIL
        # images - confirm against yolo.py.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        annotated, pre_label = self.yolox.detect_image(Image.fromarray(rgb_frame))

        res_dict = get_class_dic(pre_label)
        self.show_res_num(res_dict)
        # Kept from the original: presumably get_class_dic may return a shared
        # container that must be emptied between frames - confirm.
        res_dict.clear()

        rgb_out = np.array(annotated)
        height, width = rgb_out.shape[:2]
        # Pass the row stride explicitly so rows are read correctly even when
        # width * 3 is not a multiple of 4.
        image = QtGui.QImage(rgb_out.data, width, height, rgb_out.strides[0],
                             QtGui.QImage.Format_RGB888)
        self.label_show_camera.setPixmap(QtGui.QPixmap.fromImage(image))

    def open_camera_btn(self):
        """Toggle live camera monitoring when the camera button is clicked."""
        if self.timer_camera.isActive():
            # Monitoring is running: this click means "stop".
            self._stop_stream()
            return

        if not self.cap.open(self.CAM_NUM):
            self._warn("请检查摄像头是否连接正确")
            return

        # A bare start() would use Qt's default interval of 0 ms.
        self.timer_camera.start(self.FRAME_INTERVAL_MS)
        self.open_camera.setText('关闭监测')

    def video_detect_btn(self):
        """Let the user pick a video file and start detection on it.

        The file is only opened when the frame timer is idle; if a stream is
        already running it is left untouched.
        """
        fileUrl, _ = QFileDialog.getOpenFileName(self, "Open Video File", QDir.currentPath(),
                                                 "Video Files (*.mp4 *.avi *.mov *.wmv);;")
        if not fileUrl:
            # Dialog was cancelled.
            return

        print(fileUrl)
        self.label_video_url.setText(fileUrl)

        if self.timer_camera.isActive():
            return

        if not self.cap.open(fileUrl):
            # The failure concerns the selected file, not the camera.
            self._warn("无法打开视频文件,请检查文件是否有效")
            return

        self.timer_camera.start(self.FRAME_INTERVAL_MS)

    def show_res_num(self, dict):
        """Display the person / car / bicycle counts and their sum.

        Args:
            dict: Mapping from class name to detected count. Missing classes
                are displayed as ``'0'``. (The parameter name shadows the
                builtin but is kept for backward compatibility.)
        """
        total = 0
        targets = (
            ('person', self.person_num),
            ('car', self.car_num),
            ('bicycle', self.light_num),
        )
        for class_name, widget in targets:
            count = dict[class_name] if class_name in dict else 0
            widget.setText(str(count))
            total += int(count)
        self.all_result.setText(str(total))
4.效果
5.参考
https://blog.csdn.net/weixin_44791964/article/details/120476949