Files
Test_AI/performance_test.py
2026-01-20 11:14:10 +08:00

852 lines
33 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
YOLOv11 性能对比测试系统
PyTorch vs TensorRT 完整性能测试
"""
import os
import sys
import time
import json
import threading
import numpy as np
import cv2
import torch
import psutil
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict
from ultralytics import YOLO
# Performance metrics data class.
@dataclass
class PerformanceMetrics:
    """One point-in-time performance sample for a single engine.

    NOTE(review): this dataclass is defined but never referenced elsewhere
    in this module — candidate for removal or future use; verify callers.
    """
    timestamp: float                            # epoch seconds when sampled
    engine_type: str                            # e.g. "pytorch" or "tensorrt"
    fps: Optional[float] = None                 # frames per second
    latency_ms: Optional[float] = None          # per-frame latency, milliseconds
    gpu_utilization: Optional[float] = None     # percent (0-100); None if unavailable
    gpu_memory_mb: Optional[float] = None       # GPU memory used, MiB
    cpu_utilization: Optional[float] = None     # percent (0-100)
    memory_mb: Optional[float] = None           # host RAM used, MiB
    concurrent_streams: Optional[int] = None    # number of parallel streams, if any
    batch_size: Optional[int] = None            # inference batch size, if batched
@dataclass
class TestResult:
    """Aggregated outcome of one benchmark run (one engine, one test type)."""
    engine_type: str            # e.g. "pytorch" or "tensorrt"
    test_type: str              # "single_inference" / "batch_inference" / "concurrent_streams"
    avg_fps: float              # mean of per-second FPS samples
    max_fps: float
    min_fps: float
    avg_latency_ms: float       # mean per-frame latency
    max_latency_ms: float
    min_latency_ms: float
    avg_gpu_util: float         # percent; 0 when GPU metrics were unavailable
    max_gpu_util: float
    avg_gpu_memory_mb: float
    max_gpu_memory_mb: float
    avg_cpu_util: float         # percent
    max_cpu_util: float
    test_duration: float        # actual wall-clock duration, seconds
    total_frames: int           # frames processed over the whole run
    concurrent_streams: int = 1 # parallel streams used (1 = sequential)
    batch_size: int = 1         # batch size used (1 = single-frame)
class ResourceMonitor:
    """Background sampler for system resource usage (CPU, RAM, GPU).

    A daemon thread appends one metrics dict to ``metrics_history`` every
    ``sampling_interval`` seconds until ``stop_monitoring`` is called.
    GPU stats come from GPUtil when installed, falling back to
    ``torch.cuda.memory_allocated`` (utilization then reported as 0.0).
    """

    def __init__(self, sampling_interval: float = 0.1):
        self.sampling_interval = sampling_interval
        self.is_monitoring = False
        self.metrics_history = []
        self.monitor_thread = None

    def start_monitoring(self):
        """Start sampling on a daemon thread; clears any previous history."""
        self.is_monitoring = True
        self.metrics_history = []
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Signal the sampler to stop and wait briefly for the thread to exit."""
        self.is_monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=1.0)

    def _monitor_loop(self):
        """Sampling loop executed on the daemon thread."""
        try:
            import GPUtil  # optional third-party dependency
        except ImportError:
            print("警告: GPUtil 未安装GPU 监控不可用")
            GPUtil = None
        while self.is_monitoring:
            try:
                # CPU and host-memory sampling (non-blocking cpu_percent).
                cpu_util = psutil.cpu_percent(interval=None)
                memory_info = psutil.virtual_memory()
                memory_mb = memory_info.used / 1024 / 1024
                # GPU sampling: prefer GPUtil (gives utilization + memory).
                gpu_util = None
                gpu_memory_mb = None
                if GPUtil and torch.cuda.is_available():
                    try:
                        gpus = GPUtil.getGPUs()
                        if gpus:
                            gpu = gpus[0]
                            gpu_util = gpu.load * 100
                            gpu_memory_mb = gpu.memoryUsed
                    # Bug fix: was a bare `except:` which would also swallow
                    # KeyboardInterrupt/SystemExit; narrowed to Exception.
                    except Exception:
                        pass
                # Fallback: torch can report allocated memory but not
                # utilization, so 0.0 is used as a placeholder.
                if gpu_util is None and torch.cuda.is_available():
                    try:
                        gpu_memory_mb = torch.cuda.memory_allocated(0) / 1024 / 1024
                        gpu_util = 0.0
                    except Exception:  # bug fix: was bare `except:`
                        pass
                metrics = {
                    'timestamp': time.time(),
                    'cpu_utilization': cpu_util,
                    'memory_mb': memory_mb,
                    'gpu_utilization': gpu_util,
                    'gpu_memory_mb': gpu_memory_mb
                }
                self.metrics_history.append(metrics)
            except Exception as e:
                print(f"监控错误: {e}")
            time.sleep(self.sampling_interval)

    def get_average_metrics(self) -> Dict:
        """Aggregate sampled history into avg/max/min values per metric.

        Metrics with no non-None samples are omitted; returns {} when no
        samples were collected at all.
        """
        if not self.metrics_history:
            return {}
        metrics = {}
        for key in ['cpu_utilization', 'memory_mb', 'gpu_utilization', 'gpu_memory_mb']:
            values = [m[key] for m in self.metrics_history if m[key] is not None]
            if values:
                metrics[f'avg_{key}'] = np.mean(values)
                metrics[f'max_{key}'] = np.max(values)
                metrics[f'min_{key}'] = np.min(values)
        return metrics
