import torch
import time
import threading
import argparse
import math
import random
class SimpleGPUStress:
    """Simple GPU stress tester that holds utilization near a target level.

    One worker thread per GPU performs matrix multiplications with a
    probabilistic duty cycle driven by a sine wave, so utilization
    fluctuates around ``target_usage``. A separate ballast of random
    tensors keeps memory usage near the configured cap, and a monitor
    thread periodically prints per-GPU memory statistics.
    """

    def __init__(self, gpu_ids=None, target_usage=80, fluctuation=20,
                 memory_limit=85, matrix_size=2048):
        """Configure the stress test.

        Args:
            gpu_ids: list of GPU ids, e.g. [0, 1], or None to use all GPUs.
            target_usage: target utilization percentage (default 80).
            fluctuation: fluctuation range in percent (default ±20).
            memory_limit: GPU memory usage cap in percent (default 85).
            matrix_size: base matrix dimension for the workload (default 2048).

        Raises:
            RuntimeError: if CUDA is not available.
        """
        if not torch.cuda.is_available():
            raise RuntimeError("CUDA不可用")
        if gpu_ids is None:
            self.gpu_ids = list(range(torch.cuda.device_count()))
        else:
            self.gpu_ids = gpu_ids
        # Store all percentages as ratios in [0, 1].
        self.target_usage = target_usage / 100.0
        self.fluctuation = fluctuation / 100.0
        self.memory_limit = memory_limit / 100.0
        self.matrix_size = matrix_size
        # Shared stop flag; a plain bool assignment is atomic enough for
        # this start/stop signalling between threads.
        self.running = True
        # Per-GPU memory budget derived from each device's total memory.
        self.gpu_memory_info = {}
        for gpu_id in self.gpu_ids:
            total_memory = torch.cuda.get_device_properties(gpu_id).total_memory
            max_memory = total_memory * self.memory_limit
            self.gpu_memory_info[gpu_id] = {
                'total_gb': total_memory / 1024**3,
                'max_gb': max_memory / 1024**3
            }
        print(f"使用GPU: {self.gpu_ids}")
        print(f"目标使用率: {target_usage}% ±{fluctuation}%")
        print(f"显存限制: {memory_limit}%")
        for gpu_id in self.gpu_ids:
            info = self.gpu_memory_info[gpu_id]
            print(f"GPU {gpu_id}: 总显存 {info['total_gb']:.1f}GB, 限制 {info['max_gb']:.1f}GB")

    def get_current_target(self, start_time):
        """Return the current target intensity in [0.1, 1.0].

        The target oscillates sinusoidally around ``target_usage`` with
        amplitude ``fluctuation`` and a 30-second period.
        """
        elapsed = time.time() - start_time
        wave = math.sin(elapsed / 30 * 2 * math.pi)
        target = self.target_usage + wave * self.fluctuation
        return max(0.1, min(1.0, target))

    def manage_memory(self, gpu_id, memory_pool):
        """Grow or shrink the memory ballast toward 80% of the memory cap.

        Allocation failures are deliberately swallowed: the ballast is
        best-effort and an OOM here must not kill the worker.
        """
        torch.cuda.set_device(gpu_id)
        current_memory = torch.cuda.memory_allocated(gpu_id) / 1024**3
        target_memory = self.gpu_memory_info[gpu_id]['max_gb'] * 0.8
        if current_memory < target_memory and len(memory_pool) < 10:
            try:
                # Random block size adds variety to the allocation pattern.
                size = random.randint(1024, 3072)
                matrix = torch.randn(size, size, device=f'cuda:{gpu_id}')
                memory_pool.append(matrix)
            except RuntimeError:
                pass
        elif current_memory > target_memory and len(memory_pool) > 1:
            if memory_pool:
                memory_pool.pop()
                torch.cuda.empty_cache()

    def worker(self, gpu_id):
        """Worker thread body: run matmul bursts until ``self.running`` clears.

        The probability of doing work on each pass equals the current
        target intensity; higher intensity also chains more elementwise ops.
        """
        torch.cuda.set_device(gpu_id)
        device = f'cuda:{gpu_id}'
        a = torch.randn(self.matrix_size, self.matrix_size, device=device)
        b = torch.randn(self.matrix_size, self.matrix_size, device=device)
        memory_pool = []
        start_time = time.time()
        iteration = 0
        print(f"GPU {gpu_id} 工作线程启动")
        while self.running:
            try:
                target_intensity = self.get_current_target(start_time)
                # Re-balance the memory ballast every 100 iterations.
                if iteration % 100 == 0:
                    self.manage_memory(gpu_id, memory_pool)
                # Probabilistic duty cycle: work more often at high intensity.
                if random.random() < target_intensity:
                    c = torch.mm(a, b)
                    if target_intensity > 0.7:
                        c = torch.relu(c)
                        c = torch.sigmoid(c)
                        c = torch.tanh(c)
                    elif target_intensity > 0.4:
                        c = torch.relu(c)
                        c = torch.sigmoid(c)
                    else:
                        c = torch.relu(c)
                    torch.cuda.synchronize()
                # Idle proportionally to the remaining intensity budget.
                sleep_time = (1 - target_intensity) * 0.01
                if sleep_time > 0:
                    time.sleep(sleep_time)
                iteration += 1
            except RuntimeError as e:
                if "out of memory" in str(e):
                    # Drop the ballast and retry instead of dying on OOM.
                    memory_pool.clear()
                    torch.cuda.empty_cache()
                    print(f"GPU {gpu_id} 显存不足,已清理")
                else:
                    print(f"GPU {gpu_id} 错误: {e}")
                    break
        print(f"GPU {gpu_id} 工作线程退出")

    def monitor(self):
        """Monitor thread body: print per-GPU memory usage every 3 seconds."""
        start_time = time.time()
        while self.running:
            try:
                elapsed = time.time() - start_time
                target = self.get_current_target(start_time)
                print(f"\n时间: {elapsed:.0f}s | 目标强度: {target:.2f}")
                for gpu_id in self.gpu_ids:
                    memory_used = torch.cuda.memory_allocated(gpu_id) / 1024**3
                    memory_total = self.gpu_memory_info[gpu_id]['total_gb']
                    memory_percent = (memory_used / memory_total) * 100
                    print(f"GPU {gpu_id}: 显存 {memory_used:.1f}GB/{memory_total:.1f}GB ({memory_percent:.1f}%)")
                time.sleep(3)
            except KeyboardInterrupt:
                # NOTE(review): KeyboardInterrupt is only delivered to the
                # main thread, so this handler is effectively dead code here;
                # kept so a direct (non-threaded) call still exits cleanly.
                break

    def start(self):
        """Launch worker and monitor threads and block until Ctrl+C.

        Shutdown (joining threads and releasing cached GPU memory) runs in a
        ``finally`` block so it happens even if an unexpected exception,
        not just KeyboardInterrupt, escapes the wait loop.
        """
        print("\n开始GPU压力测试,按Ctrl+C停止...\n")
        threads = []
        try:
            for gpu_id in self.gpu_ids:
                t = threading.Thread(target=self.worker, args=(gpu_id,))
                t.daemon = True
                threads.append(t)
                t.start()
            monitor_thread = threading.Thread(target=self.monitor)
            monitor_thread.daemon = True
            monitor_thread.start()
            # Keep the main thread alive so it can receive Ctrl+C.
            while self.running:
                time.sleep(0.1)
        except KeyboardInterrupt:
            print("\n正在停止...")
        finally:
            self.running = False
            for t in threads:
                t.join(timeout=1)
            for gpu_id in self.gpu_ids:
                torch.cuda.set_device(gpu_id)
                torch.cuda.empty_cache()
            print("已停止")
