"""Benchmark 主控程序模块"""
|
|
|
|
import time
|
|
import signal
|
|
import sys
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
|
|
from .config import BenchmarkConfig
|
|
from .engine_builder import TRTEngineBuilder
|
|
from .inference_engine import TRTInferenceEngine
|
|
from .decode_thread import FrameQueueManager
|
|
from .batch_assembler import BatchAssembler
|
|
from .metrics_collector import MetricsCollector
|
|
from .results import TestResult, export_json, export_csv, generate_report
|
|
from .utils import setup_logging, Timer, get_timestamp, ensure_dir
|
|
|
|
# Module-level logger shared by the whole benchmark runner.
logger = setup_logging()
|
|
|
|
|
|
class BenchmarkRunner:
    """Benchmark main controller.

    Orchestrates the full benchmark flow: builds TensorRT engines, sets up
    decode threads and batch assembly for each test combination, runs a
    warmup phase and a timed inference phase, collects GPU/throughput
    metrics, and finally exports results (JSON/CSV/Markdown/plots).
    """

    def __init__(self, config: BenchmarkConfig):
        """Store the configuration and register SIGINT/SIGTERM handlers.

        Args:
            config: Benchmark configuration (model path, resolutions,
                test durations, output directories, ...).
        """
        self.config = config
        self._interrupted = False
        self._results: List[TestResult] = []

        # Component references; populated by initialize() / _setup_test()
        # and torn down by _teardown_test().
        self._engine_builder: Optional[TRTEngineBuilder] = None
        self._engines: Dict[int, str] = {}  # resolution -> engine file path
        self._inference_engine: Optional[TRTInferenceEngine] = None
        self._queue_manager: Optional[FrameQueueManager] = None
        self._batch_assembler: Optional[BatchAssembler] = None
        self._metrics_collector: Optional[MetricsCollector] = None

        # Allow Ctrl+C / SIGTERM to stop the benchmark gracefully.
        # NOTE(review): signal.signal() only works from the main thread —
        # confirm BenchmarkRunner is always constructed there.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Request a graceful stop; a second signal forces an immediate exit.

        Fix: the original ignored repeated signals, so a hung test could not
        be aborted from the terminal.
        """
        if self._interrupted:
            logger.warning("再次收到中断信号,强制退出")
            sys.exit(1)
        logger.warning("收到中断信号,正在停止...")
        self._interrupted = True

    def initialize(self):
        """Build all engines, create the metrics collector and output dir."""
        logger.info("=" * 60)
        logger.info("FP16 性能评估 Benchmark 框架")
        logger.info("=" * 60)

        # Build one TensorRT engine per requested input resolution.
        logger.info("构建 TensorRT Engine...")
        self._engine_builder = TRTEngineBuilder(
            model_path=self.config.model_path,
            output_dir=self.config.engine_dir
        )
        self._engines = self._engine_builder.build_all_engines(
            resolutions=self.config.resolutions,
            precision=self.config.precision
        )
        logger.info(f"Engine 构建完成: {list(self._engines.keys())}")

        # GPU utilization / throughput sampler shared by all tests.
        self._metrics_collector = MetricsCollector(
            device_id=self.config.device_id,
            sample_interval_ms=self.config.metrics_sample_interval_ms
        )

        ensure_dir(self.config.output_dir)
        logger.info("初始化完成")

    def _setup_test(self, resolution: int, batch_size: int, num_cameras: int, target_fps: float):
        """Prepare the environment for one test combination.

        Loads the engine matching *resolution*, creates the frame queue
        manager with *num_cameras* sources, and builds the batch assembler.

        Args:
            resolution: Square input resolution of the engine to load.
            batch_size: Frames per inference batch.
            num_cameras: Number of simulated/real camera sources.
            target_fps: Per-source decode rate.
        """
        engine_path = self._engines[resolution]
        self._inference_engine = TRTInferenceEngine(
            engine_path=engine_path,
            num_streams=self.config.num_cuda_streams,
            device_id=self.config.device_id
        )

        self._queue_manager = FrameQueueManager(queue_size=self.config.frame_queue_size)

        for i in range(num_cameras):
            source_id = f"cam_{i:02d}"
            # A real video source is used only when synthetic mode is off AND
            # the camera index is covered by the configured source list;
            # every other case falls back to a synthetic source. (The three
            # original branches were identical except for these two values.)
            if not self.config.use_synthetic and i < len(self.config.video_sources):
                src = self.config.video_sources[i]
                source = src.get("url", "")
                source_type = src.get("type", "rtsp")
            else:
                source = "synthetic"
                source_type = "synthetic"
            self._queue_manager.add_source(
                source_id=source_id,
                source=source,
                target_fps=target_fps,
                source_type=source_type,
                resolution=self.config.synthetic_resolution
            )

        self._batch_assembler = BatchAssembler(
            frame_queues=self._queue_manager.queues,
            batch_size=batch_size,
            imgsz=(resolution, resolution),
            use_gpu_preprocess=True,
            device_id=self.config.device_id
        )

    def _teardown_test(self):
        """Tear down one test: stop threads, release the engine, free GPU memory."""
        if self._queue_manager:
            self._queue_manager.stop_all()
            self._queue_manager = None

        if self._inference_engine:
            self._inference_engine.cleanup()
            self._inference_engine = None

        self._batch_assembler = None

        # Force GC and CUDA cache release between tests so one test's
        # allocations do not skew the next test's memory measurements.
        import gc
        gc.collect()
        try:
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
                torch.cuda.synchronize()
        except ImportError:
            # Fix: teardown must never fail just because torch is missing.
            pass

        # Give the driver a moment to actually release the memory.
        time.sleep(1)

    def run_single_test(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float
    ) -> TestResult:
        """Run one test combination: setup, warmup, timed run, collect, teardown.

        Returns:
            The populated TestResult for this combination.
        """
        logger.info(f"测试配置: {resolution}x{resolution}, batch={batch_size}, "
                    f"cameras={num_cameras}, fps={target_fps}")

        self._setup_test(resolution, batch_size, num_cameras, target_fps)

        # Start decode threads feeding the frame queues.
        self._queue_manager.start_all()

        self._metrics_collector.reset()

        # Warmup phase: run synchronous inference without recording metrics.
        # Fix: deadlines use time.monotonic() so a wall-clock adjustment
        # (NTP, DST) cannot stretch or truncate the phase.
        logger.info(f"预热 {self.config.warmup_sec} 秒...")
        warmup_end = time.monotonic() + self.config.warmup_sec
        while time.monotonic() < warmup_end and not self._interrupted:
            batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
            if batch_data:
                batch, _ = batch_data
                self._inference_engine.infer_sync(batch)

        # Timed measurement phase.
        logger.info(f"正式测试 {self.config.test_duration_sec} 秒...")
        self._metrics_collector.start()

        test_end = time.monotonic() + self.config.test_duration_sec
        stream_id = 0

        while time.monotonic() < test_end and not self._interrupted:
            batch_data = self._batch_assembler.assemble_batch(timeout=0.1)
            if batch_data:
                batch, frame_infos = batch_data

                # Round-robin over the CUDA streams.
                # NOTE(review): get_results() is called right after submit,
                # so streams do not actually overlap — confirm whether
                # pipelining across streams was intended here.
                task_id = self._inference_engine.infer_async(batch, stream_id)
                stream_id = (stream_id + 1) % self.config.num_cuda_streams

                _, latency_ms = self._inference_engine.get_results(task_id)

                self._metrics_collector.record_inference(latency_ms, len(frame_infos))

        self._metrics_collector.stop()

        # Cooldown between tests to let temperatures/clocks settle.
        time.sleep(self.config.cooldown_sec)

        result = self._collect_results(resolution, batch_size, num_cameras, target_fps)

        self._teardown_test()

        return result

    def _collect_results(
        self,
        resolution: int,
        batch_size: int,
        num_cameras: int,
        target_fps: float
    ) -> TestResult:
        """Assemble a TestResult from collector and decoder statistics."""
        gpu_metrics = self._metrics_collector.get_gpu_metrics()
        throughput_metrics = self._metrics_collector.get_throughput_metrics()

        # Total frames dropped across all decode threads.
        decode_stats = self._queue_manager.get_all_stats()
        total_dropped = sum(s.dropped_frames for s in decode_stats.values())

        # Hoist the nested dicts once instead of re-fetching per field.
        gpu_util = gpu_metrics.get("gpu_utilization", {})
        latency = throughput_metrics.get("latency", {})
        throughput_fps = throughput_metrics.get("throughput_fps", 0)

        result = TestResult(
            resolution=resolution,
            batch_size=batch_size,
            num_cameras=num_cameras,
            target_fps=target_fps,

            gpu_utilization_avg=gpu_util.get("avg", 0),
            gpu_utilization_max=gpu_util.get("max", 0),
            gpu_utilization_min=gpu_util.get("min", 0),
            memory_used_mb=gpu_metrics.get("memory_used_mb", {}).get("avg", 0),
            memory_utilization=gpu_metrics.get("memory_utilization", {}).get("avg", 0),

            total_throughput_fps=throughput_fps,
            per_camera_fps=throughput_fps / num_cameras if num_cameras > 0 else 0,
            total_frames=throughput_metrics.get("total_frames", 0),
            total_batches=throughput_metrics.get("total_batches", 0),

            avg_latency_ms=latency.get("avg", 0),
            p95_latency_ms=latency.get("p95", 0),
            p99_latency_ms=latency.get("p99", 0),
            max_latency_ms=latency.get("max", 0),
            min_latency_ms=latency.get("min", 0),

            frame_drop_rate=throughput_metrics.get("frame_drop_rate", 0),
            dropped_frames=total_dropped,

            test_duration_sec=throughput_metrics.get("duration_sec", 0),
        )

        # Derived flags: real-time capability and GPU saturation.
        result.check_realtime_capability()
        result.is_gpu_saturated, result.saturation_reason = self._check_saturation(result)

        logger.info(f"结果: 吞吐={result.total_throughput_fps:.1f} FPS, "
                    f"GPU={result.gpu_utilization_avg:.1f}%, "
                    f"延迟={result.avg_latency_ms:.2f}ms (P95={result.p95_latency_ms:.2f}ms), "
                    f"实时={'✓' if result.is_realtime_capable else '✗'}, "
                    f"饱和={'是' if result.is_gpu_saturated else '否'}")

        return result

    def _check_saturation(self, result: TestResult) -> Tuple[bool, Optional[str]]:
        """Decide whether the GPU is saturated for *result*.

        Returns:
            (saturated, reason): reason is a "; "-joined list of all
            triggered conditions, or None when not saturated.
        """
        reasons = []

        if result.gpu_utilization_avg >= self.config.gpu_saturation_threshold:
            reasons.append(f"GPU 利用率 {result.gpu_utilization_avg:.1f}% >= {self.config.gpu_saturation_threshold}%")

        if result.memory_utilization >= self.config.memory_saturation_threshold:
            reasons.append(f"显存利用率 {result.memory_utilization:.1f}% >= {self.config.memory_saturation_threshold}%")

        if not result.is_realtime_capable:
            reasons.append(f"P95 延迟 {result.p95_latency_ms:.2f}ms 超过帧间隔")

        if result.frame_drop_rate > 5.0:
            reasons.append(f"丢帧率 {result.frame_drop_rate:.1f}% > 5%")

        if reasons:
            return True, "; ".join(reasons)
        return False, None

    def run_all_tests(self) -> List[TestResult]:
        """Run every configured test combination, skipping after repeated failures.

        Returns:
            All successfully collected TestResult objects.
        """
        combinations = self.config.get_test_combinations()
        total = len(combinations)

        logger.info(f"共 {total} 个测试组合")

        consecutive_failures = 0
        max_consecutive_failures = 3

        for i, combo in enumerate(combinations, 1):
            if self._interrupted:
                logger.warning("测试被中断")
                break

            # Combinations are assumed ordered by increasing load, so after
            # several consecutive failures the rest would also fail.
            if consecutive_failures >= max_consecutive_failures:
                logger.warning(f"连续 {consecutive_failures} 次失败,跳过剩余测试")
                break

            logger.info(f"\n[{i}/{total}] 开始测试...")

            try:
                result = self.run_single_test(
                    resolution=combo["resolution"],
                    batch_size=combo["batch_size"],
                    num_cameras=combo["camera_count"],
                    target_fps=combo["target_fps"]
                )

                self._results.append(result)
                consecutive_failures = 0  # success resets the failure streak

            except Exception as e:
                error_msg = str(e).lower()
                # Fix: the original also matched the bare substring "cuda",
                # which misclassified every CUDA error (invalid argument,
                # launch failure, ...) as out-of-memory. Match memory-specific
                # terms only; other errors take the generic branch below.
                if "outofmemory" in error_msg or "memory" in error_msg:
                    logger.warning(f"显存不足,跳过此测试: {combo}")
                    consecutive_failures += 1

                    # Force cleanup and give the driver extra time to
                    # release the memory before the next attempt.
                    self._teardown_test()
                    time.sleep(3)
                else:
                    logger.error(f"测试失败: {e}")
                    consecutive_failures += 1
                    self._teardown_test()

        return self._results

    def save_results(self):
        """Export collected results as JSON/CSV/report/plots per config flags."""
        if not self._results:
            logger.warning("没有测试结果可保存")
            return

        timestamp = get_timestamp()

        if self.config.save_json:
            json_path = f"{self.config.output_dir}/results_{timestamp}.json"
            export_json(self._results, json_path)
            logger.info(f"JSON 结果已保存: {json_path}")

        if self.config.save_csv:
            csv_path = f"{self.config.output_dir}/results_{timestamp}.csv"
            export_csv(self._results, csv_path)
            logger.info(f"CSV 结果已保存: {csv_path}")

        if self.config.generate_report:
            report_path = generate_report(self._results, self.config.output_dir)
            logger.info(f"Markdown 报告已生成: {report_path}")

            # Plots are best-effort: matplotlib may be unavailable on the
            # benchmark host, so failures only warn.
            try:
                from .visualizer import generate_visualizations
                viz_files = generate_visualizations(self._results, self.config.output_dir)
                if viz_files:
                    logger.info(f"可视化图表已生成: {len(viz_files)} 个文件")
            except Exception as e:
                logger.warning(f"可视化生成失败: {e}")

    def run(self):
        """Execute the complete benchmark flow: initialize, test, save.

        Raises:
            Any exception from initialization or the test loop is re-raised
            after being logged; teardown always runs.
        """
        try:
            self.initialize()
            self.run_all_tests()
            self.save_results()

            logger.info("\n" + "=" * 60)
            logger.info("Benchmark 完成!")
            logger.info("=" * 60)

        except Exception as e:
            logger.error(f"Benchmark 执行失败: {e}")
            raise
        finally:
            self._teardown_test()