class MockCamera:
    """Synthetic frame source standing in for a real camera.

    Produces random-noise HxWx3 uint8 frames; on roughly half of them a
    solid white rectangle is drawn to simulate a detectable person.
    """

    def __init__(self, width: int = 640, height: int = 640, fps: int = 30):
        self.width = width
        self.height = height
        self.fps = fps  # nominal rate; not enforced by this class
        self.frame_count = 0

    def generate_frame(self) -> np.ndarray:
        """Return one random frame, sometimes containing a mock target.

        Bug fixes vs. the original:
        - randint's upper bound is exclusive, so (0, 255) never produced the
          value 255; use 256 to cover the full 8-bit range.
        - the random rectangle placement raised ValueError on small frames
          (randint(50, width-100) needs width > 150); now guarded.
        """
        frame = np.random.randint(0, 256, (self.height, self.width, 3), dtype=np.uint8)
        # Draw a mock person on 5 of every 10 frames, if the frame is large
        # enough for the placement bounds to be valid.
        if self.frame_count % 10 < 5 and self.width > 150 and self.height > 200:
            x1 = np.random.randint(50, self.width - 100)
            y1 = np.random.randint(50, self.height - 150)
            x2, y2 = x1 + 50, y1 + 100
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 255), -1)
        self.frame_count += 1
        return frame

    def generate_batch(self, batch_size: int) -> List[np.ndarray]:
        """Return `batch_size` freshly generated frames."""
        return [self.generate_frame() for _ in range(batch_size)]
class InferenceEngine:
    """Abstract base for inference backends.

    Subclasses provide model loading plus single-frame and batched
    inference; this base only records configuration, selects a device,
    and offers shared cleanup.
    """

    def __init__(self, model_path: str, engine_type: str):
        self.model_path = model_path
        self.engine_type = engine_type
        self.model = None
        # Prefer CUDA when available, otherwise fall back to CPU.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def load_model(self):
        """Load the underlying model (subclass responsibility)."""
        raise NotImplementedError

    def infer_single(self, image: np.ndarray) -> Dict:
        """Run inference on one frame (subclass responsibility)."""
        raise NotImplementedError

    def infer_batch(self, images: List[np.ndarray]) -> List[Dict]:
        """Run inference on a batch of frames (subclass responsibility)."""
        raise NotImplementedError

    def cleanup(self):
        """Drop the model reference and release cached CUDA memory."""
        if getattr(self, 'model', None):
            del self.model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
class PyTorchEngine(InferenceEngine):
    """YOLO inference backend running native PyTorch weights."""

    def __init__(self, model_path: str):
        super().__init__(model_path, "pytorch")

    def load_model(self):
        """Load the .pt checkpoint and move it to the selected device."""
        print(f"加载 PyTorch 模型: {self.model_path}")
        self.model = YOLO(self.model_path)
        self.model.to(self.device)
        print(f"✅ PyTorch 模型加载完成,设备: {self.device}")

    def infer_single(self, image: np.ndarray) -> Dict:
        """Run one frame; returns wall-clock latency and detection count."""
        t0 = time.time()
        results = self.model(image, verbose=False, device=self.device)
        elapsed_ms = (time.time() - t0) * 1000
        boxes = results[0].boxes
        return {
            'latency_ms': elapsed_ms,
            'detections': len(boxes) if boxes is not None else 0
        }

    def infer_batch(self, images: List[np.ndarray]) -> List[Dict]:
        """Run a batch; per-item latency is the whole-batch time averaged."""
        t0 = time.time()
        results = self.model(images, verbose=False, device=self.device)
        per_item_ms = (time.time() - t0) * 1000 / len(images)
        outputs = []
        for res in results:
            boxes = res.boxes
            outputs.append({
                'latency_ms': per_item_ms,
                'detections': len(boxes) if boxes is not None else 0
            })
        return outputs
