AI学习笔记三十:基于yolov8的web显示

发布于:2025-07-04 ⋅ 阅读:(16) ⋅ 点赞:(0)

若该文为原创文章,转载请注明原文出处。

一、目的

最近想做基于web端的YOLO识别检测系统,由于个人知识量有限,有很多不懂,特别是VUE这块,所以使用了AI的方式来实现自己的目的。

网上有很多程序是融合了Python、Flask、Vue、SQLite等技术;

检测方式:图片,视频,摄像头,在此基础上,通过AI工具,又增加了RTSP方式。

这里记录下过程及方式,虽然框架不是很好,但实现了功能。

二、准备工作

1、rtsp服务器

下载页面 : https://github.com/aler9/rtsp-simple-server/releases
这里,我们下载 rtsp-simple-server_v0.19.1_windows_amd64.zip

2、ffmpeg推rtsp流

ffmpeg -re -stream_loop -1 -i test.mp4 -c:v h264 -rtsp_transport tcp -f rtsp rtsp://192.168.50.83:8554/video

三、思路流程

1、系统架构

采用 Flask + Vue 前后端分离架构:

  • 后端:Flask 提供 RESTful API,处理核心逻辑(YOLO 检测、RTSP 流管理、数据库操作)。
  • 前端:Vue 3 负责交互界面(用户登录、文件上传、检测结果展示)。
  • 数据库:SQLite 存储用户信息和检测记录。

2、后端

1)RTSP 实时流检测

  • 流程

    1. 前端调用 /api/rtsp_start 启动 RTSP 流(传递 RTSP URL)。
    2. 后端通过 RTSPStreamManager 管理多个 RTSP 线程:
      • 每个 RTSP URL 对应一个 RTSPStreamThread 线程。
      • 线程持续读取视频帧,调用 YOLO 模型检测,保存结果到共享变量。
    3. 前端定时调用 /api/rtsp_frame 获取最新帧和检测结果。
    4. 调用 /api/rtsp_stop 停止流。
  • 关键类

    • RTSPStreamManager:管理 RTSP 线程的生命周期(启动/停止/清理)。
    • RTSPStreamThread:独立线程处理 RTSP 流的帧读取和 YOLO 检测。

2、图片/视频检测

  • 图片检测/api/detect_image):
    1. 上传图片 → 调用 YOLO 检测 → 绘制检测框 → 保存结果图片和数据库记录。
  • 视频检测/api/detect_video):
    1. 上传视频 → 逐帧检测 → 生成带检测框的视频 → 保存结果和数据库记录。

3、前端

图片检测流程
用户->>前端: 拖拽/选择图片
前端->>前端: 生成Blob URL预览 (beforeImageUpload)
前端->>后端: 上传图片到 /api/detect_image
后端-->>前端: 返回检测结果
前端->>前端: 显示结果图片和检测框数据
视频检测流程
用户->>前端: 拖拽/选择视频
前端->>前端: 生成Blob URL预览 (beforeVideoUpload)
前端->>后端: 上传视频到 /api/detect_video
后端-->>前端: 返回带检测框的视频
前端->>前端: 播放结果视频并显示检测统计
摄像头实时检测
1. 调用 navigator.mediaDevices.getUserMedia 获取摄像头权限
2. 绑定视频流到 <video> 元素
3. 启动定时器 (detectionInterval):
   - 捕获帧 -> 发送到 /api/process_frame
   - 接收检测结果 -> 通过CSS绝对定位绘制检测框
4. 显示实时FPS计数器
RTSP流检测
1. 验证RTSP地址格式
2. 调用 /api/rtsp_start 启动后端RTSP线程
3. 定时轮询 /api/rtsp_frame:
   - 接收JPEG二进制帧 -> Blob URL显示
   - 解析响应头中的检测结果(X-Detections)
4. 绘制检测框并显示FPS

四、核心代码

这是只附RTSP部分代码

后端解析RTSP流


# RTSP流管理
class RTSPStreamManager:
    def __init__(self):
        self.streams = {}  # {rtsp_url: {'thread': thread, 'last_active': timestamp}}
        self.lock = threading.Lock()
    
    def add_stream(self, rtsp_url, fps=10):
        with self.lock:
            if rtsp_url in self.streams:
                return False
            
            thread = RTSPStreamThread(rtsp_url, fps)
            thread.start()
            self.streams[rtsp_url] = {
                'thread': thread,
                'last_active': time.time(),
                'fps': fps
            }
            return True
    
    def get_frame(self, rtsp_url):
        with self.lock:
            if rtsp_url not in self.streams:
                return None, []
            
            print(f"get_frame: {rtsp_url}") 
            self.streams[rtsp_url]['last_active'] = time.time()
            thread = self.streams[rtsp_url]['thread']
 
            # 获取帧数据
            frame, detections = thread.thread_get_frame()
            print(f"thread.thread_get_frame") 

            # 保存测试帧用于调试
            if frame is not None:
                try:
                    print('保存测试帧')
                    cv2.imwrite('static/debug_frame.jpg', frame)
                    print(f"get_frame: 成功保存测试帧到 static/debug_frame.jpg")
                except Exception as e:
                    print(f"get_frame: 保存测试帧失败 - {str(e)}")


            print(f"get_frame: 返回帧和检测结果")
            return frame, detections
    
    def stop_stream(self, rtsp_url):
        with self.lock:
            if rtsp_url not in self.streams:
                return False
            
            thread = self.streams[rtsp_url]['thread']
            thread.stop()
            thread.join()
            del self.streams[rtsp_url]
            return True
    
    def cleanup_inactive(self, timeout=300):
        with self.lock:
            current_time = time.time()
            to_remove = []
            
            for rtsp_url, stream_info in self.streams.items():
                if current_time - stream_info['last_active'] > timeout:
                    to_remove.append(rtsp_url)
            
            for rtsp_url in to_remove:
                self.stop_stream(rtsp_url)
            
            return len(to_remove)

