PiscCode轨迹跟踪Mediapipe + OpenCV进阶:速度估算

发布于:2025-09-06 ⋅ 阅读:(20) ⋅ 点赞:(0)

在智能视频分析和计算机视觉领域,目标检测与目标跟踪是核心技术模块。传统的目标检测只能告诉我们“哪里有物体”,而跟踪技术能够在视频流中持续维护物体的身份信息。当我们进一步希望分析目标的运动状态时,比如速度和轨迹,便需要在跟踪的基础上引入运动估计。

本文结合 Mediapipe 的轻量级检测模型与 OpenCV 的图像处理能力,详细介绍一个支持实时速度计算的检测+跟踪系统——FrameObjectTrackerWithSpeed,并分享其设计思路、实现方法和优化方向。


一、技术背景

1.1 目标检测与跟踪

  • 目标检测(Object Detection):在单帧图像中识别物体,返回边界框和类别信息。常用模型有 YOLO、SSD、Faster R-CNN 等。

  • 目标跟踪(Object Tracking):在视频中对物体进行持续跟踪,为同一物体分配唯一 ID,使其身份一致。常用方法有 KCF、SORT、DeepSORT 等。

结合检测与跟踪,可以实现:

  • 统计物体经过某一区域的次数;

  • 分析物体运动轨迹与速度;

  • 在监控或交通场景中进行行为分析。

1.2 为什么选择 Mediapipe

Mediapipe 是 Google 开源的跨平台机器学习框架,优点:

  1. 轻量高效:EfficientDet Lite 等模型在移动端和桌面端均可实时运行。

  2. 易用接口:Python API 封装简单,快速上手。

  3. 跨平台部署:支持 Android、iOS、Web、Python。

  4. GPU/NNAPI 优化:可充分利用硬件加速。

在本文中,我们使用 efficientdet_lite0.tflite 模型进行目标检测,兼顾速度与准确率。

1.3 为什么使用 OpenCV

OpenCV 在视频处理与可视化方面功能强大,主要作用:

  • 读取和处理视频帧;

  • 绘制检测框、文字、轨迹线;

  • 图像后处理和调试。

因此,Mediapipe + OpenCV 是轻量级实时视频分析的理想组合。


二、整体架构设计

新的 FrameObjectTrackerWithSpeed 系统由三个核心模块组成:

  1. 目标检测器(Detector)
    Mediapipe ObjectDetector 接收视频帧,输出检测框与类别。

  2. 轨迹管理器(Tracker)

    • 使用 IoU 将检测框与已有轨迹匹配;

    • 新目标创建新轨迹;

    • 丢失目标在超过阈值后删除;

    • 保存历史轨迹点。

  3. 速度计算模块(Speed Estimator)

    • 根据轨迹点位置和时间间隔计算速度;

    • 支持像素→实际距离转换(pixel_to_meter 参数);

    • 将速度显示在图像上。

整体流程如下:

视频帧 → Mediapipe检测 → 轨迹匹配/更新 → 速度计算 → 可视化输出

三、核心代码解析

3.1 初始化检测器

base_options = python.BaseOptions(model_asset_path=model_path)
options = vision.ObjectDetectorOptions(
    base_options=base_options,
    score_threshold=score_threshold
)
self.detector = vision.ObjectDetector.create_from_options(options)
  • 指定 tflite 模型路径;

  • 设置检测置信度阈值;

  • Mediapipe 内部会对输入图像做格式转换和推理。


3.2 Track 类设计(带速度计算)

class Track:
    def __init__(self, tid, bbox, category, trace_len, pixel_to_meter):
        self.id = tid
        self.bbox = bbox
        self.category = category
        self.lost = 0
        self.trace = []
        self.trace_len = trace_len
        self.pixel_to_meter = pixel_to_meter
        self.last_time = time.time()
        self.speed = 0.0

每个轨迹对象维护:

  • ID、类别、边界框;

  • 丢失计数 lost

  • 历史轨迹 trace

  • 速度 speed

  • 上一次更新时间 last_time

更新方法:

def update(self, bbox):
    self.bbox = bbox
    cx = (bbox[0]+bbox[2])//2
    cy = (bbox[1]+bbox[3])//2
    current_time = time.time()

    if self.trace:
        prev_cx, prev_cy = self.trace[-1]
        dt = current_time - self.last_time
        if dt > 0:
            dx = (cx - prev_cx) * self.pixel_to_meter
            dy = (cy - prev_cy) * self.pixel_to_meter
            distance = (dx**2 + dy**2)**0.5
            self.speed = distance / dt  # m/s

    self.last_time = current_time
    self.trace.append((cx, cy))
    if len(self.trace) > self.trace_len:
        self.trace.pop(0)

特点:

  • 中心点坐标变化用于估算移动距离;

  • 使用时间差计算瞬时速度;

  • 支持像素到米的转换,方便真实场景分析。


3.3 IoU 匹配逻辑

@staticmethod
def _iou(boxA, boxB):
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])
    interArea = max(0, xB - xA) * max(0, yB - yA)
    boxAArea = (boxA[2]-boxA[0])*(boxA[3]-boxA[1])
    boxBArea = (boxB[2]-boxB[0])*(boxB[3]-boxB[1])
    return interArea / float(boxAArea + boxBArea - interArea + 1e-6)
  • 用交并比判断检测框与轨迹匹配程度;

  • IoU 大于阈值认为是同一个目标。


3.4 更新轨迹

def _update_tracks(self, detections):
    updated_tracks = []
    used_dets = set()

    for track in self.tracks:
        best_iou, best_det = 0, None
        for i, det in enumerate(detections):
            if i in used_dets: continue
            bbox = det.bounding_box
            box_det = (bbox.origin_x, bbox.origin_y, bbox.origin_x+bbox.width, bbox.origin_y+bbox.height)
            iou_score = self._iou(track.bbox, box_det)
            if iou_score > best_iou:
                best_iou, best_det = iou_score, (i, det)

        if best_det and best_iou > self.iou_threshold:
            i, det = best_det
            used_dets.add(i)
            bbox = det.bounding_box
            box_det = (bbox.origin_x, bbox.origin_y, bbox.origin_x+bbox.width, bbox.origin_y+bbox.height)
            track.update(box_det)
            track.lost = 0
            updated_tracks.append(track)
        else:
            track.lost += 1
            if track.lost <= self.max_lost:
                updated_tracks.append(track)

    # 新增未匹配检测
    for i, det in enumerate(detections):
        if i in used_dets: continue
        bbox = det.bounding_box
        box_det = (bbox.origin_x, bbox.origin_y, bbox.origin_x+bbox.width, bbox.origin_y+bbox.height)
        category = det.categories[0].category_name if det.categories else "obj"
        new_track = self.Track(self.next_id, box_det, category, self.trace_len, self.pixel_to_meter)
        new_track.update(box_det)
        self.next_id += 1
        updated_tracks.append(new_track)

    self.tracks = updated_tracks
  • 匹配已有轨迹并更新;

  • 未匹配的检测新建轨迹;

  • 丢失目标计数超过阈值后删除。


3.5 可视化与速度显示

def _draw_tracks(self, frame):
    annotated = frame.copy()
    for t in self.tracks:
        x1, y1, x2, y2 = t.bbox
        color = self._get_color(f"{t.category}_{t.id}")
        cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 3)
        label = f"ID {t.id} {t.category} {t.speed:.2f} m/s"
        ...
        cv2.putText(annotated, label, (text_x, text_y), font, font_scale, color, thickness, cv2.LINE_AA)
        if len(t.trace) > 1:
            for i in range(1, len(t.trace)):
                cv2.line(annotated, t.trace[i-1], t.trace[i], color, 2)
    return annotated
  • 绘制矩形框和轨迹折线;

  • 显示 ID、类别与速度;

  • 文字居中于框体,视觉效果清晰。


3.6 帧处理调用

cap = cv2.VideoCapture(0)
tracker = FrameObjectTrackerWithSpeed(pixel_to_meter=0.01)

while cap.isOpened():
    ret, frame = cap.read()
    if not ret: break
    annotated = tracker.do(frame, device="cpu")
    cv2.imshow("Tracking + Speed", annotated)
    if cv2.waitKey(1) & 0xFF == 27: break
cap.release()
cv2.destroyAllWindows()
  • 使用摄像头捕获视频;

  • 每帧调用 do() 方法进行检测、跟踪和速度计算;

  • 实时显示。


