Files
Test_AI/benchmark/benchmark_runner.py
2026-01-20 10:54:30 +08:00

393 lines
15 KiB
Python

"""
Benchmark 主控程序模块
"""
import time
import signal
import sys
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from .config import BenchmarkConfig
from .engine_builder import TRTEngineBuilder
from .inference_engine import TRTInferenceEngine
from .decode_thread import FrameQueueManager
from .batch_assembler import BatchAssembler
from .metrics_collector import MetricsCollector
from .results import TestResult, export_json, export_csv, generate_report
from .utils import setup_logging, Timer, get_timestamp, ensure_dir
# Module-level logger shared by all benchmark components in this file.
logger = setup_logging()
class BenchmarkRunner:
    """Top-level benchmark controller.

    Orchestrates engine building, video decoding, batch assembly,
    inference and metrics collection across the configured test matrix,
    then exports the collected results.
    """

    def __init__(self, config: BenchmarkConfig):
        # Full benchmark configuration (test matrix, paths, thresholds).
        self.config = config
        # Set by the signal handler; polled by the run loops for graceful stop.
        self._interrupted = False
        self._results: List[TestResult] = []
        # Component references; populated by initialize() / _setup_test().
        self._engine_builder: Optional[TRTEngineBuilder] = None
        # Maps resolution -> built engine file path.
        self._engines: Dict[int, str] = {}
        self._inference_engine: Optional[TRTInferenceEngine] = None
        self._queue_manager: Optional[FrameQueueManager] = None
        self._batch_assembler: Optional[BatchAssembler] = None
        self._metrics_collector: Optional[MetricsCollector] = None
        # Install handlers so SIGINT/SIGTERM trigger a graceful stop
        # instead of killing the process mid-test.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Flag interruption; the test loops exit at their next check."""
        logger.warning("收到中断信号,正在停止...")
        self._interrupted = True