class TensorRTEngine(InferenceEngine):
"""TensorRT 推理引擎"""
def __init__(self, model_path: str):
super().__init__(model_path, "tensorrt")
self.engine_path = None
def load_model(self):
"""加载或创建 TensorRT 模型"""
# 检查是否已有 TensorRT 引擎文件
engine_path = self.model_path.replace('.pt', '.engine')
if os.path.exists(engine_path):
print(f"找到现有 TensorRT 引擎: {engine_path}")
self.engine_path = engine_path
else:
print(f"创建 TensorRT 引擎: {self.model_path} -> {engine_path}")
self._export_tensorrt_engine(engine_path)
# 加载 TensorRT 引擎
self.model = YOLO(self.engine_path)
print(f"✅ TensorRT 模型加载完成")
def _export_tensorrt_engine(self, engine_path: str):
"""导出 TensorRT 引擎"""
print("正在导出 TensorRT 引擎,这可能需要几分钟...")
# 加载原始模型
model = YOLO(self.model_path)
# 导出为 TensorRT
try:
exported_model = model.export(
format='engine',
imgsz=640,
device=0 if torch.cuda.is_available() else 'cpu',
half=True, # FP16
dynamic=False,
simplify=True,
workspace=4, # GB
verbose=True
)
self.engine_path = exported_model
print(f"✅ TensorRT 引擎导出完成: {self.engine_path}")
except Exception as e:
print(f"❌ TensorRT 引擎导出失败: {e}")
raise
def infer_single(self, image: np.ndarray) -> Dict:
"""单帧推理"""
start_time = time.time()
results = self.model(image, verbose=False)
end_time = time.time()
latency_ms = (end_time - start_time) * 1000
return {
'latency_ms': latency_ms,
'detections': len(results[0].boxes) if results[0].boxes is not None else 0
}
def infer_batch(self, images: List[np.ndarray]) -> List[Dict]:
"""批量推理"""
start_time = time.time()
results = self.model(images, verbose=False)
end_time = time.time()
total_latency_ms = (end_time - start_time) * 1000
avg_latency_ms = total_latency_ms / len(images)
return [{
'latency_ms': avg_latency_ms,
'detections': len(result.boxes) if result.boxes is not None else 0
} for result in results]
class PerformanceTester:
    """Runs the benchmark suites (single-frame, batched, concurrent) against
    an InferenceEngine and aggregates results into TestResult records."""

    def __init__(self, model_path: str):
        # Path to the .pt checkpoint handed to each engine under test.
        self.model_path = model_path
        # NOTE(review): never read in this class; results are returned by
        # the individual test methods instead.
        self.results = []
        self.resource_monitor = ResourceMonitor()

    def test_single_inference(self, engine: InferenceEngine, test_duration: int = 30) -> TestResult:
        """Benchmark single-frame inference for roughly `test_duration` seconds.

        Synthetic frames come from MockCamera; FPS is recomputed about once
        per second, and system resources are sampled on a background thread.
        """
        print(f"\n🔄 测试 {engine.engine_type} 单帧推理性能 ({test_duration}秒)...")
        camera = MockCamera()
        fps_list = []
        latency_list = []
        frame_count = 0
        # Start background resource sampling.
        self.resource_monitor.start_monitoring()
        start_time = time.time()
        last_fps_time = start_time
        fps_frame_count = 0
        while time.time() - start_time < test_duration:
            # Generate one synthetic test frame.
            frame = camera.generate_frame()
            # Run inference on it.
            result = engine.infer_single(frame)
            latency_list.append(result['latency_ms'])
            frame_count += 1
            fps_frame_count += 1
            # Recompute FPS about once per second.
            current_time = time.time()
            if current_time - last_fps_time >= 1.0:
                fps = fps_frame_count / (current_time - last_fps_time)
                fps_list.append(fps)
                fps_frame_count = 0
                last_fps_time = current_time
                # Progress report.
                elapsed = current_time - start_time
                print(f" 进度: {elapsed:.1f}s/{test_duration}s, 当前FPS: {fps:.1f}, 延迟: {result['latency_ms']:.1f}ms")
        # Stop sampling and aggregate resource metrics.
        self.resource_monitor.stop_monitoring()
        resource_metrics = self.resource_monitor.get_average_metrics()
        # Build the aggregated result record.
        # NOTE(review): np.mean/np.max on latency_list assume at least one
        # frame completed within test_duration; np.max raises on empty input.
        total_time = time.time() - start_time
        result = TestResult(
            engine_type=engine.engine_type,
            test_type="single_inference",
            avg_fps=np.mean(fps_list) if fps_list else 0,
            max_fps=np.max(fps_list) if fps_list else 0,
            min_fps=np.min(fps_list) if fps_list else 0,
            avg_latency_ms=np.mean(latency_list),
            max_latency_ms=np.max(latency_list),
            min_latency_ms=np.min(latency_list),
            avg_gpu_util=resource_metrics.get('avg_gpu_utilization', 0),
            max_gpu_util=resource_metrics.get('max_gpu_utilization', 0),
            avg_gpu_memory_mb=resource_metrics.get('avg_gpu_memory_mb', 0),
            max_gpu_memory_mb=resource_metrics.get('max_gpu_memory_mb', 0),
            avg_cpu_util=resource_metrics.get('avg_cpu_utilization', 0),
            max_cpu_util=resource_metrics.get('max_cpu_utilization', 0),
            test_duration=total_time,
            total_frames=frame_count
        )
        print(f"{engine.engine_type} 单帧推理测试完成:")
        print(f" 平均FPS: {result.avg_fps:.1f}")
        print(f" 平均延迟: {result.avg_latency_ms:.1f}ms")
        print(f" GPU利用率: {result.avg_gpu_util:.1f}%")
        print(f" GPU内存: {result.avg_gpu_memory_mb:.1f}MB")
        return result

    def test_batch_inference(self, engine: InferenceEngine, batch_sizes: List[int], test_duration: int = 20) -> List[TestResult]:
        """Benchmark batched inference, once per entry in `batch_sizes`.

        Returns one TestResult per batch size, in the same order.
        """
        results = []
        for batch_size in batch_sizes:
            print(f"\n🔄 测试 {engine.engine_type} 批量推理性能 (批次大小: {batch_size}, {test_duration}秒)...")
            camera = MockCamera()
            fps_list = []
            latency_list = []
            batch_count = 0
            # Start background resource sampling.
            self.resource_monitor.start_monitoring()
            start_time = time.time()
            last_fps_time = start_time
            fps_batch_count = 0
            while time.time() - start_time < test_duration:
                # Generate a batch of synthetic test frames.
                batch_frames = camera.generate_batch(batch_size)
                # Run batched inference.
                batch_results = engine.infer_batch(batch_frames)
                avg_latency = np.mean([r['latency_ms'] for r in batch_results])
                latency_list.append(avg_latency)
                batch_count += 1
                fps_batch_count += 1
                # Recompute FPS about once per second.
                current_time = time.time()
                if current_time - last_fps_time >= 1.0:
                    # Batched FPS = batches processed * batch size / elapsed time.
                    fps = (fps_batch_count * batch_size) / (current_time - last_fps_time)
                    fps_list.append(fps)
                    fps_batch_count = 0
                    last_fps_time = current_time
                    # Progress report.
                    elapsed = current_time - start_time
                    print(f" 进度: {elapsed:.1f}s/{test_duration}s, 当前FPS: {fps:.1f}, 延迟: {avg_latency:.1f}ms")
            # Stop sampling and aggregate resource metrics.
            self.resource_monitor.stop_monitoring()
            resource_metrics = self.resource_monitor.get_average_metrics()
            # Build the aggregated result record for this batch size.
            total_time = time.time() - start_time
            total_frames = batch_count * batch_size
            result = TestResult(
                engine_type=engine.engine_type,
                test_type="batch_inference",
                avg_fps=np.mean(fps_list) if fps_list else 0,
                max_fps=np.max(fps_list) if fps_list else 0,
                min_fps=np.min(fps_list) if fps_list else 0,
                avg_latency_ms=np.mean(latency_list),
                max_latency_ms=np.max(latency_list),
                min_latency_ms=np.min(latency_list),
                avg_gpu_util=resource_metrics.get('avg_gpu_utilization', 0),
                max_gpu_util=resource_metrics.get('max_gpu_utilization', 0),
                avg_gpu_memory_mb=resource_metrics.get('avg_gpu_memory_mb', 0),
                max_gpu_memory_mb=resource_metrics.get('max_gpu_memory_mb', 0),
                avg_cpu_util=resource_metrics.get('avg_cpu_utilization', 0),
                max_cpu_util=resource_metrics.get('max_cpu_utilization', 0),
                test_duration=total_time,
                total_frames=total_frames,
                batch_size=batch_size
            )
            print(f"{engine.engine_type} 批量推理测试完成 (批次大小: {batch_size}):")
            print(f" 平均FPS: {result.avg_fps:.1f}")
            print(f" 平均延迟: {result.avg_latency_ms:.1f}ms")
            print(f" GPU利用率: {result.avg_gpu_util:.1f}%")
            print(f" GPU内存: {result.avg_gpu_memory_mb:.1f}MB")
            results.append(result)
        return results

    def test_concurrent_streams(self, engine: InferenceEngine, concurrent_counts: List[int], test_duration: int = 30) -> List[TestResult]:
        """Benchmark N parallel inference streams, once per `concurrent_counts` entry.

        Each stream is a thread with its own MockCamera, all sharing the same
        engine instance. Returns one TestResult per concurrency level.
        """
        results = []
        for concurrent_count in concurrent_counts:
            print(f"\n🔄 测试 {engine.engine_type} 并发性能 (并发数: {concurrent_count}, {test_duration}秒)...")
            # One camera per stream.
            cameras = [MockCamera() for _ in range(concurrent_count)]
            # Shared state across worker threads.
            # NOTE(review): fps_list/latency_list here are never used; the
            # per-thread lists collected via thread_results are aggregated below.
            fps_list = []
            latency_list = []
            total_frames = 0
            threads = []
            thread_results = [[] for _ in range(concurrent_count)]
            stop_flag = threading.Event()
            # Start background resource sampling.
            self.resource_monitor.start_monitoring()
            def worker_thread(thread_id: int, camera: MockCamera, results_list: List):
                """Per-stream worker: run inference in a loop until stop_flag is set,
                then deposit its local stats into results_list."""
                local_fps_list = []
                local_latency_list = []
                frame_count = 0
                last_fps_time = time.time()
                fps_frame_count = 0
                while not stop_flag.is_set():
                    try:
                        # Generate one synthetic test frame.
                        frame = camera.generate_frame()
                        # Run inference (engine instance is shared by all workers).
                        result = engine.infer_single(frame)
                        local_latency_list.append(result['latency_ms'])
                        frame_count += 1
                        fps_frame_count += 1
                        # Recompute FPS about once per second.
                        current_time = time.time()
                        if current_time - last_fps_time >= 1.0:
                            fps = fps_frame_count / (current_time - last_fps_time)
                            local_fps_list.append(fps)
                            fps_frame_count = 0
                            last_fps_time = current_time
                    except Exception as e:
                        print(f"线程 {thread_id} 错误: {e}")
                        break
                results_list.extend([{
                    'fps_list': local_fps_list,
                    'latency_list': local_latency_list,
                    'frame_count': frame_count
                }])
            # Launch one worker thread per stream.
            start_time = time.time()
            for i in range(concurrent_count):
                thread = threading.Thread(
                    target=worker_thread,
                    args=(i, cameras[i], thread_results[i]),
                    daemon=True
                )
                threads.append(thread)
                thread.start()
            # Let the workers run for the test duration, then signal stop.
            time.sleep(test_duration)
            stop_flag.set()
            # Wait for all workers to finish (bounded wait).
            for thread in threads:
                thread.join(timeout=5.0)
            # Stop sampling and aggregate resource metrics.
            self.resource_monitor.stop_monitoring()
            resource_metrics = self.resource_monitor.get_average_metrics()
            # Merge per-thread statistics.
            all_fps = []
            all_latency = []
            total_frames = 0
            for thread_result_list in thread_results:
                if thread_result_list:
                    result = thread_result_list[0]
                    all_fps.extend(result['fps_list'])
                    all_latency.extend(result['latency_list'])
                    total_frames += result['frame_count']
            total_time = time.time() - start_time
            result = TestResult(
                engine_type=engine.engine_type,
                test_type="concurrent_streams",
                avg_fps=np.mean(all_fps) if all_fps else 0,
                max_fps=np.max(all_fps) if all_fps else 0,
                min_fps=np.min(all_fps) if all_fps else 0,
                avg_latency_ms=np.mean(all_latency) if all_latency else 0,
                max_latency_ms=np.max(all_latency) if all_latency else 0,
                min_latency_ms=np.min(all_latency) if all_latency else 0,
                avg_gpu_util=resource_metrics.get('avg_gpu_utilization', 0),
                max_gpu_util=resource_metrics.get('max_gpu_utilization', 0),
                avg_gpu_memory_mb=resource_metrics.get('avg_gpu_memory_mb', 0),
                max_gpu_memory_mb=resource_metrics.get('max_gpu_memory_mb', 0),
                avg_cpu_util=resource_metrics.get('avg_cpu_utilization', 0),
                max_cpu_util=resource_metrics.get('max_cpu_utilization', 0),
                test_duration=total_time,
                total_frames=total_frames,
                concurrent_streams=concurrent_count
            )
            print(f"{engine.engine_type} 并发测试完成 (并发数: {concurrent_count}):")
            print(f" 总FPS: {result.avg_fps * concurrent_count:.1f}")
            print(f" 平均单流FPS: {result.avg_fps:.1f}")
            print(f" 平均延迟: {result.avg_latency_ms:.1f}ms")
            print(f" GPU利用率: {result.avg_gpu_util:.1f}%")
            print(f" GPU内存: {result.avg_gpu_memory_mb:.1f}MB")
            results.append(result)
        return results

    def run_full_benchmark(self) -> Dict:
        """Run the full suite for both engines and return all results as a dict.

        TensorRT failures are captured under results['tensorrt']['error']
        rather than aborting the run; the PyTorch results are kept either way.
        """
        print("🚀 开始 YOLOv11 性能对比测试")
        print("=" * 60)
        all_results = {
            'pytorch': {},
            'tensorrt': {},
            'comparison': {},
            'timestamp': datetime.now().isoformat(),
            'model_path': self.model_path
        }
        # Test configuration: batch sizes and concurrency levels to sweep.
        batch_sizes = [1, 2, 4, 8]
        concurrent_counts = [1, 2, 4, 6, 8, 10]
        # --- PyTorch engine ---
        print("\n📊 测试 PyTorch 引擎")
        print("-" * 40)
        pytorch_engine = PyTorchEngine(self.model_path)
        pytorch_engine.load_model()
        # PyTorch single-frame test.
        pytorch_single = self.test_single_inference(pytorch_engine, test_duration=30)
        all_results['pytorch']['single_inference'] = asdict(pytorch_single)
        # PyTorch batch test.
        pytorch_batch = self.test_batch_inference(pytorch_engine, batch_sizes, test_duration=20)
        all_results['pytorch']['batch_inference'] = [asdict(r) for r in pytorch_batch]
        # PyTorch concurrency test.
        pytorch_concurrent = self.test_concurrent_streams(pytorch_engine, concurrent_counts, test_duration=30)
        all_results['pytorch']['concurrent_streams'] = [asdict(r) for r in pytorch_concurrent]
        pytorch_engine.cleanup()
        # --- TensorRT engine ---
        print("\n📊 测试 TensorRT 引擎")
        print("-" * 40)
        try:
            tensorrt_engine = TensorRTEngine(self.model_path)
            tensorrt_engine.load_model()
            # TensorRT single-frame test.
            tensorrt_single = self.test_single_inference(tensorrt_engine, test_duration=30)
            all_results['tensorrt']['single_inference'] = asdict(tensorrt_single)
            # TensorRT batch test.
            tensorrt_batch = self.test_batch_inference(tensorrt_engine, batch_sizes, test_duration=20)
            all_results['tensorrt']['batch_inference'] = [asdict(r) for r in tensorrt_batch]
            # TensorRT concurrency test.
            tensorrt_concurrent = self.test_concurrent_streams(tensorrt_engine, concurrent_counts, test_duration=30)
            all_results['tensorrt']['concurrent_streams'] = [asdict(r) for r in tensorrt_concurrent]
            tensorrt_engine.cleanup()
            # Cross-engine comparison (only when both engines completed).
            all_results['comparison'] = self._analyze_performance_comparison(
                pytorch_single, tensorrt_single,
                pytorch_batch, tensorrt_batch,
                pytorch_concurrent, tensorrt_concurrent
            )
        except Exception as e:
            print(f"❌ TensorRT 测试失败: {e}")
            all_results['tensorrt']['error'] = str(e)
        return all_results

    def _analyze_performance_comparison(self, pytorch_single, tensorrt_single,
                                        pytorch_batch, tensorrt_batch,
                                        pytorch_concurrent, tensorrt_concurrent) -> Dict:
        """Compute TensorRT-vs-PyTorch improvement percentages per test type.

        NOTE(review): divides by the PyTorch averages; a zero avg_fps or
        avg_latency_ms (e.g. a degenerate run) would raise ZeroDivisionError.
        """
        comparison = {}
        # Single-frame comparison.
        fps_improvement = (tensorrt_single.avg_fps - pytorch_single.avg_fps) / pytorch_single.avg_fps * 100
        latency_improvement = (pytorch_single.avg_latency_ms - tensorrt_single.avg_latency_ms) / pytorch_single.avg_latency_ms * 100
        comparison['single_inference'] = {
            'fps_improvement_percent': fps_improvement,
            'latency_improvement_percent': latency_improvement,
            'pytorch_fps': pytorch_single.avg_fps,
            'tensorrt_fps': tensorrt_single.avg_fps,
            'pytorch_latency_ms': pytorch_single.avg_latency_ms,
            'tensorrt_latency_ms': tensorrt_single.avg_latency_ms
        }
        # Batched comparison, paired per batch size.
        batch_comparison = []
        for pt_batch, trt_batch in zip(pytorch_batch, tensorrt_batch):
            fps_imp = (trt_batch.avg_fps - pt_batch.avg_fps) / pt_batch.avg_fps * 100
            latency_imp = (pt_batch.avg_latency_ms - trt_batch.avg_latency_ms) / pt_batch.avg_latency_ms * 100
            batch_comparison.append({
                'batch_size': pt_batch.batch_size,
                'fps_improvement_percent': fps_imp,
                'latency_improvement_percent': latency_imp,
                'pytorch_fps': pt_batch.avg_fps,
                'tensorrt_fps': trt_batch.avg_fps
            })
        comparison['batch_inference'] = batch_comparison
        # Concurrency comparison, paired per concurrency level.
        concurrent_comparison = []
        for pt_conc, trt_conc in zip(pytorch_concurrent, tensorrt_concurrent):
            fps_imp = (trt_conc.avg_fps - pt_conc.avg_fps) / pt_conc.avg_fps * 100
            concurrent_comparison.append({
                'concurrent_streams': pt_conc.concurrent_streams,
                'fps_improvement_percent': fps_imp,
                'pytorch_total_fps': pt_conc.avg_fps * pt_conc.concurrent_streams,
                'tensorrt_total_fps': trt_conc.avg_fps * trt_conc.concurrent_streams
            })
        comparison['concurrent_streams'] = concurrent_comparison
        return comparison