四、应用场景

  1. 交通监控

    • 车辆检测、跟踪与速度统计;

    • 超速警告或流量分析。

  2. 智能安防

    • 人体检测与轨迹分析;

    • 可实现异常行为告警。

  3. 体育分析

    • 球员运动轨迹和速度;

    • 训练与战术分析。

  4. 机器人视觉

    • 目标跟随;

    • 速度感知与导航。


五、优化与扩展

import cv2

import mediapipe as mp

import random

from mediapipe.tasks import python

from mediapipe.tasks.python import vision

import time





class FrameObjectTrackerWithSpeed:

    """

    Mediapipe 检测 + 简易轨迹跟踪 + 速度显示

    """



    def __init__(self, model_path="文件地址/efficientdet_lite0.tflite",

                 score_threshold=0.5, max_lost=5, iou_threshold=0.3,

                 trace_len=30, pixel_to_meter=0.01):

        """

        :param model_path: mediapipe tflite 模型路径

        :param score_threshold: 检测分数阈值

        :param max_lost: 目标丢失多少帧后删除

        :param iou_threshold: IOU 阈值用于匹配

        :param trace_len: 保存的轨迹长度

        :param pixel_to_meter: 像素到米的比例

        """

        # 初始化 mediapipe detector

        base_options = python.BaseOptions(model_asset_path=model_path)

        options = vision.ObjectDetectorOptions(

            base_options=base_options,

            score_threshold=score_threshold

        )

        self.detector = vision.ObjectDetector.create_from_options(options)



        # tracker 参数

        self.category_colors = {}

        self.next_id = 0

        self.tracks = []

        self.max_lost = max_lost

        self.iou_threshold = iou_threshold

        self.trace_len = trace_len

        self.pixel_to_meter = pixel_to_meter



    class Track:

        """轨迹对象"""



        def __init__(self, tid, bbox, category, trace_len, pixel_to_meter):

            self.id = tid

            self.bbox = bbox  # (x1, y1, x2, y2)

            self.category = category

            self.lost = 0

            self.trace = []  # 历史轨迹点

            self.trace_len = trace_len

            self.pixel_to_meter = pixel_to_meter

            self.last_time = time.time()

            self.speed = 0.0  # m/s



        def update(self, bbox):

            self.bbox = bbox

            cx = (bbox[0] + bbox[2]) // 2

            cy = (bbox[1] + bbox[3]) // 2

            current_time = time.time()

            # 计算速度

            if self.trace:

                prev_cx, prev_cy = self.trace[-1]

                dt = current_time - self.last_time

                if dt > 0:

                    dx = (cx - prev_cx) * self.pixel_to_meter

                    dy = (cy - prev_cy) * self.pixel_to_meter

                    distance = (dx ** 2 + dy ** 2) ** 0.5

                    self.speed = distance / dt  # m/s

            self.last_time = current_time

            self.trace.append((cx, cy))

            if len(self.trace) > self.trace_len:

                self.trace.pop(0)



    @staticmethod

    def _iou(boxA, boxB):

        """计算两个框的 IOU"""

        xA = max(boxA[0], boxB[0])

        yA = max(boxA[1], boxB[1])

        xB = min(boxA[2], boxB[2])

        yB = min(boxA[3], boxB[3])

        interArea = max(0, xB - xA) * max(0, yB - yA)

        boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])

        boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

        return interArea / float(boxAArea + boxBArea - interArea + 1e-6)



    def _get_color(self, unique_name: str):

        """

        为每个目标分配一个随机满饱和度颜色

        满饱和度颜色:RGB 中至少一个通道为 255,其余随机

        """

        if unique_name not in self.category_colors:

            channels = [0, 0, 0]

            max_channel = random.randint(0, 2)

            channels[max_channel] = 255

            for i in range(3):

                if i != max_channel:

                    channels[i] = random.randint(0, 255)

            self.category_colors[unique_name] = tuple(channels)

        return self.category_colors[unique_name]



    def _update_tracks(self, detections):

        updated_tracks = []

        used_dets = set()



        # 尝试匹配已有轨迹

        for track in self.tracks:

            best_iou, best_det = 0, None

            for i, det in enumerate(detections):

                if i in used_dets:

                    continue

                bbox = det.bounding_box

                box_det = (bbox.origin_x, bbox.origin_y,

                           bbox.origin_x + bbox.width,

                           bbox.origin_y + bbox.height)

                iou_score = self._iou(track.bbox, box_det)

                if iou_score > best_iou:

                    best_iou, best_det = iou_score, (i, det)



            if best_det and best_iou > self.iou_threshold:

                i, det = best_det

                used_dets.add(i)

                bbox = det.bounding_box

                box_det = (bbox.origin_x, bbox.origin_y,

                           bbox.origin_x + bbox.width,

                           bbox.origin_y + bbox.height)

                track.update(box_det)

                track.lost = 0

                updated_tracks.append(track)

            else:

                track.lost += 1

                if track.lost <= self.max_lost:

                    updated_tracks.append(track)



        # 新增未匹配的检测

        for i, det in enumerate(detections):

            if i in used_dets:

                continue

            bbox = det.bounding_box

            box_det = (bbox.origin_x, bbox.origin_y,

                       bbox.origin_x + bbox.width,

                       bbox.origin_y + bbox.height)

            category = det.categories[0].category_name if det.categories else "obj"

            unique_name = f"{category}_{self.next_id}"

            new_track = self.Track(self.next_id, box_det, category, self.trace_len, self.pixel_to_meter)

            new_track.update(box_det)

            self.next_id += 1

            updated_tracks.append(new_track)



        self.tracks = updated_tracks



    def _draw_tracks(self, frame):

        annotated = frame.copy()

        for t in self.tracks:

            x1, y1, x2, y2 = t.bbox

            color = self._get_color(f"{t.category}_{t.id}")  # 每个目标唯一颜色

            cv2.rectangle(annotated, (x1, y1), (x2, y2), color, 3)  # 较粗框线



            label = f"ID {t.id} {t.category} {t.speed:.2f} m/s"

            font = cv2.FONT_HERSHEY_SIMPLEX

            font_scale = 2.0

            thickness = 3

            (text_width, text_height), baseline = cv2.getTextSize(label, font, font_scale, thickness)



            # 文字水平居中对齐到框体中心

            cx = (x1 + x2) // 2

            cy = (y1 + y2) // 2

            text_x = cx - text_width // 2

            text_y = cy + text_height // 2  # 垂直方向稍微向下一点,让文字居中



            cv2.putText(annotated, label, (text_x, text_y),

                        font, font_scale, color, thickness, cv2.LINE_AA)



            # 画轨迹

            if len(t.trace) > 1:

                for i in range(1, len(t.trace)):

                    cv2.line(annotated, t.trace[i - 1], t.trace[i], color, 2)

        return annotated



    def do(self, frame,device):

        """处理一帧,返回带检测框、轨迹和速度的图像"""

        if frame is None:

            return None

        mp_image = mp.Image(image_format=mp.ImageFormat.SRGB,

                            data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

        detection_result = self.detector.detect(mp_image)

        detections = detection_result.detections or []

        self._update_tracks(detections)

        return self._draw_tracks(frame)

  1. 跟踪优化

    • 引入 Kalman 滤波预测位置;

    • 使用匈牙利算法做全局匹配;

    • 增加外观特征(ReID)避免遮挡导致 ID 混淆。

  2. 速度计算优化

    • 采用移动平均平滑瞬时速度;

    • 支持多相机标定实现真实世界速度估计。

  3. 性能优化

    • GPU 或 OpenVINO 加速;

    • 使用更轻量模型(如 EfficientDet Lite2/3)提升帧率。

  4. 功能扩展

    • 区域统计与事件触发;

    • 数据记录与可视化分析。


六、总结

本文介绍了一个基于 Mediapipe + OpenCV 的实时目标检测与轨迹跟踪系统,并加入了速度计算功能。核心思路:

  1. Mediapipe 高效检测目标;

  2. IoU 匹配实现简易跟踪;

  3. 轨迹点记录并计算速度;

  4. 可视化显示框、轨迹和速度。

该系统轻量、可实时运行,适合视频分析、交通监控、体育分析、机器人视觉等场景,并具备进一步优化空间,如使用 Kalman 滤波、多目标匹配或更高精度模型。

对 PiscTrace or PiscCode感兴趣?更多精彩内容请移步官网看看~🔗 PiscTrace


网站公告

今日签到

点亮在社区的每一天
去签到