程序代码篇---Python视频流

发布于:2025-05-11 ⋅ 阅读:(20) ⋅ 点赞:(0)


前言

视频流处理是计算机视觉应用的基础,Python 通过 OpenCV、PyAV、imageio 等库提供了强大的视频处理能力。下面我将简单较为全面介绍 Python 中视频流的相关操作。


一、OpenCV 视频流处理

1. 视频捕获基础

import cv2

# 从摄像头捕获视频流
cap = cv2.VideoCapture(0)  # 0 表示默认摄像头

# 从视频文件捕获
# cap = cv2.VideoCapture('video.mp4')

while True:
    # 读取帧
    ret, frame = cap.read()
    
    if not ret:
        print("无法读取视频流")
        break
    
    # 显示帧
    cv2.imshow('Video Stream', frame)
    
    # 按'q'退出
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 释放资源
cap.release()
cv2.destroyAllWindows()

2. 视频流属性设置与获取

# 获取视频属性
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print(f"分辨率: {width}x{height}")
print(f"帧率: {fps}")
print(f"总帧数: {frame_count}")

# 设置视频属性
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
cap.set(cv2.CAP_PROP_FPS, 30)

3. 视频写入

# 定义视频编码器
fourcc = cv2.VideoWriter_fourcc(*'XVID')  # 也可以使用 'MJPG', 'MP4V' 等

