"""Benchmark main controller module.

Builds TensorRT engines, runs every configured test combination
(resolution, batch size, camera count, target FPS), collects metrics and
exports the results.
"""

import time
import signal
import sys
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass

from .config import BenchmarkConfig
from .engine_builder import TRTEngineBuilder
from .inference_engine import TRTInferenceEngine
from .decode_thread import FrameQueueManager
from .batch_assembler import BatchAssembler
from .metrics_collector import MetricsCollector
from .results import TestResult, export_json, export_csv, generate_report
from .utils import setup_logging, Timer, get_timestamp, ensure_dir

logger = setup_logging()


class BenchmarkRunner:
    """Benchmark main controller.

    Sequences ``initialize`` -> ``run_all_tests`` -> ``save_results`` (see
    ``run``). Per-test components (inference engine, frame queues, batch
    assembler) are created in ``_setup_test`` and released in
    ``_teardown_test``; the metrics collector is shared across tests and
    reset before each one.
    """

    def __init__(self, config: BenchmarkConfig):
        self.config = config
        # Set by the signal handler; polled by the test loops.
        self._interrupted = False
        self._results: List[TestResult] = []

        # Component references.
        self._engine_builder: Optional[TRTEngineBuilder] = None
        # resolution -> engine path, filled by initialize().
        self._engines: Dict[int, str] = {}
        self._inference_engine: Optional[TRTInferenceEngine] = None
        self._queue_manager: Optional[FrameQueueManager] = None
        self._batch_assembler: Optional[BatchAssembler] = None
        self._metrics_collector: Optional[MetricsCollector] = None

        # Install handlers so SIGINT/SIGTERM only set a flag; the running
        # loops notice it and stop between batches.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Request a graceful stop of the benchmark."""
        logger.warning("收到中断信号,正在停止...")
        self._interrupted = True

    def initialize(self):
        """Build engines, create the metrics collector and the output dir."""
        logger.info("=" * 60)
        logger.info("FP16 性能评估 Benchmark 框架")
        logger.info("=" * 60)

        # Build one TensorRT engine per configured resolution.
        logger.info("构建 TensorRT Engine...")
        self._engine_builder = TRTEngineBuilder(
            model_path=self.config.model_path,
            output_dir=self.config.engine_dir,
        )
        self._engines = self._engine_builder.build_all_engines(
            resolutions=self.config.resolutions,
            precision=self.config.precision,
        )
        logger.info(f"Engine 构建完成: {list(self._engines.keys())}")

        # Metrics collector shared by all tests.
        self._metrics_collector = MetricsCollector(
            device_id=self.config.device_id,
            sample_interval_ms=self.config.metrics_sample_interval_ms,
        )

        ensure_dir(self.config.output_dir)

        logger.info("初始化完成")

    def _setup_test(self, resolution: int, batch_size: int, num_cameras: int, target_fps: float):
        """Create the inference engine, frame sources and batch assembler for one test."""
        # Load the engine built for this resolution.
        engine_path = self._engines[resolution]
        self._inference_engine = TRTInferenceEngine(
            engine_path=engine_path,
            num_streams=self.config.num_cuda_streams,
            device_id=self.config.device_id,
        )

        self._queue_manager = FrameQueueManager(queue_size=self.config.frame_queue_size)

        # One source per camera. Use the configured source when available,
        # otherwise (synthetic mode, or more cameras than configured sources)
        # fall back to a synthetic source.
        for i in range(num_cameras):
            source_id = f"cam_{i:02d}"
            if not self.config.use_synthetic and i < len(self.config.video_sources):
                src = self.config.video_sources[i]
                source = src.get("url", "")
                source_type = src.get("type", "rtsp")
            else:
                source = "synthetic"
                source_type = "synthetic"
            self._queue_manager.add_source(
                source_id=source_id,
                source=source,
                target_fps=target_fps,
                source_type=source_type,
                resolution=self.config.synthetic_resolution,
            )

        self._batch_assembler = BatchAssembler(
            frame_queues=self._queue_manager.queues,
            batch_size=batch_size,
            imgsz=(resolution, resolution),
            use_gpu_preprocess=True,
            device_id=self.config.device_id,
        )

    def _teardown_test(self):
        """Release per-test components and try to return GPU memory.

        Safe to call repeatedly and when nothing is set up. References are
        detached before cleanup so a failure in one step does not leave
        stale components behind or skip the other cleanup step.
        """
        queue_manager, self._queue_manager = self._queue_manager, None
        inference_engine, self._inference_engine = self._inference_engine, None
        self._batch_assembler = None

        try:
            if queue_manager:
                queue_manager.stop_all()
        finally:
            # Runs even if stopping the decoders raised.
            if inference_engine:
                inference_engine.cleanup()

        # Force GC and flush the torch CUDA cache (best effort).
        import gc
        gc.collect()
        try:
            import torch
        except ImportError:
            torch = None  # torch is only needed for the cache flush below
        if torch is not None:
            try:
                if torch.cuda.is_available():
                    # Wait for outstanding work before releasing cached blocks.
                    torch.cuda.synchronize()
                    torch.cuda.empty_cache()
            except RuntimeError as e:
                # Typically right after a CUDA error/OOM; cleanup must not
                # abort the remaining tests.
                logger.warning(f"显存清理失败: {e}")

        # Give the driver a moment to release memory.
        time.sleep(1)

    def run_single_test(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float,
    ) -> TestResult:
        """Run one warmup + measurement cycle and return its TestResult.

        On success the per-test components are torn down before returning;
        on failure the caller is responsible for calling ``_teardown_test``.
        """
        logger.info(f"测试配置: {resolution}x{resolution}, batch={batch_size}, "
                    f"cameras={num_cameras}, fps={target_fps}")

        self._setup_test(resolution, batch_size, num_cameras, target_fps)

        # Start decoding threads.
        self._queue_manager.start_all()

        self._metrics_collector.reset()

        # Warmup: run inference without recording metrics.
        logger.info(f"预热 {self.config.warmup_sec} 秒...")
        warmup_end = time.time() + self.config.warmup_sec
        while time.time() < warmup_end and not self._interrupted:
            batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
            if batch_data:
                batch, _ = batch_data
                self._inference_engine.infer_sync(batch)

        # Measurement phase.
        logger.info(f"正式测试 {self.config.test_duration_sec} 秒...")
        self._metrics_collector.start()
        try:
            test_end = time.time() + self.config.test_duration_sec
            stream_id = 0
            while time.time() < test_end and not self._interrupted:
                batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
                if not batch_data:
                    continue
                batch, frame_infos = batch_data

                # Round-robin over CUDA streams.
                task_id = self._inference_engine.infer_async(batch, stream_id)
                stream_id = (stream_id + 1) % self.config.num_cuda_streams

                # Results are fetched right away, so each batch completes
                # before the next one is submitted.
                _, latency_ms = self._inference_engine.get_results(task_id)

                self._metrics_collector.record_inference(latency_ms, len(frame_infos))
        finally:
            # Always stop sampling, even if inference raised, so the
            # collector is not left running into the next test.
            self._metrics_collector.stop()

        # Cooldown.
        time.sleep(self.config.cooldown_sec)

        result = self._collect_results(resolution, batch_size, num_cameras, target_fps)

        self._teardown_test()

        return result

    def _collect_results(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float,
    ) -> TestResult:
        """Build a TestResult from the collector and decoder statistics.

        Missing metric keys default to 0. Realtime capability and GPU
        saturation are evaluated and stored on the result.
        """
        gpu_metrics = self._metrics_collector.get_gpu_metrics()
        throughput_metrics = self._metrics_collector.get_throughput_metrics()

        # Dropped frames summed over all decoder sources.
        decode_stats = self._queue_manager.get_all_stats()
        total_dropped = sum(s.dropped_frames for s in decode_stats.values())

        gpu_util = gpu_metrics.get("gpu_utilization", {})
        latency = throughput_metrics.get("latency", {})
        throughput_fps = throughput_metrics.get("throughput_fps", 0)

        result = TestResult(
            resolution=resolution,
            batch_size=batch_size,
            num_cameras=num_cameras,
            target_fps=target_fps,
            gpu_utilization_avg=gpu_util.get("avg", 0),
            gpu_utilization_max=gpu_util.get("max", 0),
            gpu_utilization_min=gpu_util.get("min", 0),
            memory_used_mb=gpu_metrics.get("memory_used_mb", {}).get("avg", 0),
            memory_utilization=gpu_metrics.get("memory_utilization", {}).get("avg", 0),
            total_throughput_fps=throughput_fps,
            per_camera_fps=throughput_fps / num_cameras if num_cameras > 0 else 0,
            total_frames=throughput_metrics.get("total_frames", 0),
            total_batches=throughput_metrics.get("total_batches", 0),
            avg_latency_ms=latency.get("avg", 0),
            p95_latency_ms=latency.get("p95", 0),
            p99_latency_ms=latency.get("p99", 0),
            max_latency_ms=latency.get("max", 0),
            min_latency_ms=latency.get("min", 0),
            frame_drop_rate=throughput_metrics.get("frame_drop_rate", 0),
            dropped_frames=total_dropped,
            test_duration_sec=throughput_metrics.get("duration_sec", 0),
        )

        result.check_realtime_capability()
        result.is_gpu_saturated, result.saturation_reason = self._check_saturation(result)

        logger.info(f"结果: 吞吐={result.total_throughput_fps:.1f} FPS, "
                    f"GPU={result.gpu_utilization_avg:.1f}%, "
                    f"延迟={result.avg_latency_ms:.2f}ms (P95={result.p95_latency_ms:.2f}ms), "
                    f"实时={'✓' if result.is_realtime_capable else '✗'}, "
                    f"饱和={'是' if result.is_gpu_saturated else '否'}")

        return result

    def _check_saturation(self, result: TestResult) -> Tuple[bool, Optional[str]]:
        """Return ``(saturated, reason)`` for a result.

        Saturated when any condition holds: GPU utilization or memory
        utilization at/above the configured thresholds, the result is not
        realtime-capable, or the frame drop rate exceeds 5%. ``reason``
        joins all triggered conditions with "; ", or is None.
        """
        reasons = []

        if result.gpu_utilization_avg >= self.config.gpu_saturation_threshold:
            reasons.append(f"GPU 利用率 {result.gpu_utilization_avg:.1f}% >= {self.config.gpu_saturation_threshold}%")

        if result.memory_utilization >= self.config.memory_saturation_threshold:
            reasons.append(f"显存利用率 {result.memory_utilization:.1f}% >= {self.config.memory_saturation_threshold}%")

        if not result.is_realtime_capable:
            reasons.append(f"P95 延迟 {result.p95_latency_ms:.2f}ms 超过帧间隔")

        if result.frame_drop_rate > 5.0:
            reasons.append(f"丢帧率 {result.frame_drop_rate:.1f}% > 5%")

        if reasons:
            return True, "; ".join(reasons)
        return False, None

    def run_all_tests(self) -> List[TestResult]:
        """Run every configured combination and return the collected results.

        Stops early when interrupted or after 3 consecutive failures. A
        failing test is logged, counted and cleaned up; it does not abort
        the run.
        """
        combinations = self.config.get_test_combinations()
        total = len(combinations)
        logger.info(f"共 {total} 个测试组合")

        consecutive_failures = 0
        max_consecutive_failures = 3

        for i, combo in enumerate(combinations, 1):
            if self._interrupted:
                logger.warning("测试被中断")
                break

            # Too many consecutive failures: skip the remaining (heavier) tests.
            if consecutive_failures >= max_consecutive_failures:
                logger.warning(f"连续 {consecutive_failures} 次失败,跳过剩余测试")
                break

            logger.info(f"\n[{i}/{total}] 开始测试...")

            try:
                result = self.run_single_test(
                    resolution=combo["resolution"],
                    batch_size=combo["batch_size"],
                    num_cameras=combo["camera_count"],
                    target_fps=combo["target_fps"],
                )
                self._results.append(result)
                consecutive_failures = 0
            except Exception as e:
                error_msg = str(e).lower()
                # Heuristic: treat anything mentioning memory/CUDA as OOM.
                if "outofmemory" in error_msg or "cuda" in error_msg or "memory" in error_msg:
                    logger.warning(f"显存不足,跳过此测试: {combo}")
                    consecutive_failures += 1
                    self._teardown_test()
                    time.sleep(3)  # extra wait for VRAM to be released
                else:
                    # Keep the traceback for unexpected failures.
                    logger.error(f"测试失败: {e}", exc_info=True)
                    consecutive_failures += 1
                    self._teardown_test()

        return self._results

    def save_results(self):
        """Export results (JSON/CSV/Markdown per config) and try to plot them."""
        if not self._results:
            logger.warning("没有测试结果可保存")
            return

        timestamp = get_timestamp()

        if self.config.save_json:
            json_path = f"{self.config.output_dir}/results_{timestamp}.json"
            export_json(self._results, json_path)
            logger.info(f"JSON 结果已保存: {json_path}")

        if self.config.save_csv:
            csv_path = f"{self.config.output_dir}/results_{timestamp}.csv"
            export_csv(self._results, csv_path)
            logger.info(f"CSV 结果已保存: {csv_path}")

        if self.config.generate_report:
            report_path = generate_report(self._results, self.config.output_dir)
            logger.info(f"Markdown 报告已生成: {report_path}")

        # Visualization is optional; a failure here is only logged.
        try:
            from .visualizer import generate_visualizations
            viz_files = generate_visualizations(self._results, self.config.output_dir)
            if viz_files:
                logger.info(f"可视化图表已生成: {len(viz_files)} 个文件")
        except Exception as e:
            logger.warning(f"可视化生成失败: {e}")

    def run(self):
        """Run the full benchmark; per-test resources are always released."""
        try:
            self.initialize()
            self.run_all_tests()
            self.save_results()

            logger.info("\n" + "=" * 60)
            logger.info("Benchmark 完成!")
            logger.info("=" * 60)

        except Exception as e:
            logger.error(f"Benchmark 执行失败: {e}")
            raise
        finally:
            self._teardown_test()