若该文为原创文章,转载请注明原文出处。
一、目的
最近想做基于web端的YOLO识别检测系统,由于个人知识量有限,有很多不懂,特别是VUE这块,所以使用了AI的方式来实现自己的目的。
网上有很多程序是融合了Python、Flask、Vue、SQLite等技术;
检测方式:图片,视频,摄像头,在此基础上,通过AI工具,又增加了RTSP方式。
这里记录下过程及方式,虽然框架不是很好,但实现了功能。
二、准备工作
1、rtsp服务器
下载页面 : https://github.com/aler9/rtsp-simple-server/releases
这里,我们下载 rtsp-simple-server_v0.19.1_windows_amd64.zip
2、ffmpeg推rtsp流
ffmpeg -re -stream_loop -1 -i test.mp4 -c:v h264 -rtsp_transport tcp -f rtsp rtsp://192.168.50.83:8554/video
三、思路流程
1、系统架构
采用 Flask + Vue 前后端分离架构:
- 后端:Flask 提供 RESTful API,处理核心逻辑(YOLO 检测、RTSP 流管理、数据库操作)。
- 前端:Vue 3 负责交互界面(用户登录、文件上传、检测结果展示)。
- 数据库:SQLite 存储用户信息和检测记录。
2、后端
1)RTSP 实时流检测
流程:
- 前端调用
/api/rtsp_start
启动 RTSP 流(传递 RTSP URL)。 - 后端通过 RTSPStreamManager 管理多个 RTSP 线程:
- 每个 RTSP URL 对应一个 RTSPStreamThread 线程。
- 线程持续读取视频帧,调用 YOLO 模型检测,保存结果到共享变量。
- 前端定时调用
/api/rtsp_frame
获取最新帧和检测结果。 - 调用
/api/rtsp_stop
停止流。
- 前端调用
关键类:
- RTSPStreamManager:管理 RTSP 线程的生命周期(启动/停止/清理)。
- RTSPStreamThread:独立线程处理 RTSP 流的帧读取和 YOLO 检测。
2、图片/视频检测
- 图片检测(
/api/detect_image
):- 上传图片 → 调用 YOLO 检测 → 绘制检测框 → 保存结果图片和数据库记录。
- 视频检测(
/api/detect_video
):- 上传视频 → 逐帧检测 → 生成带检测框的视频 → 保存结果和数据库记录。
3、前端
图片检测流程
用户->>前端: 拖拽/选择图片
前端->>前端: 生成Blob URL预览 (beforeImageUpload)
前端->>后端: 上传图片到 /api/detect_image
后端-->>前端: 返回检测结果
前端->>前端: 显示结果图片和检测框数据
视频检测流程
用户->>前端: 拖拽/选择视频
前端->>前端: 生成Blob URL预览 (beforeVideoUpload)
前端->>后端: 上传视频到 /api/detect_video
后端-->>前端: 返回带检测框的视频
前端->>前端: 播放结果视频并显示检测统计
摄像头实时检测
1. 调用 navigator.mediaDevices.getUserMedia 获取摄像头权限
2. 绑定视频流到 <video> 元素
3. 启动定时器 (detectionInterval):
- 捕获帧 -> 发送到 /api/process_frame
- 接收检测结果 -> 通过CSS绝对定位绘制检测框
4. 显示实时FPS计数器
RTSP流检测
1. 验证RTSP地址格式
2. 调用 /api/rtsp_start 启动后端RTSP线程
3. 定时轮询 /api/rtsp_frame:
- 接收JPEG二进制帧 -> Blob URL显示
- 解析响应头中的检测结果(X-Detections)
4. 绘制检测框并显示FPS
四、核心代码
这是只附RTSP部分代码
后端解析RTSP流
# RTSP流管理
class RTSPStreamManager:
def __init__(self):
self.streams = {} # {rtsp_url: {'thread': thread, 'last_active': timestamp}}
self.lock = threading.Lock()
def add_stream(self, rtsp_url, fps=10):
with self.lock:
if rtsp_url in self.streams:
return False
thread = RTSPStreamThread(rtsp_url, fps)
thread.start()
self.streams[rtsp_url] = {
'thread': thread,
'last_active': time.time(),
'fps': fps
}
return True
def get_frame(self, rtsp_url):
with self.lock:
if rtsp_url not in self.streams:
return None, []
print(f"get_frame: {rtsp_url}")
self.streams[rtsp_url]['last_active'] = time.time()
thread = self.streams[rtsp_url]['thread']
# 获取帧数据
frame, detections = thread.thread_get_frame()
print(f"thread.thread_get_frame")
# 保存测试帧用于调试
if frame is not None:
try:
print('保存测试帧')
cv2.imwrite('static/debug_frame.jpg', frame)
print(f"get_frame: 成功保存测试帧到 static/debug_frame.jpg")
except Exception as e:
print(f"get_frame: 保存测试帧失败 - {str(e)}")
print(f"get_frame: 返回帧和检测结果")
return frame, detections
def stop_stream(self, rtsp_url):
with self.lock:
if rtsp_url not in self.streams:
return False
thread = self.streams[rtsp_url]['thread']
thread.stop()
thread.join()
del self.streams[rtsp_url]
return True
def cleanup_inactive(self, timeout=300):
with self.lock:
current_time = time.time()
to_remove = []
for rtsp_url, stream_info in self.streams.items():
if current_time - stream_info['last_active'] > timeout:
to_remove.append(rtsp_url)
for rtsp_url in to_remove:
self.stop_stream(rtsp_url)
return len(to_remove)
# RTSP处理线程
class RTSPStreamThread(threading.Thread):
def __init__(self, rtsp_url, fps=10):
super().__init__()
self.rtsp_url = rtsp_url
self.fps = fps
self.cap = None
self.frame = None
self.detections = []
self.lock = threading.Lock()
self.running = True
self.frame_interval = 1.0 / fps
self.last_frame_time = 0
def run(self):
# 尝试使用FFMPEG后端(如果可用)
backends = [
cv2.CAP_FFMPEG,
cv2.CAP_GSTREAMER,
cv2.CAP_ANY
]
for backend in backends:
try:
self.cap = cv2.VideoCapture(self.rtsp_url, backend)
if self.cap.isOpened():
print(f"✅ 成功打开RTSP流: {self.rtsp_url} (后端: {backend})")
break
except:
pass
print(f"RTSP流: {self.rtsp_url}")
if not self.cap or not self.cap.isOpened():
print(f"❌ 无法打开RTSP流: {self.rtsp_url}")
return
print(f"开始RTSP处理")
while self.running:
current_time = time.time()
if current_time - self.last_frame_time >= self.frame_interval:
ret, frame = self.cap.read()
if not ret:
print(f"RTSP流读取失败: {self.rtsp_url}")
time.sleep(1)
continue
print(f"RTSP流读取成功")
# 保存成图片
cv2.imwrite(f'static/01_test.jpg', frame)
# 检测对象
try:
results = model(frame)
detections = []
for r in results:
boxes = r.boxes
if boxes is not None:
for box in boxes:
x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
conf = box.conf[0].cpu().numpy()
cls = box.cls[0].cpu().numpy()
detections.append({
'class': model.names[int(cls)],
'confidence': float(conf),
'bbox': [float(x1), float(y1), float(x2), float(y2)]
})
# 更新帧和检测结果
with self.lock:
print('更新帧和检测结果')
self.frame = frame
self.detections = detections
except Exception as e:
print(f"RTSP检测失败: {e}")
with self.lock:
print('RTSP检测失败')
self.frame = frame
self.detections = []
self.last_frame_time = current_time
cv2.imwrite(f'static/result_test.jpg', frame)
time.sleep(0.5) # 新增延时,控制检测频率
self.cap.release()
def thread_get_frame(self):
with self.lock:
# 输出获取帧的信息
print(f"thread_get_frame")
if self.frame is None:
print(f"thread_get_frame: 没有帧")
return None, []
frame_with_detections = self.frame.copy()
for detection in self.detections:
x1, y1, x2, y2 = detection['bbox']
cv2.rectangle(frame_with_detections, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
label = f"{detection['class']}: {detection['confidence']:.2f}"
cv2.putText(frame_with_detections, label, (int(x1), int(y1)-10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
cv2.imwrite(f'static/thread_get_frame.jpg', frame_with_detections)
print('thread_get_frame 返回帧')
return frame_with_detections, self.detections
def stop(self):
# 停止运行标志设置,将running属性设为False
self.running = False
后端API路由
# API路由
@app.route('/api/rtsp_start', methods=['POST'])
def rtsp_start():
print('rtsp_start')
data = request.get_json()
rtsp_url = data.get('rtsp_url')
fps = data.get('fps', 10)
user_id = data.get('user_id', 1)
print(f"rtsp_start: {rtsp_url}")
print(f"fps: {fps}")
print(f"user_id: {user_id}")
if not rtsp_url:
print('rtsp_start: 缺少RTSP URL')
return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400
try:
# 启动RTSP流
success = rtsp_manager.add_stream(rtsp_url, fps)
if success:
# 记录到数据库
detection = DetectionResult(
user_id=user_id,
detection_type='rtsp',
original_file=rtsp_url,
result_file='', # RTSP没有结果文件
detections='[]',
confidence=0
)
db.session.add(detection)
db.session.commit()
print('RTSP流处理已启动')
return jsonify({
'success': True,
'message': 'RTSP流处理已启动',
'detection_id': detection.id
})
else:
return jsonify({'success': False, 'message': 'RTSP流已在处理中'}), 400
except Exception as e:
print(f"启动RTSP流失败: {str(e)}")
return jsonify({'success': False, 'message': f'启动RTSP流失败: {str(e)}'}), 500
#"GET /api/rtsp_frame?rtsp_url=rtsp://192.168.50.83:8554/video HTTP/1.1" 404 -
@app.route('/api/rtsp_frame', methods=['POST'])
def rtsp_get_frame():
print('rtsp_frame')
data = request.get_json()
rtsp_url = data.get('rtsp_url')
detection_id = data.get('user_id', 1)
print(f"rtsp_get_frame: {rtsp_url}")
print(f'detection_id: {detection_id}')
if not rtsp_url:
print('rtsp_get_frame: 缺少RTSP URL')
return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400
print('获取rtsp_manager的rtsp帧')
try:
print('rtsp_get_frame: 获取RTSP帧1')
frame, detections = rtsp_manager.get_frame(rtsp_url)
save_frame = frame.copy()
print(f'rtsp_get_frame: 获取到的帧={frame is not None}')
print(f'rtsp_get_frame: 获取到的检测结果数量={len(detections)}')
if save_frame is not None:
print('rtsp_get_frame: 获取帧成功')
cv2.imwrite('static/02_test.jpg', save_frame)
print(f'调试: 帧已保存到 02_test.jpg')
with open('static/02_test.jpg', 'rb') as f:
print('本地文件头:', f.read(4).hex(' '))
else:
print('rtsp_get_frame: 获取到的帧为空')
print(f'rtsp_get_frame: 获取到的帧={frame is not None}')
if frame is not None:
print('rtsp_get_frame: 更新数据库记录')
# 更新数据库记录(如果有detection_id)
if detection_id:
detection = DetectionResult.query.get(detection_id)
if detection:
detection.detections = json.dumps(detections)
detection.confidence = max([d['confidence'] for d in detections]) if detections else 0
db.session.commit()
print('rtsp_get_frame: 更新数据库记录成功')
# 将帧转换为JPEG 转换为JPEG二进制
ret, jpeg = cv2.imencode('.jpg', frame)
if not ret:
print('rtsp_get_frame: 无法编码帧 500')
return jsonify({'success': False, 'message': '无法编码帧'}), 500
print("编码完成")
print(f'JPEG二进制数据长度: {len(jpeg.tobytes())} 字节')
print(f'检测结果: {detections}')
# 比较编码前后数据
print('编码后数据头:', jpeg.tobytes()[:4].hex(' '))
# 返回响应
response = make_response(jpeg.tobytes())
response.headers['Content-Type'] = 'image/jpeg'
response.headers['Content-Length'] = str(len(jpeg.tobytes()))
response.headers['X-Detections'] = json.dumps(detections)
response.headers['Access-Control-Expose-Headers'] = 'X-Detections'
return response
else:
print('rtsp_get_frame: 无法获取RTSP帧 500')
return jsonify({'success': False, 'message': '无法获取RTSP帧'}), 500
except Exception as e:
print('rtsp_get_frame: 获取RTSP帧失败 500')
return jsonify({'success': False, 'message': f'获取RTSP帧失败: {str(e)}'}), 500
@app.route('/api/rtsp_stop', methods=['POST'])
def rtsp_stop():
print('rtsp_stop')
data = request.get_json()
rtsp_url = data.get('rtsp_url')
print(rtsp_url)
detection_id = data.get('user_id', 1)
print(detection_id)
if not rtsp_url:
return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400
try:
print('rtsp_stop')
success = rtsp_manager.stop_stream(rtsp_url)
print('rtsp_manager.stop_stream')
if success:
print('rtsp_stop success')
# 更新数据库记录
if detection_id:
detection = DetectionResult.query.get(detection_id)
if detection:
detection.result_file = 'RTSP流已停止'
db.session.commit()
return jsonify({'success': True, 'message': 'RTSP流已停止'})
else:
return jsonify({'success': False, 'message': 'RTSP流未运行'}), 404
except Exception as e:
return jsonify({'success': False, 'message': f'停止RTSP流失败: {str(e)}'}), 500
功能基本实现
如有侵权,或需要完整代码,请及时联系博主。