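"""
GPU stress test module - measures the performance limits of an RTX 3050.

Goals:
1. Measure the maximum frames processed per second at different resolutions
2. Measure the maximum number of connected cameras
3. Measure the maximum per-camera frame rate for different camera counts
4. Measure the maximum number of cameras at different frame-skip rates
"""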
|
||
|
||
import gc
import json
import os
import signal
import tempfile
import time
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, List, Optional, Tuple

from .utils import setup_logging, ensure_dir
|
||
|
||
logger = setup_logging()
|
||
|
||
|
||
@dataclass
class StressTestResult:
    """Record of a single stress-test run.

    The first group of fields echoes the configuration the run was started
    with; the second group holds what was measured. ``timestamp`` is filled
    in automatically when it is left empty.
    """

    # --- configuration ---
    test_type: str  # max_fps, max_cameras, per_camera_fps, frame_skip
    resolution: int
    batch_size: int
    num_cameras: int
    target_fps: float
    frame_skip: int  # sampling interval (1 = keep every frame, 2 = keep 1 of every 2, ...)

    # --- results ---
    actual_fps: float
    per_camera_fps: float
    gpu_utilization: float
    memory_used_mb: float
    avg_latency_ms: float
    p95_latency_ms: float
    is_stable: bool  # whether the run completed stably
    error_msg: Optional[str] = None

    timestamp: str = ""

    def __post_init__(self):
        """Stamp the record with the current local time unless one was given."""
        self.timestamp = self.timestamp or datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
class StressTestRunner:
    """Drive the GPU stress tests and persist their results.

    Each public ``test_*`` method runs a series of single measurements via
    ``_run_single_test``, appends every ``StressTestResult`` to
    ``self.results`` and writes the accumulated list to a JSON file in
    ``output_dir``. ``run_all_tests`` runs all four series and writes a
    Markdown report.

    A SIGINT handler is installed on construction (when possible) that sets
    an internal flag, checked by all test loops, and saves the results
    collected so far.
    """

    def __init__(self, model_path: str, output_dir: str = "./stress_results"):
        """Create a runner.

        Args:
            model_path: Model path handed to ``TRTEngineBuilder``.
            output_dir: Directory for the JSON results and Markdown report;
                passed to ``ensure_dir`` before use.
        """
        self.model_path = model_path
        self.output_dir = output_dir
        self.results: List[StressTestResult] = []
        self._interrupted = False
        self._engine_cache: Dict[int, str] = {}

        # Test parameters
        self.test_duration = 15  # seconds measured per test
        self.warmup_sec = 2
        self.cooldown_sec = 2

        ensure_dir(output_dir)
        self._results_file = f"{output_dir}/stress_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        try:
            signal.signal(signal.SIGINT, self._signal_handler)
        except ValueError:
            # signal.signal() may only be called from the main thread; a
            # runner built elsewhere still works, just without Ctrl-C saving.
            logger.warning("无法注册 SIGINT 处理器 (非主线程), 中断时不会自动保存结果")

    def _signal_handler(self, signum, frame):
        """SIGINT handler: flag the interruption and save current results.

        ``signum`` and ``frame`` are required by the handler signature and
        are not used.
        """
        logger.warning("收到中断信号,保存当前结果...")
        self._interrupted = True
        self._save_results()

    def _clear_gpu(self):
        """Best-effort GPU memory cleanup followed by a cooldown sleep."""
        gc.collect()
        try:
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
        except ImportError:
            # torch is optional here; nothing to clear without it.
            pass
        except Exception as exc:
            # Cleanup must never abort a test series, but do not hide why it failed.
            logger.warning(f"清理 GPU 显存失败: {exc}")
        time.sleep(self.cooldown_sec)

    def _save_results(self):
        """Write all collected results to the JSON results file atomically.

        The data is written to a uniquely named temporary file in the same
        directory and then moved over the target with ``os.replace``. This
        keeps the results file valid even when the SIGINT handler calls this
        method while another save is in progress.

        NOTE: ``tempfile.mkstemp`` creates the file readable/writable by the
        owner only, so the results file ends up with those permissions.
        """
        data = [asdict(r) for r in self.results]
        target_dir = os.path.dirname(self._results_file) or "."
        fd, tmp_path = tempfile.mkstemp(dir=target_dir, suffix=".tmp")
        try:
            with os.fdopen(fd, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self._results_file)
        except BaseException:
            # Do not leave a stray temp file behind on failure.
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
            raise
        logger.info(f"结果已保存: {self._results_file}")

    def _build_engine(self, resolution: int) -> str:
        """Return the engine for ``resolution``, building it on first use.

        Built engines are looked up in / stored to ``self._engine_cache``.
        The builder writes to ``./engines`` and is asked for ``fp16``.
        The returned value is whatever ``build_all_engines`` maps the
        resolution to (used by the caller as the engine path).
        """
        if resolution in self._engine_cache:
            return self._engine_cache[resolution]

        from .engine_builder import TRTEngineBuilder

        builder = TRTEngineBuilder(self.model_path, "./engines")
        engines = builder.build_all_engines([resolution], "fp16")

        self._engine_cache[resolution] = engines[resolution]
        return engines[resolution]

    def _run_single_test(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float,
        frame_skip: int = 1,
        test_type: str = "stress",
    ) -> Optional["StressTestResult"]:
        """Run one measurement and return its result.

        Sets up the inference engine, ``num_cameras`` synthetic sources and a
        batch assembler, runs a warmup for ``self.warmup_sec`` seconds, then
        measures for ``self.test_duration`` seconds.

        Args:
            resolution: Square inference size; also selects the engine.
            batch_size: Batch size given to the batch assembler.
            num_cameras: Number of synthetic sources to add.
            target_fps: Source frame rate before frame skipping.
            frame_skip: Sampling interval; sources are created with
                ``target_fps / frame_skip``.
            test_type: Tag stored in the result's ``test_type`` field.

        Returns:
            A ``StressTestResult``. Any exception raised during the run is
            caught and reported as a result with ``is_stable=False`` and a
            truncated ``error_msg``. A run cut short by an interrupt is also
            marked ``is_stable=False``.
        """
        from .inference_engine import TRTInferenceEngine
        from .decode_thread import FrameQueueManager
        from .batch_assembler import BatchAssembler
        from .metrics_collector import MetricsCollector

        engine = None
        queue_manager = None
        metrics = None
        metrics_running = False

        try:
            # Get the engine
            engine_path = self._build_engine(resolution)

            logger.info(f"测试: {resolution}x{resolution}, batch={batch_size}, "
                        f"cameras={num_cameras}, fps={target_fps}, skip={frame_skip}")

            # Initialise components
            engine = TRTInferenceEngine(engine_path, num_streams=1, device_id=0)
            queue_manager = FrameQueueManager(queue_size=2)
            metrics = MetricsCollector(device_id=0, sample_interval_ms=100)

            # Add camera sources (synthetic data)
            effective_fps = target_fps / frame_skip  # actual capture rate
            for i in range(num_cameras):
                queue_manager.add_source(
                    source_id=f"cam_{i:02d}",
                    source="synthetic",
                    target_fps=effective_fps,
                    source_type="synthetic",
                    resolution=(640, 480)
                )

            # Batch assembler
            batch_assembler = BatchAssembler(
                frame_queues=queue_manager.queues,
                batch_size=batch_size,
                imgsz=(resolution, resolution),
                use_gpu_preprocess=True,
                device_id=0
            )

            # Start the sources
            queue_manager.start_all()

            # Warmup: run inference without recording anything
            warmup_end = time.time() + self.warmup_sec
            while time.time() < warmup_end and not self._interrupted:
                batch_data = batch_assembler.assemble_batch(timeout=0.1)
                if batch_data:
                    batch, _ = batch_data
                    engine.infer_sync(batch)

            # Measured run
            metrics.reset()
            metrics.start()
            metrics_running = True

            test_end = time.time() + self.test_duration
            while time.time() < test_end and not self._interrupted:
                batch_data = batch_assembler.assemble_batch(timeout=0.1)
                if batch_data:
                    batch, frame_infos = batch_data
                    _, latency_ms = engine.infer_sync(batch)
                    metrics.record_inference(latency_ms, len(frame_infos))

            metrics.stop()
            metrics_running = False

            # Collect results
            gpu_metrics = metrics.get_gpu_metrics()
            throughput = metrics.get_throughput_metrics()
            latency = throughput.get("latency", {})

            actual_fps = throughput.get("throughput_fps", 0)

            # A run cut short by Ctrl-C is not a valid measurement.
            interrupted = self._interrupted

            result = StressTestResult(
                test_type=test_type,
                resolution=resolution,
                batch_size=batch_size,
                num_cameras=num_cameras,
                target_fps=target_fps,
                frame_skip=frame_skip,
                actual_fps=actual_fps,
                per_camera_fps=actual_fps / num_cameras if num_cameras > 0 else 0,
                gpu_utilization=gpu_metrics.get("gpu_utilization", {}).get("avg", 0),
                memory_used_mb=gpu_metrics.get("memory_used_mb", {}).get("avg", 0),
                avg_latency_ms=latency.get("avg", 0),
                p95_latency_ms=latency.get("p95", 0),
                is_stable=not interrupted,
                error_msg="interrupted" if interrupted else None
            )

            logger.info(f"  结果: {actual_fps:.1f} FPS, GPU {result.gpu_utilization:.1f}%, "
                        f"延迟 {result.avg_latency_ms:.1f}ms")

            return result

        except Exception as e:
            error_msg = str(e)
            logger.warning(f"  测试失败: {error_msg}")

            return StressTestResult(
                test_type=test_type,
                resolution=resolution,
                batch_size=batch_size,
                num_cameras=num_cameras,
                target_fps=target_fps,
                frame_skip=frame_skip,
                actual_fps=0,
                per_camera_fps=0,
                gpu_utilization=0,
                memory_used_mb=0,
                avg_latency_ms=0,
                p95_latency_ms=0,
                is_stable=False,
                error_msg=error_msg[:200]
            )
        finally:
            # Cleanup
            if metrics_running:
                # The measured loop raised after metrics.start(); stop the
                # collector so it does not keep running into the next test.
                try:
                    metrics.stop()
                except Exception as exc:
                    logger.warning(f"  停止指标采集失败: {exc}")
            if queue_manager is not None:
                queue_manager.stop_all()
            if engine is not None:
                engine.cleanup()
            self._clear_gpu()

    def test_max_fps(self, resolutions: Optional[List[int]] = None) -> Dict[int, float]:
        """Test 1: maximum frames processed per second per resolution.

        Runs a single 100 FPS source with batch sizes 1, 4 and 8 and keeps
        the best stable ``actual_fps`` for each resolution.

        Args:
            resolutions: Resolutions to test; defaults to ``[320, 480]``.

        Returns:
            Mapping of resolution to best stable FPS. Resolutions without a
            stable run are absent.
        """
        if resolutions is None:
            resolutions = [320, 480]

        logger.info("\n" + "=" * 60)
        logger.info("测试1: 最大处理帧数 (单摄像头, 最大帧率)")
        logger.info("=" * 60)

        max_fps_results = {}

        for res in resolutions:
            if self._interrupted:
                break

            # Push the limit with larger batches at a high frame rate
            for batch_size in [1, 4, 8]:
                if self._interrupted:
                    break

                result = self._run_single_test(
                    resolution=res,
                    batch_size=batch_size,
                    num_cameras=1,
                    target_fps=100,  # high frame rate for stress
                    frame_skip=1,
                    test_type="max_fps"
                )

                if result is None:
                    continue

                # Record failed runs too, as the other tests do.
                self.results.append(result)

                if result.is_stable:
                    if res not in max_fps_results or result.actual_fps > max_fps_results[res]:
                        max_fps_results[res] = result.actual_fps

            self._save_results()

        logger.info(f"\n最大 FPS 结果: {max_fps_results}")
        return max_fps_results

    def test_max_cameras(self, resolutions: Optional[List[int]] = None) -> Dict[int, int]:
        """Test 2: maximum number of cameras per resolution at 10 FPS each.

        Camera counts are tried in increasing order; a count passes when the
        run is stable and reaches at least 90% of ``count * 10`` FPS. The
        series for a resolution stops at the first count that does not pass.

        Args:
            resolutions: Resolutions to test; defaults to ``[320, 480]``.

        Returns:
            Mapping of resolution to the largest passing camera count
            (0 when none passed).
        """
        if resolutions is None:
            resolutions = [320, 480]

        logger.info("\n" + "=" * 60)
        logger.info("测试2: 最大摄像头接入数 (10 FPS 实时)")
        logger.info("=" * 60)

        max_cameras = {}
        camera_counts = [1, 3, 5, 10, 15, 20, 25, 30]

        for res in resolutions:
            if self._interrupted:
                break

            max_cameras[res] = 0

            for num_cams in camera_counts:
                if self._interrupted:
                    break

                result = self._run_single_test(
                    resolution=res,
                    batch_size=min(num_cams, 8),
                    num_cameras=num_cams,
                    target_fps=10,
                    frame_skip=1,
                    test_type="max_cameras"
                )

                if result is None:
                    continue

                self.results.append(result)
                self._save_results()

                # Sustained means: actual FPS >= 90% of the total target FPS
                target_total = num_cams * 10
                if result.is_stable and result.actual_fps >= target_total * 0.9:
                    max_cameras[res] = num_cams
                else:
                    # Cannot handle more cameras
                    break

            # Logged for every resolution, including when all counts passed.
            logger.info(f"  {res}x{res}: 最大 {max_cameras[res]} 路摄像头")

        logger.info(f"\n最大摄像头数: {max_cameras}")
        return max_cameras

    def test_per_camera_fps(
        self,
        resolutions: Optional[List[int]] = None,
        camera_counts: Optional[List[int]] = None
    ) -> Dict[Tuple[int, int], float]:
        """Test 3: per-camera FPS for different camera counts.

        Every combination is run with a 30 FPS target per camera.

        Args:
            resolutions: Resolutions to test; defaults to ``[320, 480]``.
            camera_counts: Camera counts to test; defaults to
                ``[1, 3, 5, 10, 15, 30]``.

        Returns:
            Mapping of ``(resolution, camera_count)`` to the measured
            per-camera FPS (0 for failed runs).
        """
        if resolutions is None:
            resolutions = [320, 480]
        if camera_counts is None:
            camera_counts = [1, 3, 5, 10, 15, 30]

        logger.info("\n" + "=" * 60)
        logger.info("测试3: 不同摄像头数量下单路最大帧数")
        logger.info("=" * 60)

        per_camera_results = {}

        for res in resolutions:
            if self._interrupted:
                break

            for num_cams in camera_counts:
                if self._interrupted:
                    break

                # Measure real throughput under a high target frame rate
                result = self._run_single_test(
                    resolution=res,
                    batch_size=min(num_cams, 8),
                    num_cameras=num_cams,
                    target_fps=30,  # 30 FPS target
                    frame_skip=1,
                    test_type="per_camera_fps"
                )

                if result is None:
                    continue

                self.results.append(result)
                per_camera_results[(res, num_cams)] = result.per_camera_fps
                self._save_results()

        logger.info("\n单路最大帧数结果:")
        for (res, cams), fps in per_camera_results.items():
            logger.info(f"  {res}x{res}, {cams}路: {fps:.1f} FPS/路")

        return per_camera_results

    def test_frame_skip_capacity(
        self,
        resolutions: Optional[List[int]] = None,
        frame_skips: Optional[List[int]] = None
    ) -> Dict[Tuple[int, int], int]:
        """Test 4: maximum number of cameras for different frame-skip rates.

        Sources run at 30 FPS; with a skip of ``n`` the processed rate is
        ``30 / n`` per camera. A camera count passes when the run is stable
        and reaches at least 85% of ``count * 30 / n`` FPS. Non-positive skip
        values are ignored with a warning.

        Args:
            resolutions: Resolutions to test; defaults to ``[320, 480]``.
            frame_skips: Skip intervals to test; defaults to
                ``[1, 2, 3, 5, 10]``.

        Returns:
            Mapping of ``(resolution, skip)`` to the largest passing camera
            count (0 when none passed).
        """
        if resolutions is None:
            resolutions = [320, 480]
        if frame_skips is None:
            frame_skips = [1, 2, 3, 5, 10]

        logger.info("\n" + "=" * 60)
        logger.info("测试4: 不同抽帧率下最大摄像头数")
        logger.info("=" * 60)

        frame_skip_results = {}
        camera_counts = [5, 10, 15, 20, 25, 30, 40, 50]

        for res in resolutions:
            if self._interrupted:
                break

            for skip in frame_skips:
                if self._interrupted:
                    break

                if skip <= 0:
                    # A non-positive interval would divide by zero below.
                    logger.warning(f"  跳过无效的抽帧间隔: {skip}")
                    continue

                # Source rate is 30 FPS; processed rate after skipping = 30/skip
                effective_fps = 30.0 / skip
                max_cams = 0

                for num_cams in camera_counts:
                    if self._interrupted:
                        break

                    result = self._run_single_test(
                        resolution=res,
                        batch_size=min(num_cams, 8),
                        num_cameras=num_cams,
                        target_fps=30,  # source frame rate
                        frame_skip=skip,
                        test_type="frame_skip"
                    )

                    if result is None:
                        continue

                    self.results.append(result)
                    self._save_results()

                    # Check whether the load was sustained
                    target_total = num_cams * effective_fps
                    if result.is_stable and result.actual_fps >= target_total * 0.85:
                        max_cams = num_cams
                    else:
                        break

                frame_skip_results[(res, skip)] = max_cams
                logger.info(f"  {res}x{res}, 抽帧{skip}: 最大 {max_cams} 路")

        return frame_skip_results

    def run_all_tests(self):
        """Run all four stress tests at 320 and 480 and write the report."""
        logger.info("=" * 60)
        logger.info("RTX 3050 GPU 压力测试")
        logger.info("=" * 60)

        resolutions = [320, 480]

        # Test 1: maximum FPS
        max_fps = self.test_max_fps(resolutions)

        # Test 2: maximum camera count
        max_cameras = self.test_max_cameras(resolutions)

        # Test 3: per-camera FPS
        per_camera = self.test_per_camera_fps(resolutions, [1, 3, 5, 10, 15, 30])

        # Test 4: frame-skip capacity
        frame_skip = self.test_frame_skip_capacity(resolutions, [1, 2, 3, 5, 10])

        # Generate the report
        self._generate_report(max_fps, max_cameras, per_camera, frame_skip)

        logger.info("\n" + "=" * 60)
        logger.info("压力测试完成!")
        logger.info(f"结果保存在: {self.output_dir}")
        logger.info("=" * 60)

    def _generate_report(self, max_fps, max_cameras, per_camera, frame_skip):
        """Write a Markdown report summarising the four test series.

        Args:
            max_fps: Mapping of resolution to best FPS (test 1).
            max_cameras: Mapping of resolution to max camera count (test 2).
            per_camera: Mapping of ``(resolution, cameras)`` to per-camera
                FPS (test 3).
            frame_skip: Mapping of ``(resolution, skip)`` to max camera
                count (test 4).

        The file is written to ``self.output_dir`` with a timestamped name.
        Deployment suggestions are emitted only for the 320 and 480
        resolutions when present in ``max_cameras``.
        """
        report_path = f"{self.output_dir}/stress_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md"

        lines = [
            "# RTX 3050 GPU 压力测试报告",
            f"\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "\n## 1. 最大处理帧数 (单摄像头)",
            "\n| 分辨率 | 最大 FPS |",
            "|--------|----------|",
        ]
        lines.extend(f"| {res}×{res} | {fps:.1f} |" for res, fps in max_fps.items())

        lines.extend([
            "\n## 2. 最大摄像头接入数 (10 FPS 实时)",
            "\n| 分辨率 | 最大摄像头数 |",
            "|--------|--------------|",
        ])
        lines.extend(f"| {res}×{res} | {cams} |" for res, cams in max_cameras.items())

        lines.extend([
            "\n## 3. 不同摄像头数量下单路最大帧数",
            "\n| 分辨率 | 摄像头数 | 单路 FPS |",
            "|--------|----------|----------|",
        ])
        lines.extend(
            f"| {res}×{res} | {cams} | {fps:.1f} |"
            for (res, cams), fps in per_camera.items()
        )

        lines.extend([
            "\n## 4. 不同抽帧率下最大摄像头数",
            "\n| 分辨率 | 抽帧间隔 | 实际帧率 | 最大摄像头数 |",
            "|--------|----------|----------|--------------|",
        ])
        for (res, skip), cams in frame_skip.items():
            effective_fps = 30.0 / skip
            lines.append(f"| {res}×{res} | 每{skip}帧取1帧 | {effective_fps:.1f} FPS | {cams} |")

        lines.extend([
            "\n## 5. 部署建议",
            "\n根据测试结果,推荐配置:",
            "\n| 场景 | 分辨率 | 摄像头数 | 抽帧 | 说明 |",
            "|------|--------|----------|------|------|",
        ])

        # Suggestions derived from the results
        if 320 in max_cameras:
            lines.append(f"| 高并发 | 320×320 | {max_cameras[320]} | 1 | 最大并发 |")
        if 480 in max_cameras:
            lines.append(f"| 高精度 | 480×480 | {max_cameras[480]} | 1 | 精度优先 |")

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(lines))

        logger.info(f"报告已生成: {report_path}")
|
||
|
||
|
||
def run_stress_test(model_path: str, output_dir: str = "./stress_results"):
    """Entry point: build a ``StressTestRunner`` and run every stress test.

    Args:
        model_path: Model path forwarded to the runner.
        output_dir: Directory the runner writes its results into.
    """
    StressTestRunner(model_path, output_dir).run_all_tests()
|