def save_results(results: Dict, output_dir: str = "benchmark_results"):
    """Persist benchmark results as a JSON dump plus a brief text report.

    Returns:
        Tuple of (json_path, report_path) for the two files written.
    """
    os.makedirs(output_dir, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Full machine-readable dump.
    json_file = os.path.join(output_dir, f"benchmark_results_{stamp}.json")
    with open(json_file, 'w', encoding='utf-8') as handle:
        json.dump(results, handle, indent=2, ensure_ascii=False)
    print(f"✅ 测试结果已保存: {json_file}")

    # Short human-readable report, assembled as lines then written at once.
    report_lines = [
        "YOLOv11 性能对比测试报告\n",
        "=" * 50 + "\n",
        f"测试时间: {results['timestamp']}\n",
        f"模型路径: {results['model_path']}\n\n",
    ]
    comp = results.get('comparison')
    if comp:
        # Single-frame section.
        if 'single_inference' in comp:
            single = comp['single_inference']
            report_lines.append("单帧推理性能对比:\n")
            report_lines.append(f" PyTorch FPS: {single['pytorch_fps']:.1f}\n")
            report_lines.append(f" TensorRT FPS: {single['tensorrt_fps']:.1f}\n")
            report_lines.append(f" FPS 提升: {single['fps_improvement_percent']:.1f}%\n")
            report_lines.append(f" PyTorch 延迟: {single['pytorch_latency_ms']:.1f}ms\n")
            report_lines.append(f" TensorRT 延迟: {single['tensorrt_latency_ms']:.1f}ms\n")
            report_lines.append(f" 延迟改善: {single['latency_improvement_percent']:.1f}%\n\n")
        # Batched section.
        if 'batch_inference' in comp:
            report_lines.append("批量推理性能对比:\n")
            for batch in comp['batch_inference']:
                report_lines.append(f" 批次大小 {batch['batch_size']}: TensorRT FPS提升 {batch['fps_improvement_percent']:.1f}%\n")
            report_lines.append("\n")
        # Concurrency section.
        if 'concurrent_streams' in comp:
            report_lines.append("并发性能对比:\n")
            for conc in comp['concurrent_streams']:
                report_lines.append(f" {conc['concurrent_streams']}路并发: TensorRT总FPS提升 {conc['fps_improvement_percent']:.1f}%\n")
    report_lines.append("\n详细数据请查看 JSON 文件。\n")

    report_file = os.path.join(output_dir, f"benchmark_report_{stamp}.txt")
    with open(report_file, 'w', encoding='utf-8') as handle:
        handle.writelines(report_lines)
    print(f"✅ 测试报告已保存: {report_file}")
    return json_file, report_file
def print_summary(results: Dict):
    """Print a console summary of the PyTorch-vs-TensorRT comparison."""
    print("\n" + "=" * 60)
    print("🎯 性能测试总结")
    print("=" * 60)
    comp = results.get('comparison')
    if comp:
        # Single-frame numbers.
        if 'single_inference' in comp:
            single = comp['single_inference']
            print(f"\n📈 单帧推理性能:")
            print(f" PyTorch: {single['pytorch_fps']:.1f} FPS, {single['pytorch_latency_ms']:.1f}ms")
            print(f" TensorRT: {single['tensorrt_fps']:.1f} FPS, {single['tensorrt_latency_ms']:.1f}ms")
            print(f" 🚀 TensorRT FPS 提升: {single['fps_improvement_percent']:.1f}%")
            print(f" ⚡ TensorRT 延迟改善: {single['latency_improvement_percent']:.1f}%")
        # Best batch size, ranked by FPS improvement.
        batches = comp.get('batch_inference')
        if batches:
            top_batch = max(batches, key=lambda entry: entry['fps_improvement_percent'])
            print(f"\n📦 最佳批量推理 (批次大小 {top_batch['batch_size']}):")
            print(f" PyTorch: {top_batch['pytorch_fps']:.1f} FPS")
            print(f" TensorRT: {top_batch['tensorrt_fps']:.1f} FPS")
            print(f" 🚀 TensorRT FPS 提升: {top_batch['fps_improvement_percent']:.1f}%")
        # Heaviest concurrency level (last entry, by list construction order).
        streams = comp.get('concurrent_streams')
        if streams:
            heaviest = streams[-1]
            print(f"\n🔄 最大并发能力 ({heaviest['concurrent_streams']}路):")
            print(f" PyTorch 总FPS: {heaviest['pytorch_total_fps']:.1f}")
            print(f" TensorRT 总FPS: {heaviest['tensorrt_total_fps']:.1f}")
            print(f" 🚀 TensorRT 总FPS 提升: {heaviest['fps_improvement_percent']:.1f}%")
    print("\n" + "=" * 60)
def main(model_path: str = "C:/Users/16337/PycharmProjects/Security/yolo11n.pt"):
    """Run the full PyTorch-vs-TensorRT benchmark and report the results.

    Args:
        model_path: Path to the YOLO `.pt` checkpoint. Previously hard-coded
            inside the function; parameterized (old value kept as the
            default, so no-arg invocation is unchanged) to allow pointing
            the benchmark at any model without editing the source.
    """
    print("YOLOv11 性能对比测试系统")
    print("PyTorch vs TensorRT 完整性能测试")
    print("=" * 60)
    # Bail out early when the checkpoint is missing.
    if not os.path.exists(model_path):
        print(f"❌ 模型文件不存在: {model_path}")
        return
    tester = PerformanceTester(model_path)
    try:
        # Run the full benchmark suite.
        results = tester.run_full_benchmark()
        # Persist results and print the console summary.
        json_file, report_file = save_results(results)
        print_summary(results)
        print(f"\n📁 结果文件:")
        print(f" JSON: {json_file}")
        print(f" 报告: {report_file}")
    except KeyboardInterrupt:
        print("\n\n⏹️ 测试被用户中断")
    except Exception as e:
        print(f"\n❌ 测试过程中发生错误: {e}")
        import traceback
        traceback.print_exc()
# Script entry point: run the benchmark only when executed directly.
if __name__ == "__main__":
    main()