# RTSP处理线程
class RTSPStreamThread(threading.Thread):
    def __init__(self, rtsp_url, fps=10):
        super().__init__()
        self.rtsp_url = rtsp_url
        self.fps = fps
        self.cap = None
        self.frame = None
        self.detections = []
        self.lock = threading.Lock()
        self.running = True
        self.frame_interval = 1.0 / fps
        self.last_frame_time = 0
    
    def run(self):
        # 尝试使用FFMPEG后端(如果可用)
        backends = [
            cv2.CAP_FFMPEG,
            cv2.CAP_GSTREAMER,
            cv2.CAP_ANY
        ]
        
        for backend in backends:
            try:
                self.cap = cv2.VideoCapture(self.rtsp_url, backend)
                if self.cap.isOpened():
                    print(f"✅ 成功打开RTSP流: {self.rtsp_url} (后端: {backend})")
                    break
            except:
                pass
        
        print(f"RTSP流: {self.rtsp_url}")
        if not self.cap or not self.cap.isOpened():
            print(f"❌ 无法打开RTSP流: {self.rtsp_url}")
            return
        
        print(f"开始RTSP处理")
        while self.running:
            current_time = time.time()
            if current_time - self.last_frame_time >= self.frame_interval:
                ret, frame = self.cap.read()
                if not ret:
                    print(f"RTSP流读取失败: {self.rtsp_url}")
                    time.sleep(1)
                    continue
                
                print(f"RTSP流读取成功")
                # 保存成图片 
                cv2.imwrite(f'static/01_test.jpg', frame)   

                # 检测对象
                try:
                    results = model(frame)
                    detections = []
                    
                    for r in results:
                        boxes = r.boxes
                        if boxes is not None:
                            for box in boxes:
                                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                                conf = box.conf[0].cpu().numpy()
                                cls = box.cls[0].cpu().numpy()
                                
                                detections.append({
                                    'class': model.names[int(cls)],
                                    'confidence': float(conf),
                                    'bbox': [float(x1), float(y1), float(x2), float(y2)]
                                })
                    
                    # 更新帧和检测结果
                    with self.lock:
                        print('更新帧和检测结果')
                        self.frame = frame
                        self.detections = detections
                
                except Exception as e:
                    print(f"RTSP检测失败: {e}")
                    with self.lock:
                        print('RTSP检测失败')
                        self.frame = frame
                        self.detections = []
                
                self.last_frame_time = current_time
                cv2.imwrite(f'static/result_test.jpg', frame)

                time.sleep(0.5)  # 新增延时,控制检测频率
        
        self.cap.release()
    
    def thread_get_frame(self):
        with self.lock:
            # 输出获取帧的信息
            print(f"thread_get_frame")
            if self.frame is None:
                print(f"thread_get_frame: 没有帧")  
                return None, []
              
            frame_with_detections = self.frame.copy()
            for detection in self.detections:
                x1, y1, x2, y2 = detection['bbox']
                cv2.rectangle(frame_with_detections, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                label = f"{detection['class']}: {detection['confidence']:.2f}"
                cv2.putText(frame_with_detections, label, (int(x1), int(y1)-10), 
                          cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
            
            cv2.imwrite(f'static/thread_get_frame.jpg', frame_with_detections)
            print('thread_get_frame 返回帧')
            return frame_with_detections, self.detections
    
    def stop(self):
            # 停止运行标志设置,将running属性设为False
        self.running = False

后端API路由

# API路由
@app.route('/api/rtsp_start', methods=['POST'])
def rtsp_start():
    print('rtsp_start')
    data = request.get_json()
    rtsp_url = data.get('rtsp_url')
    fps = data.get('fps', 10)
    user_id = data.get('user_id', 1)
    print(f"rtsp_start: {rtsp_url}")
    print(f"fps: {fps}")
    print(f"user_id: {user_id}")
    
    if not rtsp_url:
        print('rtsp_start: 缺少RTSP URL')
        return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400
    
    try:
        # 启动RTSP流
        success = rtsp_manager.add_stream(rtsp_url, fps)
        
        if success:
            # 记录到数据库
            detection = DetectionResult(
                user_id=user_id,
                detection_type='rtsp',
                original_file=rtsp_url,
                result_file='',  # RTSP没有结果文件
                detections='[]',
                confidence=0
            )
            db.session.add(detection)
            db.session.commit()
            
            print('RTSP流处理已启动')
            return jsonify({
                'success': True,
                'message': 'RTSP流处理已启动',
                'detection_id': detection.id
            })
        else:
            return jsonify({'success': False, 'message': 'RTSP流已在处理中'}), 400
    
    except Exception as e:
        print(f"启动RTSP流失败: {str(e)}")
        return jsonify({'success': False, 'message': f'启动RTSP流失败: {str(e)}'}), 500

#"GET /api/rtsp_frame?rtsp_url=rtsp://192.168.50.83:8554/video HTTP/1.1" 404 -
@app.route('/api/rtsp_frame', methods=['POST'])
def rtsp_get_frame():
    print('rtsp_frame')
    data = request.get_json()
    rtsp_url = data.get('rtsp_url')
    detection_id = data.get('user_id', 1)

    print(f"rtsp_get_frame: {rtsp_url}")
    print(f'detection_id: {detection_id}')
    
    if not rtsp_url:
        print('rtsp_get_frame: 缺少RTSP URL')
        return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400
    
    print('获取rtsp_manager的rtsp帧')
    try:
        print('rtsp_get_frame: 获取RTSP帧1')
        frame, detections = rtsp_manager.get_frame(rtsp_url)
        save_frame = frame.copy()
        print(f'rtsp_get_frame: 获取到的帧={frame is not None}')
        print(f'rtsp_get_frame: 获取到的检测结果数量={len(detections)}')
     
        if save_frame is not None:
            print('rtsp_get_frame: 获取帧成功')
            cv2.imwrite('static/02_test.jpg', save_frame)
            print(f'调试: 帧已保存到 02_test.jpg')
            with open('static/02_test.jpg', 'rb') as f:
                print('本地文件头:', f.read(4).hex(' '))
        else: 
            print('rtsp_get_frame: 获取到的帧为空')    

        print(f'rtsp_get_frame: 获取到的帧={frame is not None}')
        if frame is not None:        
            print('rtsp_get_frame: 更新数据库记录')

            # 更新数据库记录(如果有detection_id)
            if detection_id:
                detection = DetectionResult.query.get(detection_id)
                if detection:
                    detection.detections = json.dumps(detections)
                    detection.confidence = max([d['confidence'] for d in detections]) if detections else 0
                    db.session.commit()
            
            print('rtsp_get_frame: 更新数据库记录成功')
            # 将帧转换为JPEG 转换为JPEG二进制
            ret, jpeg = cv2.imencode('.jpg', frame)
            if not ret:
                print('rtsp_get_frame: 无法编码帧 500')
                return jsonify({'success': False, 'message': '无法编码帧'}), 500
            
            print("编码完成")
            print(f'JPEG二进制数据长度: {len(jpeg.tobytes())} 字节')
            print(f'检测结果: {detections}')

            # 比较编码前后数据
            print('编码后数据头:', jpeg.tobytes()[:4].hex(' '))

            # 返回响应
            response = make_response(jpeg.tobytes())
            response.headers['Content-Type'] = 'image/jpeg'
            response.headers['Content-Length'] = str(len(jpeg.tobytes()))
            response.headers['X-Detections'] = json.dumps(detections)
            response.headers['Access-Control-Expose-Headers'] = 'X-Detections'
            return response
        else:
            print('rtsp_get_frame: 无法获取RTSP帧 500')
            return jsonify({'success': False, 'message': '无法获取RTSP帧'}), 500
    except Exception as e:
        print('rtsp_get_frame: 获取RTSP帧失败 500')
        return jsonify({'success': False, 'message': f'获取RTSP帧失败: {str(e)}'}), 500

@app.route('/api/rtsp_stop', methods=['POST'])
def rtsp_stop():
    print('rtsp_stop')
    data = request.get_json()
    rtsp_url = data.get('rtsp_url')
    print(rtsp_url)
    detection_id = data.get('user_id', 1)
    print(detection_id)
    
    if not rtsp_url:
        return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400
    
    try:
        print('rtsp_stop')
        success = rtsp_manager.stop_stream(rtsp_url)
        print('rtsp_manager.stop_stream')
        
        if success:
            print('rtsp_stop success')
            # 更新数据库记录
            if detection_id:
                detection = DetectionResult.query.get(detection_id)
                if detection:
                    detection.result_file = 'RTSP流已停止'
                    db.session.commit()
            
            return jsonify({'success': True, 'message': 'RTSP流已停止'})
        else:
            return jsonify({'success': False, 'message': 'RTSP流未运行'}), 404
    
    except Exception as e:
        return jsonify({'success': False, 'message': f'停止RTSP流失败: {str(e)}'}), 500

功能基本实现

如有侵权,或需要完整代码,请及时联系博主。


网站公告

今日签到

点亮在社区的每一天
去签到