A Simple GPU Stress Test Script (Python Version)

import torch
import time
import threading
import argparse
import math
import random

class SimpleGPUStress:
    def __init__(self, gpu_ids=None, target_usage=80, fluctuation=20, 
                 memory_limit=85, matrix_size=2048):
        """
        Simple GPU stress test.

        Args:
            gpu_ids: list of GPU IDs, e.g. [0, 1], or None to use all GPUs
            target_usage: target utilization in % (default 80)
            fluctuation: fluctuation range in % (default ±20)
            memory_limit: GPU memory usage cap in % (default 85)
            matrix_size: base matrix size (default 2048)
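
        Example (illustrative values, a minimal sketch of direct use):
            stress = SimpleGPUStress(gpu_ids=[0], target_usage=70)
            stress.start()  # runs until Ctrl+C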
        """
        # Check that CUDA is available
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA is not available")
        
        # Build the list of GPUs to use
        if gpu_ids is None:
            self.gpu_ids = list(range(torch.cuda.device_count()))
        else:
            self.gpu_ids = gpu_ids
        
        self.target_usage = target_usage / 100.0
        self.fluctuation = fluctuation / 100.0
        self.memory_limit = memory_limit / 100.0
        self.matrix_size = matrix_size
        self.running = True
        
        # Record memory info for each GPU
        self.gpu_memory_info = {}
        for gpu_id in self.gpu_ids:
            total_memory = torch.cuda.get_device_properties(gpu_id).total_memory
            max_memory = total_memory * self.memory_limit
            self.gpu_memory_info[gpu_id] = {
                'total_gb': total_memory / 1024**3,
                'max_gb': max_memory / 1024**3
            }
        
        print(f"使用GPU: {self.gpu_ids}")
        print(f"目标使用率: {target_usage}% ±{fluctuation}%")
        print(f"显存限制: {memory_limit}%")
        for gpu_id in self.gpu_ids:
            info = self.gpu_memory_info[gpu_id]
            print(f"GPU {gpu_id}: 总显存 {info['total_gb']:.1f}GB, 限制 {info['max_gb']:.1f}GB")
    
    def get_current_target(self, start_time):
        """计算当前目标强度(正弦波浮动)"""
        elapsed = time.time() - start_time
        wave = math.sin(elapsed / 30 * 2 * math.pi)  # 30-second period
        target = self.target_usage + wave * self.fluctuation
        return max(0.1, min(1.0, target))
    
    def manage_memory(self, gpu_id, memory_pool):
        """管理显存使用量"""
        torch.cuda.set_device(gpu_id)
        current_memory = torch.cuda.memory_allocated(gpu_id) / 1024**3
        target_memory = self.gpu_memory_info[gpu_id]['max_gb'] * 0.8  # aim for 80% of the cap
        
        if current_memory < target_memory and len(memory_pool) < 10:
            # Increase memory usage
            try:
                size = random.randint(1024, 3072)
                matrix = torch.randn(size, size, device=f'cuda:{gpu_id}')
                memory_pool.append(matrix)
            except RuntimeError:
                pass  # ignore when out of memory
                
        elif current_memory > target_memory and len(memory_pool) > 1:
            # Decrease memory usage
            if memory_pool:
                memory_pool.pop()
            torch.cuda.empty_cache()
    
    def worker(self, gpu_id):
        """GPU工作线程"""
        torch.cuda.set_device(gpu_id)
        device = f'cuda:{gpu_id}'
        
        # Create the working matrices
        a = torch.randn(self.matrix_size, self.matrix_size, device=device)
        b = torch.randn(self.matrix_size, self.matrix_size, device=device)
        
        # Pool of tensors used to control memory usage
        memory_pool = []
        
        start_time = time.time()
        iteration = 0
        
        print(f"GPU {gpu_id} 工作线程启动")
        
        while self.running:
            try:
                # Get the current target intensity
                target_intensity = self.get_current_target(start_time)
                
                # Adjust memory usage every 100 iterations
                if iteration % 100 == 0:
                    self.manage_memory(gpu_id, memory_pool)
                
                # Decide probabilistically whether to compute, based on the intensity
                if random.random() < target_intensity:
                    # Run a matrix multiplication
                    c = torch.mm(a, b)
                    
                    # Scale the computation complexity with the intensity
                    if target_intensity > 0.7:
                        c = torch.relu(c)
                        c = torch.sigmoid(c)
                        c = torch.tanh(c)
                    elif target_intensity > 0.4:
                        c = torch.relu(c)
                        c = torch.sigmoid(c)
                    else:
                        c = torch.relu(c)
                    
                    torch.cuda.synchronize()
                
                # Dynamic sleep: higher intensity, shorter sleep
                sleep_time = (1 - target_intensity) * 0.01
                if sleep_time > 0:
                    time.sleep(sleep_time)
                
                iteration += 1
                
            except RuntimeError as e:
                if "out of memory" in str(e):
                    # Out of memory: free the pool
                    memory_pool.clear()
                    torch.cuda.empty_cache()
                    print(f"GPU {gpu_id} 显存不足,已清理")
                else:
                    print(f"GPU {gpu_id} 错误: {e}")
                    break
        
        print(f"GPU {gpu_id} 工作线程退出")
    
    def monitor(self):
        """监控线程"""
        start_time = time.time()
        
        while self.running:
            try:
                elapsed = time.time() - start_time
                target = self.get_current_target(start_time)
                
                print(f"\n时间: {elapsed:.0f}s | 目标强度: {target:.2f}")
                
                for gpu_id in self.gpu_ids:
                    memory_used = torch.cuda.memory_allocated(gpu_id) / 1024**3
                    memory_total = self.gpu_memory_info[gpu_id]['total_gb']
                    memory_percent = (memory_used / memory_total) * 100
                    
                    print(f"GPU {gpu_id}: 显存 {memory_used:.1f}GB/{memory_total:.1f}GB ({memory_percent:.1f}%)")
                
                time.sleep(3)  # report every 3 seconds
                
            except KeyboardInterrupt:
                break
    
    def start(self):
        """启动压力测试"""
        print("\n开始GPU压力测试,按Ctrl+C停止...\n")
        
        threads = []
        
        try:
            # Start worker threads
            for gpu_id in self.gpu_ids:
                t = threading.Thread(target=self.worker, args=(gpu_id,))
                t.daemon = True
                threads.append(t)
                t.start()
            
            # Start the monitoring thread
            monitor_thread = threading.Thread(target=self.monitor)
            monitor_thread.daemon = True
            monitor_thread.start()
            
            # Wait until stopped
            while self.running:
                time.sleep(0.1)
                
        except KeyboardInterrupt:
            print("\n正在停止...")
            self.running = False
            
            # Wait for worker threads to finish
            for t in threads:
                t.join(timeout=1)
            
            # Free GPU memory
            for gpu_id in self.gpu_ids:
                torch.cuda.set_device(gpu_id)
                torch.cuda.empty_cache()
            
            print("已停止")

def parse_gpu_ids(gpu_str):
    """解析GPU ID"""
    if gpu_str.lower() == 'all':
        return None
    return [int(x) for x in gpu_str.split(',')]

def main():
    parser = argparse.ArgumentParser(description='Simple GPU stress test')
    parser.add_argument('--gpu', '-g', type=parse_gpu_ids, default='0',
                        help='GPU IDs, e.g. 0 or 0,1 or all')
    parser.add_argument('--target', '-t', type=int, default=80,
                        help='target utilization in %% (default 80)')
    parser.add_argument('--fluctuation', '-f', type=int, default=20,
                        help='fluctuation range in %% (default ±20)')
    parser.add_argument('--memory-limit', '-m', type=int, default=85,
                        help='GPU memory usage cap in %% (default 85)')
    parser.add_argument('--matrix-size', '-s', type=int, default=2048,
                        help='matrix size (default 2048)')
    
    args = parser.parse_args()
    
    try:
        stress_test = SimpleGPUStress(
            gpu_ids=args.gpu,
            target_usage=args.target,
            fluctuation=args.fluctuation,
            memory_limit=args.memory_limit,
            matrix_size=args.matrix_size
        )
        stress_test.start()
        
    except Exception as e:
        print(f"错误: {e}")

if __name__ == "__main__":
    main()
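
As a usage note, the class can also be driven directly from Python rather than through the argparse CLI. The sketch below is illustrative and assumes the script above is saved as gpu_stress.py (the filename is an assumption, not stated in the post); a roughly equivalent command line would be python gpu_stress.py -g 0,1 -t 70 -f 15 -m 80 -s 4096.

# A minimal sketch, assuming the script above is saved as gpu_stress.py
# next to this file; all parameter values are illustrative.
from gpu_stress import SimpleGPUStress

stress = SimpleGPUStress(
    gpu_ids=[0, 1],     # stress GPUs 0 and 1
    target_usage=70,    # aim for roughly 70% utilization
    fluctuation=15,     # ±15% sinusoidal fluctuation
    memory_limit=80,    # cap GPU memory at 80% of each card
    matrix_size=4096,   # 4096x4096 working matrices
)
stress.start()  # blocks until Ctrl+C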