def parse_gpu_ids(gpu_str):
    """Parse a GPU id CLI argument such as '0', '0,1' or 'all'.

    Args:
        gpu_str: raw argument string.

    Returns:
        None for 'all' (meaning: use every available GPU), otherwise a
        list of integer GPU ids.

    Raises:
        ValueError: if a comma-separated segment is not an integer.
    """
    if gpu_str.strip().lower() == 'all':
        return None
    # Tolerate whitespace around commas ('0, 1') and skip empty segments
    # (e.g. a trailing comma); int() itself strips surrounding whitespace.
    return [int(part) for part in gpu_str.split(',') if part.strip()]
def main():
    """Parse command-line arguments and run the GPU stress test.

    Note: literal percent signs in argparse help strings must be escaped
    as '%%' because argparse %-formats help text (a bare '%' makes
    ``--help`` raise ValueError).
    """
    parser = argparse.ArgumentParser(description='简洁版GPU压力测试')
    # default='0' is a string, so argparse runs it through type=parse_gpu_ids.
    parser.add_argument('--gpu', '-g', type=parse_gpu_ids, default='0',
                        help='GPU ID,如: 0 或 0,1 或 all')
    parser.add_argument('--target', '-t', type=int, default=80,
                        help='目标使用率%% (默认%(default)s)')
    parser.add_argument('--fluctuation', '-f', type=int, default=20,
                        help='波动范围%% (默认±%(default)s)')
    parser.add_argument('--memory-limit', '-m', type=int, default=85,
                        help='显存使用上限%% (默认%(default)s)')
    # %(default)s keeps the help text truthful (the default is 4096,
    # not the 2048 the original help claimed).
    parser.add_argument('--matrix-size', '-s', type=int, default=2048 * 2,
                        help='矩阵大小 (默认%(default)s)')
    args = parser.parse_args()
    try:
        stress_test = SimpleGPUStress(
            gpu_ids=args.gpu,
            target_usage=args.target,
            fluctuation=args.fluctuation,
            memory_limit=args.memory_limit,
            matrix_size=args.matrix_size
        )
        stress_test.start()
    except Exception as e:
        # Top-level boundary: report and exit instead of a raw traceback
        # (e.g. RuntimeError("CUDA不可用") from the constructor).
        print(f"错误: {e}")


if __name__ == "__main__":
    main()