# File: Test_AI/benchmark/benchmark_runner.py
# (393 lines, ~15 KiB; last modified 2026-01-20 10:54:30 +08:00)
"""
Benchmark 主控程序模块
"""
import time
import signal
import sys
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from .config import BenchmarkConfig
from .engine_builder import TRTEngineBuilder
from .inference_engine import TRTInferenceEngine
from .decode_thread import FrameQueueManager
from .batch_assembler import BatchAssembler
from .metrics_collector import MetricsCollector
from .results import TestResult, export_json, export_csv, generate_report
from .utils import setup_logging, Timer, get_timestamp, ensure_dir
logger = setup_logging()
class BenchmarkRunner:
    """Top-level benchmark orchestrator.

    Drives the full FP16 evaluation flow: builds TensorRT engines, then for
    every test combination sets up decode threads, a batch assembler and an
    inference engine, runs a warmup phase followed by a timed measurement
    phase, collects GPU/throughput metrics into a ``TestResult``, and finally
    exports JSON/CSV/Markdown results.
    """

    def __init__(self, config: BenchmarkConfig):
        """Store the configuration and install SIGINT/SIGTERM handlers.

        Args:
            config: Fully-populated benchmark configuration.
        """
        self.config = config
        self._interrupted = False
        self._results: List[TestResult] = []
        # Component references; created in initialize() / _setup_test().
        self._engine_builder: Optional[TRTEngineBuilder] = None
        self._engines: Dict[int, str] = {}  # resolution -> engine file path
        self._inference_engine: Optional[TRTInferenceEngine] = None
        self._queue_manager: Optional[FrameQueueManager] = None
        self._batch_assembler: Optional[BatchAssembler] = None
        self._metrics_collector: Optional[MetricsCollector] = None
        # Graceful shutdown: the run loops poll self._interrupted.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Flag the run as interrupted; loops exit at their next check."""
        logger.warning("收到中断信号,正在停止...")
        self._interrupted = True

    def initialize(self):
        """Build all TensorRT engines and prepare the shared components."""
        logger.info("=" * 60)
        logger.info("FP16 性能评估 Benchmark 框架")
        logger.info("=" * 60)
        # One engine per configured resolution, keyed by resolution.
        logger.info("构建 TensorRT Engine...")
        self._engine_builder = TRTEngineBuilder(
            model_path=self.config.model_path,
            output_dir=self.config.engine_dir
        )
        self._engines = self._engine_builder.build_all_engines(
            resolutions=self.config.resolutions,
            precision=self.config.precision
        )
        logger.info(f"Engine 构建完成: {list(self._engines.keys())}")
        # GPU / throughput sampler shared across all tests (reset per test).
        self._metrics_collector = MetricsCollector(
            device_id=self.config.device_id,
            sample_interval_ms=self.config.metrics_sample_interval_ms
        )
        ensure_dir(self.config.output_dir)
        logger.info("初始化完成")

    def _setup_test(self, resolution: int, batch_size: int, num_cameras: int, target_fps: float):
        """Create the per-test inference engine, frame sources and assembler.

        Args:
            resolution: Square input resolution (selects the engine).
            batch_size: Frames per inference batch.
            num_cameras: Number of video sources to simulate.
            target_fps: Per-source target decode rate.
        """
        # Load the pre-built engine matching this resolution.
        engine_path = self._engines[resolution]
        self._inference_engine = TRTInferenceEngine(
            engine_path=engine_path,
            num_streams=self.config.num_cuda_streams,
            device_id=self.config.device_id
        )
        self._queue_manager = FrameQueueManager(queue_size=self.config.frame_queue_size)
        for i in range(num_cameras):
            source_id = f"cam_{i:02d}"
            # Use a synthetic source when synthetic mode is enabled, or when
            # the camera index exceeds the configured real sources (the two
            # original branches were verbatim duplicates — merged here).
            if self.config.use_synthetic or i >= len(self.config.video_sources):
                self._queue_manager.add_source(
                    source_id=source_id,
                    source="synthetic",
                    target_fps=target_fps,
                    source_type="synthetic",
                    resolution=self.config.synthetic_resolution
                )
            else:
                src = self.config.video_sources[i]
                self._queue_manager.add_source(
                    source_id=source_id,
                    source=src.get("url", ""),
                    target_fps=target_fps,
                    source_type=src.get("type", "rtsp"),
                    resolution=self.config.synthetic_resolution
                )
        self._batch_assembler = BatchAssembler(
            frame_queues=self._queue_manager.queues,
            batch_size=batch_size,
            imgsz=(resolution, resolution),
            use_gpu_preprocess=True,
            device_id=self.config.device_id
        )

    def _teardown_test(self):
        """Stop decode threads, release the engine and free GPU memory.

        Safe to call repeatedly (e.g. from the failure path and ``finally``):
        all component references are None-checked and cleared.
        """
        if self._queue_manager:
            self._queue_manager.stop_all()
            self._queue_manager = None
        if self._inference_engine:
            self._inference_engine.cleanup()
            self._inference_engine = None
        self._batch_assembler = None
        # Local imports keep the heavy torch import off the module path
        # until teardown actually runs.
        import gc
        import torch
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        # Give the driver a moment to actually release the memory.
        time.sleep(1)

    def run_single_test(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float
    ) -> TestResult:
        """Run one warmup + timed measurement cycle for a single combination.

        Returns:
            A populated ``TestResult`` for this configuration.
        """
        logger.info(f"测试配置: {resolution}x{resolution}, batch={batch_size}, "
                    f"cameras={num_cameras}, fps={target_fps}")
        self._setup_test(resolution, batch_size, num_cameras, target_fps)
        # Start decode threads and clear any stale metrics.
        self._queue_manager.start_all()
        self._metrics_collector.reset()
        # Warmup: run synchronous inference without recording metrics.
        logger.info(f"预热 {self.config.warmup_sec} 秒...")
        warmup_end = time.time() + self.config.warmup_sec
        while time.time() < warmup_end and not self._interrupted:
            batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
            if batch_data:
                batch, _ = batch_data
                self._inference_engine.infer_sync(batch)
        # Timed measurement phase.
        logger.info(f"正式测试 {self.config.test_duration_sec} 秒...")
        self._metrics_collector.start()
        test_start = time.time()
        test_end = test_start + self.config.test_duration_sec
        stream_id = 0
        while time.time() < test_end and not self._interrupted:
            batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
            if batch_data:
                batch, frame_infos = batch_data
                # Round-robin across CUDA streams.
                # NOTE(review): get_results() is called immediately after
                # submission, so overlap only happens inside infer_async —
                # confirm this matches the intended pipelining.
                task_id = self._inference_engine.infer_async(batch, stream_id)
                stream_id = (stream_id + 1) % self.config.num_cuda_streams
                _, latency_ms = self._inference_engine.get_results(task_id)
                self._metrics_collector.record_inference(latency_ms, len(frame_infos))
        self._metrics_collector.stop()
        # Cooldown between tests to let the GPU settle.
        time.sleep(self.config.cooldown_sec)
        result = self._collect_results(resolution, batch_size, num_cameras, target_fps)
        self._teardown_test()
        return result

    def _collect_results(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float
    ) -> TestResult:
        """Assemble a ``TestResult`` from the collected metrics."""
        gpu_metrics = self._metrics_collector.get_gpu_metrics()
        throughput_metrics = self._metrics_collector.get_throughput_metrics()
        # Decode-side drop statistics, summed over all sources.
        decode_stats = self._queue_manager.get_all_stats()
        total_dropped = sum(s.dropped_frames for s in decode_stats.values())
        # Hoist repeated nested-dict lookups.
        gpu_util = gpu_metrics.get("gpu_utilization", {})
        latency = throughput_metrics.get("latency", {})
        throughput_fps = throughput_metrics.get("throughput_fps", 0)
        result = TestResult(
            resolution=resolution,
            batch_size=batch_size,
            num_cameras=num_cameras,
            target_fps=target_fps,
            gpu_utilization_avg=gpu_util.get("avg", 0),
            gpu_utilization_max=gpu_util.get("max", 0),
            gpu_utilization_min=gpu_util.get("min", 0),
            memory_used_mb=gpu_metrics.get("memory_used_mb", {}).get("avg", 0),
            memory_utilization=gpu_metrics.get("memory_utilization", {}).get("avg", 0),
            total_throughput_fps=throughput_fps,
            per_camera_fps=throughput_fps / num_cameras if num_cameras > 0 else 0,
            total_frames=throughput_metrics.get("total_frames", 0),
            total_batches=throughput_metrics.get("total_batches", 0),
            avg_latency_ms=latency.get("avg", 0),
            p95_latency_ms=latency.get("p95", 0),
            p99_latency_ms=latency.get("p99", 0),
            max_latency_ms=latency.get("max", 0),
            min_latency_ms=latency.get("min", 0),
            frame_drop_rate=throughput_metrics.get("frame_drop_rate", 0),
            dropped_frames=total_dropped,
            test_duration_sec=throughput_metrics.get("duration_sec", 0),
        )
        result.check_realtime_capability()
        result.is_gpu_saturated, result.saturation_reason = self._check_saturation(result)
        # BUG FIX: both branches of the realtime/saturated markers were empty
        # strings ('' if ... else '') — glyphs evidently lost; restore
        # meaningful 是/否 indicators.
        logger.info(f"结果: 吞吐={result.total_throughput_fps:.1f} FPS, "
                    f"GPU={result.gpu_utilization_avg:.1f}%, "
                    f"延迟={result.avg_latency_ms:.2f}ms (P95={result.p95_latency_ms:.2f}ms), "
                    f"实时={'是' if result.is_realtime_capable else '否'}, "
                    f"饱和={'是' if result.is_gpu_saturated else '否'}")
        return result

    def _check_saturation(self, result: TestResult) -> Tuple[bool, Optional[str]]:
        """Return (saturated, reason) based on utilization, latency and drops.

        Any single trigger marks the GPU as saturated; all triggered reasons
        are joined into one human-readable string.
        """
        reasons = []
        if result.gpu_utilization_avg >= self.config.gpu_saturation_threshold:
            reasons.append(f"GPU 利用率 {result.gpu_utilization_avg:.1f}% >= {self.config.gpu_saturation_threshold}%")
        if result.memory_utilization >= self.config.memory_saturation_threshold:
            reasons.append(f"显存利用率 {result.memory_utilization:.1f}% >= {self.config.memory_saturation_threshold}%")
        if not result.is_realtime_capable:
            reasons.append(f"P95 延迟 {result.p95_latency_ms:.2f}ms 超过帧间隔")
        if result.frame_drop_rate > 5.0:
            reasons.append(f"丢帧率 {result.frame_drop_rate:.1f}% > 5%")
        if reasons:
            return True, "; ".join(reasons)
        return False, None

    def run_all_tests(self) -> List[TestResult]:
        """Run every configured test combination.

        Aborts early on interruption or after three consecutive failures
        (later combinations are heavier, so more failures are likely).

        Returns:
            The accumulated list of ``TestResult`` objects.
        """
        combinations = self.config.get_test_combinations()
        total = len(combinations)
        logger.info(f"{total} 个测试组合")
        consecutive_failures = 0
        max_consecutive_failures = 3
        for i, combo in enumerate(combinations, 1):
            if self._interrupted:
                logger.warning("测试被中断")
                break
            if consecutive_failures >= max_consecutive_failures:
                logger.warning(f"连续 {consecutive_failures} 次失败,跳过剩余测试")
                break
            logger.info(f"\n[{i}/{total}] 开始测试...")
            try:
                result = self.run_single_test(
                    resolution=combo["resolution"],
                    batch_size=combo["batch_size"],
                    num_cameras=combo["camera_count"],
                    target_fps=combo["target_fps"]
                )
                self._results.append(result)
                consecutive_failures = 0  # reset on success
            except Exception as e:
                # Heuristic OOM detection by message text; TensorRT/CUDA do
                # not raise a single well-typed OOM exception here.
                error_msg = str(e).lower()
                if "outofmemory" in error_msg or "cuda" in error_msg or "memory" in error_msg:
                    logger.warning(f"显存不足,跳过此测试: {combo}")
                    consecutive_failures += 1
                    self._teardown_test()
                    time.sleep(3)  # extra wait for memory to be released
                else:
                    logger.error(f"测试失败: {e}")
                    consecutive_failures += 1
                    self._teardown_test()
        return self._results

    def save_results(self):
        """Export accumulated results as JSON/CSV/report/plots per config."""
        if not self._results:
            logger.warning("没有测试结果可保存")
            return
        timestamp = get_timestamp()
        if self.config.save_json:
            json_path = f"{self.config.output_dir}/results_{timestamp}.json"
            export_json(self._results, json_path)
            logger.info(f"JSON 结果已保存: {json_path}")
        if self.config.save_csv:
            csv_path = f"{self.config.output_dir}/results_{timestamp}.csv"
            export_csv(self._results, csv_path)
            logger.info(f"CSV 结果已保存: {csv_path}")
        if self.config.generate_report:
            report_path = generate_report(self._results, self.config.output_dir)
            logger.info(f"Markdown 报告已生成: {report_path}")
        # Visualization is best-effort: plotting deps may be missing.
        try:
            from .visualizer import generate_visualizations
            viz_files = generate_visualizations(self._results, self.config.output_dir)
            if viz_files:
                logger.info(f"可视化图表已生成: {len(viz_files)} 个文件")
        except Exception as e:
            logger.warning(f"可视化生成失败: {e}")

    def run(self):
        """Execute the complete benchmark: initialize, test, save, clean up."""
        try:
            self.initialize()
            self.run_all_tests()
            self.save_results()
            logger.info("\n" + "=" * 60)
            logger.info("Benchmark 完成!")
            logger.info("=" * 60)
        except Exception as e:
            logger.error(f"Benchmark 执行失败: {e}")
            raise
        finally:
            # Always release GPU resources, even on failure/interrupt.
            self._teardown_test()