def initialize(self):
"""初始化所有组件"""
logger.info("=" * 60)
logger.info("FP16 性能评估 Benchmark 框架")
logger.info("=" * 60)
# 构建 TensorRT Engine
logger.info("构建 TensorRT Engine...")
self._engine_builder = TRTEngineBuilder(
model_path=self.config.model_path,
output_dir=self.config.engine_dir
)
self._engines = self._engine_builder.build_all_engines(
resolutions=self.config.resolutions,
precision=self.config.precision
)
logger.info(f"Engine 构建完成: {list(self._engines.keys())}")
# 初始化指标采集器
self._metrics_collector = MetricsCollector(
device_id=self.config.device_id,
sample_interval_ms=self.config.metrics_sample_interval_ms
)
# 创建输出目录
ensure_dir(self.config.output_dir)
logger.info("初始化完成")
def _setup_test(self, resolution: int, batch_size: int, num_cameras: int, target_fps: float):
"""设置单次测试环境"""
# 加载对应分辨率的 Engine
engine_path = self._engines[resolution]
self._inference_engine = TRTInferenceEngine(
engine_path=engine_path,
num_streams=self.config.num_cuda_streams,
device_id=self.config.device_id
)
# 创建帧队列管理器
self._queue_manager = FrameQueueManager(queue_size=self.config.frame_queue_size)
# 添加视频源
for i in range(num_cameras):
source_id = f"cam_{i:02d}"
if self.config.use_synthetic:
self._queue_manager.add_source(
source_id=source_id,
source="synthetic",
target_fps=target_fps,
source_type="synthetic",
resolution=self.config.synthetic_resolution
)
elif i < len(self.config.video_sources):
src = self.config.video_sources[i]
self._queue_manager.add_source(
source_id=source_id,
source=src.get("url", ""),
target_fps=target_fps,
source_type=src.get("type", "rtsp"),
resolution=self.config.synthetic_resolution
)
else:
# 超出配置的源数量,使用合成源
self._queue_manager.add_source(
source_id=source_id,
source="synthetic",
target_fps=target_fps,
source_type="synthetic",
resolution=self.config.synthetic_resolution
)
# 创建 Batch 组装器
self._batch_assembler = BatchAssembler(
frame_queues=self._queue_manager.queues,
batch_size=batch_size,
imgsz=(resolution, resolution),
use_gpu_preprocess=True,
device_id=self.config.device_id
)
def _teardown_test(self):
"""清理测试环境"""
if self._queue_manager:
self._queue_manager.stop_all()
self._queue_manager = None
if self._inference_engine:
self._inference_engine.cleanup()
self._inference_engine = None
self._batch_assembler = None
# 强制 GC 和显存清理
import gc
import torch
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
# 等待显存释放
time.sleep(1)
    def run_single_test(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float
    ) -> TestResult:
        """Run one warmup + timed measurement for a single configuration.

        Flow: set up components -> start decode threads -> reset metrics ->
        warmup (results discarded) -> timed measurement with metric
        recording -> cooldown -> collect results -> teardown.

        Args:
            resolution: Square inference resolution for this test.
            batch_size: Frames per assembled batch.
            num_cameras: Number of decode sources.
            target_fps: Per-camera target decode rate.

        Returns:
            A TestResult aggregating GPU, throughput and latency metrics.
        """
        logger.info(f"测试配置: {resolution}x{resolution}, batch={batch_size}, "
                    f"cameras={num_cameras}, fps={target_fps}")
        # Build engine/queues/assembler for this combination.
        self._setup_test(resolution, batch_size, num_cameras, target_fps)
        # Start decode threads so queues begin filling before warmup.
        self._queue_manager.start_all()
        # Reset the collector so warmup work is not counted in results.
        self._metrics_collector.reset()
        # --- Warmup phase: synchronous inference, results discarded. ---
        logger.info(f"预热 {self.config.warmup_sec} 秒...")
        warmup_end = time.time() + self.config.warmup_sec
        while time.time() < warmup_end and not self._interrupted:
            batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
            if batch_data:
                batch, _ = batch_data
                self._inference_engine.infer_sync(batch)
        # --- Measurement phase. ---
        logger.info(f"正式测试 {self.config.test_duration_sec} 秒...")
        self._metrics_collector.start()
        test_start = time.time()
        test_end = test_start + self.config.test_duration_sec
        stream_id = 0
        while time.time() < test_end and not self._interrupted:
            batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
            if batch_data:
                batch, frame_infos = batch_data
                # Round-robin batches across the configured CUDA streams.
                task_id = self._inference_engine.infer_async(batch, stream_id)
                stream_id = (stream_id + 1) % self.config.num_cuda_streams
                # NOTE(review): get_results() is called immediately after
                # infer_async(), which presumably blocks on that task and
                # serializes iterations despite the stream rotation —
                # confirm against TRTInferenceEngine semantics.
                _, latency_ms = self._inference_engine.get_results(task_id)
                # Record per-batch latency and the number of frames served.
                self._metrics_collector.record_inference(latency_ms, len(frame_infos))
        # Stop sampling before cooldown so idle time is excluded.
        self._metrics_collector.stop()
        # Cooldown between tests lets the GPU settle.
        time.sleep(self.config.cooldown_sec)
        # Aggregate collected metrics into a TestResult.
        result = self._collect_results(resolution, batch_size, num_cameras, target_fps)
        # Release per-test resources (queues, engine, GPU memory).
        self._teardown_test()
        return result