# 创建 VideoWriter 对象
out = cv2.VideoWriter('output.avi', fourcc, 20.0, (640, 480))

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    # 处理帧 (可选)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 写入帧
    out.write(frame)
    
    cv2.imshow('frame', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# 释放资源
cap.release()
out.release()
cv2.destroyAllWindows()

二、高级视频流操作

1. 多摄像头处理

# 同时从多个摄像头捕获
cap1 = cv2.VideoCapture(0)  # 主摄像头
cap2 = cv2.VideoCapture(1)  # 外接摄像头

while True:
    ret1, frame1 = cap1.read()
    ret2, frame2 = cap2.read()
    
    if not ret1 or not ret2:
        break
    
    # 水平拼接两个视频流
    combined = cv2.hconcat([frame1, frame2])
    cv2.imshow('Multi Camera', combined)
    
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap1.release()
cap2.release()
cv2.destroyAllWindows()

2. 视频流帧处理

def process_frame(frame):
    """帧处理函数示例"""
    # 转换为灰度图
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    
    # 边缘检测
    edges = cv2.Canny(gray, 100, 200)
    
    # 转换为3通道以便与原图拼接
    edges = cv2.cvtColor(edges, cv2.COLOR_GRAY2BGR)
    
    # 水平拼接原图和边缘图
    processed = cv2.hconcat([frame, edges])
    
    return processed

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    # 处理帧
    result = process_frame(frame)
    
    cv2.imshow('Processed Video', result)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

3. 视频流分析与统计

import time

frame_count = 0
start_time = time.time()

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    frame_count += 1
    
    # 计算实时FPS
    elapsed_time = time.time() - start_time
    fps = frame_count / elapsed_time
    
    # 在帧上显示FPS
    cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), 
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
    
    cv2.imshow('Video with FPS', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

print(f"平均FPS: {fps:.2f}")

三、其他视频处理库

1. PyAV (FFmpeg 的 Python 绑定)

import av

# 打开视频文件
container = av.open('video.mp4')

# 遍历视频帧
for frame in container.decode(video=0):
    # 转换为numpy数组
    img = frame.to_ndarray(format='bgr24')
    
    cv2.imshow('PyAV Video', img)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

container.close()
cv2.destroyAllWindows()

2. imageio

import imageio

# 读取视频
reader = imageio.get_reader('video.mp4')

for i, frame in enumerate(reader):
    cv2.imshow('imageio Video', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

reader.close()
cv2.destroyAllWindows()

# 写入视频
writer = imageio.get_writer('output.mp4', fps=30)

for i in range(100):
    # 生成或获取帧
    frame = generate_frame(i)
    writer.append_data(frame)

writer.close()

四、视频流处理优化技巧

1. 多线程视频处理

from threading import Thread
import queue

class VideoStream:
    def __init__(self, src=0):
        self.stream = cv2.VideoCapture(src)
        self.stopped = False
        self.queue = queue.Queue(maxsize=128)  # 限制队列大小防止内存溢出

    def start(self):
        Thread(target=self.update, args=()).start()
        return self

    def update(self):
        while True:
            if self.stopped:
                return
            
            if not self.queue.full():
                ret, frame = self.stream.read()
                if not ret:
                    self.stop()
                    return
                self.queue.put(frame)

    def read(self):
        return self.queue.get()

    def stop(self):
        self.stopped = True
        self.stream.release()

# 使用
vs = VideoStream(0).start()

while True:
    frame = vs.read()
    cv2.imshow("Threaded Video", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

vs.stop()
cv2.destroyAllWindows()

2. 视频流分辨率动态调整

def rescale_frame(frame, percent=75):
    """按百分比缩放帧"""
    width = int(frame.shape[1] * percent / 100)
    height = int(frame.shape[0] * percent / 100)
    dim = (width, height)
    return cv2.resize(frame, dim, interpolation=cv2.INTER_AREA)

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    # 根据处理需求动态调整分辨率
    if perform_heavy_processing:
        frame = rescale_frame(frame, percent=50)
    else:
        frame = rescale_frame(frame, percent=100)
    
    cv2.imshow('Dynamic Scaling', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

3. 硬件加速

# 使用CUDA加速 (需要支持CUDA的OpenCV版本)
def setup_cuda():
    net = cv2.dnn.readNetFromCaffe(protoFile, weightsFile)
    net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
    net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
    return net

# 使用Intel OpenVINO加速
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
cap.set(cv2.CAP_PROP_FPS, 30)

五、特殊视频流处理

1. RTSP 视频流

rtsp_url = "rtsp://username:password@ip_address:port/path"
cap = cv2.VideoCapture(rtsp_url)

# 设置缓冲区大小减少延迟
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

while True:
    ret, frame = cap.read()
    if not ret:
        print("连接中断,尝试重连...")
        cap.release()
        cap = cv2.VideoCapture(rtsp_url)
        continue
    
    cv2.imshow('RTSP Stream', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

2. 视频流截图与保存

save_count = 0

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    cv2.imshow('Video', frame)
    
    key = cv2.waitKey(1) & 0xFF
    if key == ord('q'):
        break
    elif key == ord('s'):  # 按's'保存当前帧
        save_count += 1
        filename = f"frame_{save_count}.jpg"
        cv2.imwrite(filename, frame)
        print(f"已保存: {filename}")

3. 视频流叠加信息

from datetime import datetime

while True:
    ret, frame = cap.read()
    if not ret:
        break
    
    # 添加时间戳
    now = datetime.now()
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S")
    cv2.putText(frame, timestamp, (10, 30), 
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    
    # 添加边框
    cv2.rectangle(frame, (10, 10), (630, 470), (0, 0, 255), 2)
    
    cv2.imshow('Annotated Video', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

六、视频流处理常见问题解决

1.视频延迟问题

  1. 减少处理操作或优化代码
  2. 降低分辨率
  3. 使用多线程处理
  4. 设置 cv2.CAP_PROP_BUFFERSIZE 为 1

2.视频无法打开

  1. 检查文件路径或URL是否正确
  2. 检查视频编码格式是否支持
  3. 尝试使用不同的后端(如 cv2.CAP_DSHOW 等)

3.内存泄漏

  1. 确保正确释放资源(release())
  2. 避免在循环中不必要地创建对象
  3. 使用 with 语句管理资源

4.性能瓶颈

  1. 使用性能分析工具(如 cProfile)找出瓶颈
  2. 考虑使用Cython或Numba加速关键部分
  3. 启用硬件加速

5.视频写入问题

  1. 确保帧的大小与VideoWriter指定的尺寸匹配
  2. 检查编码器是否可用
  3. 尝试不同的文件扩展名(如 .avi, .mp4 等)


网站公告

今日签到

点亮在社区的每一天
去签到