def _collect_results(
self,
resolution: int,
batch_size: int,
num_cameras: int,
target_fps: float
) -> TestResult:
"""收集测试结果"""
gpu_metrics = self._metrics_collector.get_gpu_metrics()
throughput_metrics = self._metrics_collector.get_throughput_metrics()
# 获取解码统计
decode_stats = self._queue_manager.get_all_stats()
total_dropped = sum(s.dropped_frames for s in decode_stats.values())
# 构建结果
result = TestResult(
resolution=resolution,
batch_size=batch_size,
num_cameras=num_cameras,
target_fps=target_fps,
gpu_utilization_avg=gpu_metrics.get("gpu_utilization", {}).get("avg", 0),
gpu_utilization_max=gpu_metrics.get("gpu_utilization", {}).get("max", 0),
gpu_utilization_min=gpu_metrics.get("gpu_utilization", {}).get("min", 0),
memory_used_mb=gpu_metrics.get("memory_used_mb", {}).get("avg", 0),
memory_utilization=gpu_metrics.get("memory_utilization", {}).get("avg", 0),
total_throughput_fps=throughput_metrics.get("throughput_fps", 0),
per_camera_fps=throughput_metrics.get("throughput_fps", 0) / num_cameras if num_cameras > 0 else 0,
total_frames=throughput_metrics.get("total_frames", 0),
total_batches=throughput_metrics.get("total_batches", 0),
avg_latency_ms=throughput_metrics.get("latency", {}).get("avg", 0),
p95_latency_ms=throughput_metrics.get("latency", {}).get("p95", 0),
p99_latency_ms=throughput_metrics.get("latency", {}).get("p99", 0),
max_latency_ms=throughput_metrics.get("latency", {}).get("max", 0),
min_latency_ms=throughput_metrics.get("latency", {}).get("min", 0),
frame_drop_rate=throughput_metrics.get("frame_drop_rate", 0),
dropped_frames=total_dropped,
test_duration_sec=throughput_metrics.get("duration_sec", 0),
)
# 检查实时性
result.check_realtime_capability()
# 检查 GPU 饱和
result.is_gpu_saturated, result.saturation_reason = self._check_saturation(result)
logger.info(f"结果: 吞吐={result.total_throughput_fps:.1f} FPS, "
f"GPU={result.gpu_utilization_avg:.1f}%, "
f"延迟={result.avg_latency_ms:.2f}ms (P95={result.p95_latency_ms:.2f}ms), "
f"实时={'' if result.is_realtime_capable else ''}, "
f"饱和={'' if result.is_gpu_saturated else ''}")
return result
def _check_saturation(self, result: TestResult) -> Tuple[bool, Optional[str]]:
"""检查 GPU 是否饱和"""
reasons = []
if result.gpu_utilization_avg >= self.config.gpu_saturation_threshold:
reasons.append(f"GPU 利用率 {result.gpu_utilization_avg:.1f}% >= {self.config.gpu_saturation_threshold}%")
if result.memory_utilization >= self.config.memory_saturation_threshold:
reasons.append(f"显存利用率 {result.memory_utilization:.1f}% >= {self.config.memory_saturation_threshold}%")
if not result.is_realtime_capable:
reasons.append(f"P95 延迟 {result.p95_latency_ms:.2f}ms 超过帧间隔")
if result.frame_drop_rate > 5.0:
reasons.append(f"丢帧率 {result.frame_drop_rate:.1f}% > 5%")
if reasons:
return True, "; ".join(reasons)
return False, None
def run_all_tests(self) -> List[TestResult]:
"""执行所有测试组合"""
combinations = self.config.get_test_combinations()
total = len(combinations)
logger.info(f"{total} 个测试组合")
consecutive_failures = 0
max_consecutive_failures = 3
for i, combo in enumerate(combinations, 1):
if self._interrupted:
logger.warning("测试被中断")
break
# 连续失败太多次,跳过剩余高负载测试
if consecutive_failures >= max_consecutive_failures:
logger.warning(f"连续 {consecutive_failures} 次失败,跳过剩余测试")
break
logger.info(f"\n[{i}/{total}] 开始测试...")
try:
result = self.run_single_test(
resolution=combo["resolution"],
batch_size=combo["batch_size"],
num_cameras=combo["camera_count"],
target_fps=combo["target_fps"]
)
self._results.append(result)
consecutive_failures = 0 # 重置失败计数
except Exception as e:
error_msg = str(e).lower()
if "outofmemory" in error_msg or "cuda" in error_msg or "memory" in error_msg:
logger.warning(f"显存不足,跳过此测试: {combo}")
consecutive_failures += 1
# 强制清理
self._teardown_test()
time.sleep(3) # 额外等待显存释放
else:
logger.error(f"测试失败: {e}")
consecutive_failures += 1
self._teardown_test()
return self._results
def save_results(self):
"""保存测试结果"""
if not self._results:
logger.warning("没有测试结果可保存")
return
timestamp = get_timestamp()
if self.config.save_json:
json_path = f"{self.config.output_dir}/results_{timestamp}.json"
export_json(self._results, json_path)
logger.info(f"JSON 结果已保存: {json_path}")
if self.config.save_csv:
csv_path = f"{self.config.output_dir}/results_{timestamp}.csv"
export_csv(self._results, csv_path)
logger.info(f"CSV 结果已保存: {csv_path}")
if self.config.generate_report:
report_path = generate_report(self._results, self.config.output_dir)
logger.info(f"Markdown 报告已生成: {report_path}")
# 生成可视化图表
try:
from .visualizer import generate_visualizations
viz_files = generate_visualizations(self._results, self.config.output_dir)
if viz_files:
logger.info(f"可视化图表已生成: {len(viz_files)} 个文件")
except Exception as e:
logger.warning(f"可视化生成失败: {e}")
def run(self):
"""执行完整 Benchmark 流程"""
try:
self.initialize()
self.run_all_tests()
self.save_results()
logger.info("\n" + "=" * 60)
logger.info("Benchmark 完成!")
logger.info("=" * 60)
except Exception as e:
logger.error(f"Benchmark 执行失败: {e}")
raise
finally:
self._teardown_test()