#!/usr/bin/env python3
"""
YOLOv11 performance comparison test system.

Runs a full PyTorch-vs-TensorRT benchmark: single-frame inference,
batched inference, and concurrent multi-stream inference, while sampling
CPU/GPU/memory usage in a background thread. Results are written as JSON
plus a short human-readable report.
"""

import os
import sys
import time
import json
import threading
import numpy as np
import cv2
import torch
import psutil
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict

from ultralytics import YOLO


@dataclass
class PerformanceMetrics:
    """One point-in-time performance sample for a given engine."""
    timestamp: float
    engine_type: str
    fps: Optional[float] = None
    latency_ms: Optional[float] = None
    gpu_utilization: Optional[float] = None
    gpu_memory_mb: Optional[float] = None
    cpu_utilization: Optional[float] = None
    memory_mb: Optional[float] = None
    concurrent_streams: Optional[int] = None
    batch_size: Optional[int] = None


@dataclass
class TestResult:
    """Aggregated result of one benchmark run (single/batch/concurrent)."""
    engine_type: str
    test_type: str
    avg_fps: float
    max_fps: float
    min_fps: float
    avg_latency_ms: float
    max_latency_ms: float
    min_latency_ms: float
    avg_gpu_util: float
    max_gpu_util: float
    avg_gpu_memory_mb: float
    max_gpu_memory_mb: float
    avg_cpu_util: float
    max_cpu_util: float
    test_duration: float
    total_frames: int
    concurrent_streams: int = 1
    batch_size: int = 1


class ResourceMonitor:
    """Background sampler of system (CPU/RAM) and GPU resource usage."""

    def __init__(self, sampling_interval: float = 0.1):
        # Seconds between samples taken by the monitor thread.
        self.sampling_interval = sampling_interval
        self.is_monitoring = False
        self.metrics_history: List[Dict] = []
        self.monitor_thread: Optional[threading.Thread] = None

    def start_monitoring(self):
        """Clear history and start the sampling thread."""
        self.is_monitoring = True
        self.metrics_history = []
        self.monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self.monitor_thread.start()

    def stop_monitoring(self):
        """Signal the sampling thread to stop and wait briefly for it."""
        self.is_monitoring = False
        if self.monitor_thread:
            self.monitor_thread.join(timeout=1.0)

    def _monitor_loop(self):
        """Sampling loop run on the monitor thread until stopped."""
        # GPUtil is optional; fall back to torch-only GPU stats if absent.
        try:
            import GPUtil
        except ImportError:
            print("警告: GPUtil 未安装,GPU 监控不可用")
            GPUtil = None

        while self.is_monitoring:
            try:
                # CPU / system-memory sample. NOTE: the first call to
                # cpu_percent(interval=None) returns 0.0 by psutil design.
                cpu_util = psutil.cpu_percent(interval=None)
                memory_info = psutil.virtual_memory()
                memory_mb = memory_info.used / 1024 / 1024

                # GPU sample (GPUtil preferred; torch as fallback).
                gpu_util = None
                gpu_memory_mb = None
                if GPUtil and torch.cuda.is_available():
                    try:
                        gpus = GPUtil.getGPUs()
                        if gpus:
                            gpu = gpus[0]  # only the first GPU is sampled
                            gpu_util = gpu.load * 100
                            gpu_memory_mb = gpu.memoryUsed
                    except Exception:
                        # Best-effort sampling; skip this GPU reading.
                        pass

                # Fallback: torch can report allocated memory, but not
                # utilization, so 0.0 is used as a placeholder.
                if gpu_util is None and torch.cuda.is_available():
                    try:
                        gpu_memory_mb = torch.cuda.memory_allocated(0) / 1024 / 1024
                        gpu_util = 0.0
                    except Exception:
                        pass

                metrics = {
                    'timestamp': time.time(),
                    'cpu_utilization': cpu_util,
                    'memory_mb': memory_mb,
                    'gpu_utilization': gpu_util,
                    'gpu_memory_mb': gpu_memory_mb
                }
                self.metrics_history.append(metrics)
            except Exception as e:
                print(f"监控错误: {e}")

            time.sleep(self.sampling_interval)

    def get_average_metrics(self) -> Dict:
        """Return avg/max/min for each sampled metric (skips None samples)."""
        if not self.metrics_history:
            return {}

        metrics = {}
        for key in ['cpu_utilization', 'memory_mb', 'gpu_utilization', 'gpu_memory_mb']:
            values = [m[key] for m in self.metrics_history if m[key] is not None]
            if values:
                metrics[f'avg_{key}'] = np.mean(values)
                metrics[f'max_{key}'] = np.max(values)
                metrics[f'min_{key}'] = np.min(values)
        return metrics


class MockCamera:
    """Synthetic frame source used instead of a real camera."""

    def __init__(self, width: int = 640, height: int = 640, fps: int = 30):
        self.width = width
        self.height = height
        self.fps = fps
        self.frame_count = 0

    def generate_frame(self) -> np.ndarray:
        """Generate one random BGR frame, occasionally with a fake target.

        Note: randint upper bound is exclusive, so pixel values are 0..254.
        """
        frame = np.random.randint(0, 255, (self.height, self.width, 3), dtype=np.uint8)

        # ~50% of frames get a solid rectangle to simulate a person.
        if self.frame_count % 10 < 5:
            x1, y1 = np.random.randint(50, self.width - 100), np.random.randint(50, self.height - 150)
            x2, y2 = x1 + 50, y1 + 100
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 255), -1)

        self.frame_count += 1
        return frame

    def generate_batch(self, batch_size: int) -> List[np.ndarray]:
        """Generate a list of batch_size synthetic frames."""
        return [self.generate_frame() for _ in range(batch_size)]


class InferenceEngine:
    """Abstract base for inference backends (PyTorch / TensorRT)."""

    def __init__(self, model_path: str, engine_type: str):
        self.model_path = model_path
        self.engine_type = engine_type
        self.model = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def load_model(self):
        """Load the model; implemented by subclasses."""
        raise NotImplementedError

    def infer_single(self, image: np.ndarray) -> Dict:
        """Run inference on one frame; implemented by subclasses."""
        raise NotImplementedError

    def infer_batch(self, images: List[np.ndarray]) -> List[Dict]:
        """Run inference on a batch of frames; implemented by subclasses."""
        raise NotImplementedError

    def cleanup(self):
        """Release the model and free cached CUDA memory."""
        if hasattr(self, 'model') and self.model:
            del self.model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()


class PyTorchEngine(InferenceEngine):
    """Plain PyTorch (ultralytics) inference backend."""

    def __init__(self, model_path: str):
        super().__init__(model_path, "pytorch")

    def load_model(self):
        """Load the .pt model and move it to the selected device."""
        print(f"加载 PyTorch 模型: {self.model_path}")
        self.model = YOLO(self.model_path)
        self.model.to(self.device)
        print(f"✅ PyTorch 模型加载完成,设备: {self.device}")

    def infer_single(self, image: np.ndarray) -> Dict:
        """Run one frame through the model; return latency + detection count."""
        start_time = time.time()
        results = self.model(image, verbose=False, device=self.device)
        end_time = time.time()

        latency_ms = (end_time - start_time) * 1000
        return {
            'latency_ms': latency_ms,
            'detections': len(results[0].boxes) if results[0].boxes is not None else 0
        }

    def infer_batch(self, images: List[np.ndarray]) -> List[Dict]:
        """Run a batch through the model; per-image latency is the batch average."""
        start_time = time.time()
        results = self.model(images, verbose=False, device=self.device)
        end_time = time.time()

        total_latency_ms = (end_time - start_time) * 1000
        avg_latency_ms = total_latency_ms / len(images)
        return [{
            'latency_ms': avg_latency_ms,
            'detections': len(result.boxes) if result.boxes is not None else 0
        } for result in results]


class TensorRTEngine(InferenceEngine):
    """TensorRT inference backend (exports an .engine file on first use)."""

    def __init__(self, model_path: str):
        super().__init__(model_path, "tensorrt")
        self.engine_path: Optional[str] = None

    def load_model(self):
        """Load an existing .engine file or export one from the .pt model."""
        engine_path = self.model_path.replace('.pt', '.engine')
        if os.path.exists(engine_path):
            print(f"找到现有 TensorRT 引擎: {engine_path}")
            self.engine_path = engine_path
        else:
            print(f"创建 TensorRT 引擎: {self.model_path} -> {engine_path}")
            self._export_tensorrt_engine(engine_path)

        self.model = YOLO(self.engine_path)
        print(f"✅ TensorRT 模型加载完成")

    def _export_tensorrt_engine(self, engine_path: str):
        """Export the .pt model to a TensorRT engine (FP16, static shapes).

        Raises whatever ultralytics raises if the export fails.
        """
        print("正在导出 TensorRT 引擎,这可能需要几分钟...")
        model = YOLO(self.model_path)
        try:
            exported_model = model.export(
                format='engine',
                imgsz=640,
                device=0 if torch.cuda.is_available() else 'cpu',
                half=True,       # FP16
                dynamic=False,
                simplify=True,
                workspace=4,     # GB
                verbose=True
            )
            # export() returns the path of the produced engine file.
            self.engine_path = exported_model
            print(f"✅ TensorRT 引擎导出完成: {self.engine_path}")
        except Exception as e:
            print(f"❌ TensorRT 引擎导出失败: {e}")
            raise

    def infer_single(self, image: np.ndarray) -> Dict:
        """Run one frame through the TensorRT engine."""
        start_time = time.time()
        results = self.model(image, verbose=False)
        end_time = time.time()

        latency_ms = (end_time - start_time) * 1000
        return {
            'latency_ms': latency_ms,
            'detections': len(results[0].boxes) if results[0].boxes is not None else 0
        }

    def infer_batch(self, images: List[np.ndarray]) -> List[Dict]:
        """Run a batch through the engine; per-image latency is the batch average."""
        start_time = time.time()
        results = self.model(images, verbose=False)
        end_time = time.time()

        total_latency_ms = (end_time - start_time) * 1000
        avg_latency_ms = total_latency_ms / len(images)
        return [{
            'latency_ms': avg_latency_ms,
            'detections': len(result.boxes) if result.boxes is not None else 0
        } for result in results]


class PerformanceTester:
    """Drives single/batch/concurrent benchmark runs for one model path."""

    def __init__(self, model_path: str):
        self.model_path = model_path
        self.results: List[TestResult] = []
        self.resource_monitor = ResourceMonitor()

    def test_single_inference(self, engine: InferenceEngine, test_duration: int = 30) -> TestResult:
        """Benchmark single-frame inference for test_duration seconds."""
        print(f"\n🔄 测试 {engine.engine_type} 单帧推理性能 ({test_duration}秒)...")

        camera = MockCamera()
        fps_list: List[float] = []
        latency_list: List[float] = []
        frame_count = 0

        self.resource_monitor.start_monitoring()

        start_time = time.time()
        last_fps_time = start_time
        fps_frame_count = 0

        while time.time() - start_time < test_duration:
            frame = camera.generate_frame()
            result = engine.infer_single(frame)
            latency_list.append(result['latency_ms'])
            frame_count += 1
            fps_frame_count += 1

            # Recompute FPS roughly once per second and show progress.
            current_time = time.time()
            if current_time - last_fps_time >= 1.0:
                fps = fps_frame_count / (current_time - last_fps_time)
                fps_list.append(fps)
                fps_frame_count = 0
                last_fps_time = current_time

                elapsed = current_time - start_time
                print(f"   进度: {elapsed:.1f}s/{test_duration}s, 当前FPS: {fps:.1f}, 延迟: {result['latency_ms']:.1f}ms")

        self.resource_monitor.stop_monitoring()
        resource_metrics = self.resource_monitor.get_average_metrics()

        total_time = time.time() - start_time
        result = TestResult(
            engine_type=engine.engine_type,
            test_type="single_inference",
            avg_fps=np.mean(fps_list) if fps_list else 0,
            max_fps=np.max(fps_list) if fps_list else 0,
            min_fps=np.min(fps_list) if fps_list else 0,
            # Guard against an empty list (e.g. test_duration <= 0).
            avg_latency_ms=np.mean(latency_list) if latency_list else 0,
            max_latency_ms=np.max(latency_list) if latency_list else 0,
            min_latency_ms=np.min(latency_list) if latency_list else 0,
            avg_gpu_util=resource_metrics.get('avg_gpu_utilization', 0),
            max_gpu_util=resource_metrics.get('max_gpu_utilization', 0),
            avg_gpu_memory_mb=resource_metrics.get('avg_gpu_memory_mb', 0),
            max_gpu_memory_mb=resource_metrics.get('max_gpu_memory_mb', 0),
            avg_cpu_util=resource_metrics.get('avg_cpu_utilization', 0),
            max_cpu_util=resource_metrics.get('max_cpu_utilization', 0),
            test_duration=total_time,
            total_frames=frame_count
        )

        print(f"✅ {engine.engine_type} 单帧推理测试完成:")
        print(f"   平均FPS: {result.avg_fps:.1f}")
        print(f"   平均延迟: {result.avg_latency_ms:.1f}ms")
        print(f"   GPU利用率: {result.avg_gpu_util:.1f}%")
        print(f"   GPU内存: {result.avg_gpu_memory_mb:.1f}MB")

        return result

    def test_batch_inference(self, engine: InferenceEngine, batch_sizes: List[int],
                             test_duration: int = 20) -> List[TestResult]:
        """Benchmark batched inference for each size in batch_sizes."""
        results = []

        for batch_size in batch_sizes:
            print(f"\n🔄 测试 {engine.engine_type} 批量推理性能 (批次大小: {batch_size}, {test_duration}秒)...")

            camera = MockCamera()
            fps_list: List[float] = []
            latency_list: List[float] = []
            batch_count = 0

            self.resource_monitor.start_monitoring()

            start_time = time.time()
            last_fps_time = start_time
            fps_batch_count = 0

            while time.time() - start_time < test_duration:
                batch_frames = camera.generate_batch(batch_size)
                batch_results = engine.infer_batch(batch_frames)

                avg_latency = np.mean([r['latency_ms'] for r in batch_results])
                latency_list.append(avg_latency)
                batch_count += 1
                fps_batch_count += 1

                current_time = time.time()
                if current_time - last_fps_time >= 1.0:
                    # Batch FPS = batches * batch_size / elapsed time.
                    fps = (fps_batch_count * batch_size) / (current_time - last_fps_time)
                    fps_list.append(fps)
                    fps_batch_count = 0
                    last_fps_time = current_time

                    elapsed = current_time - start_time
                    print(f"   进度: {elapsed:.1f}s/{test_duration}s, 当前FPS: {fps:.1f}, 延迟: {avg_latency:.1f}ms")

            self.resource_monitor.stop_monitoring()
            resource_metrics = self.resource_monitor.get_average_metrics()

            total_time = time.time() - start_time
            total_frames = batch_count * batch_size
            result = TestResult(
                engine_type=engine.engine_type,
                test_type="batch_inference",
                avg_fps=np.mean(fps_list) if fps_list else 0,
                max_fps=np.max(fps_list) if fps_list else 0,
                min_fps=np.min(fps_list) if fps_list else 0,
                avg_latency_ms=np.mean(latency_list) if latency_list else 0,
                max_latency_ms=np.max(latency_list) if latency_list else 0,
                min_latency_ms=np.min(latency_list) if latency_list else 0,
                avg_gpu_util=resource_metrics.get('avg_gpu_utilization', 0),
                max_gpu_util=resource_metrics.get('max_gpu_utilization', 0),
                avg_gpu_memory_mb=resource_metrics.get('avg_gpu_memory_mb', 0),
                max_gpu_memory_mb=resource_metrics.get('max_gpu_memory_mb', 0),
                avg_cpu_util=resource_metrics.get('avg_cpu_utilization', 0),
                max_cpu_util=resource_metrics.get('max_cpu_utilization', 0),
                test_duration=total_time,
                total_frames=total_frames,
                batch_size=batch_size
            )

            print(f"✅ {engine.engine_type} 批量推理测试完成 (批次大小: {batch_size}):")
            print(f"   平均FPS: {result.avg_fps:.1f}")
            print(f"   平均延迟: {result.avg_latency_ms:.1f}ms")
            print(f"   GPU利用率: {result.avg_gpu_util:.1f}%")
            print(f"   GPU内存: {result.avg_gpu_memory_mb:.1f}MB")

            results.append(result)

        return results

    def test_concurrent_streams(self, engine: InferenceEngine,
                                concurrent_counts: List[int],
                                test_duration: int = 30) -> List[TestResult]:
        """Benchmark N concurrent inference threads for each count given.

        NOTE(review): all worker threads share one model instance; this
        assumes the ultralytics model is safe to call from multiple
        threads — confirm before trusting concurrent numbers.
        """
        results = []

        for concurrent_count in concurrent_counts:
            print(f"\n🔄 测试 {engine.engine_type} 并发性能 (并发数: {concurrent_count}, {test_duration}秒)...")

            cameras = [MockCamera() for _ in range(concurrent_count)]

            fps_list: List[float] = []
            latency_list: List[float] = []
            total_frames = 0

            threads: List[threading.Thread] = []
            thread_results: List[List] = [[] for _ in range(concurrent_count)]
            stop_flag = threading.Event()

            self.resource_monitor.start_monitoring()

            def worker_thread(thread_id: int, camera: MockCamera, results_list: List):
                """Run inference in a loop until stop_flag is set, collecting stats."""
                local_fps_list: List[float] = []
                local_latency_list: List[float] = []
                frame_count = 0
                last_fps_time = time.time()
                fps_frame_count = 0

                while not stop_flag.is_set():
                    try:
                        frame = camera.generate_frame()
                        result = engine.infer_single(frame)
                        local_latency_list.append(result['latency_ms'])
                        frame_count += 1
                        fps_frame_count += 1

                        current_time = time.time()
                        if current_time - last_fps_time >= 1.0:
                            fps = fps_frame_count / (current_time - last_fps_time)
                            local_fps_list.append(fps)
                            fps_frame_count = 0
                            last_fps_time = current_time
                    except Exception as e:
                        print(f"线程 {thread_id} 错误: {e}")
                        break

                # Single summary dict per thread, appended on exit.
                results_list.extend([{
                    'fps_list': local_fps_list,
                    'latency_list': local_latency_list,
                    'frame_count': frame_count
                }])

            start_time = time.time()
            for i in range(concurrent_count):
                thread = threading.Thread(
                    target=worker_thread,
                    args=(i, cameras[i], thread_results[i]),
                    daemon=True
                )
                threads.append(thread)
                thread.start()

            # Let the workers run, then ask them to stop and wait.
            time.sleep(test_duration)
            stop_flag.set()
            for thread in threads:
                thread.join(timeout=5.0)

            self.resource_monitor.stop_monitoring()
            resource_metrics = self.resource_monitor.get_average_metrics()

            # Aggregate per-thread stats.
            all_fps: List[float] = []
            all_latency: List[float] = []
            total_frames = 0
            for thread_result_list in thread_results:
                if thread_result_list:
                    result = thread_result_list[0]
                    all_fps.extend(result['fps_list'])
                    all_latency.extend(result['latency_list'])
                    total_frames += result['frame_count']

            total_time = time.time() - start_time
            result = TestResult(
                engine_type=engine.engine_type,
                test_type="concurrent_streams",
                avg_fps=np.mean(all_fps) if all_fps else 0,
                max_fps=np.max(all_fps) if all_fps else 0,
                min_fps=np.min(all_fps) if all_fps else 0,
                avg_latency_ms=np.mean(all_latency) if all_latency else 0,
                max_latency_ms=np.max(all_latency) if all_latency else 0,
                min_latency_ms=np.min(all_latency) if all_latency else 0,
                avg_gpu_util=resource_metrics.get('avg_gpu_utilization', 0),
                max_gpu_util=resource_metrics.get('max_gpu_utilization', 0),
                avg_gpu_memory_mb=resource_metrics.get('avg_gpu_memory_mb', 0),
                max_gpu_memory_mb=resource_metrics.get('max_gpu_memory_mb', 0),
                avg_cpu_util=resource_metrics.get('avg_cpu_utilization', 0),
                max_cpu_util=resource_metrics.get('max_cpu_utilization', 0),
                test_duration=total_time,
                total_frames=total_frames,
                concurrent_streams=concurrent_count
            )

            print(f"✅ {engine.engine_type} 并发测试完成 (并发数: {concurrent_count}):")
            print(f"   总FPS: {result.avg_fps * concurrent_count:.1f}")
            print(f"   平均单流FPS: {result.avg_fps:.1f}")
            print(f"   平均延迟: {result.avg_latency_ms:.1f}ms")
            print(f"   GPU利用率: {result.avg_gpu_util:.1f}%")
            print(f"   GPU内存: {result.avg_gpu_memory_mb:.1f}MB")

            results.append(result)

        return results

    def run_full_benchmark(self) -> Dict:
        """Run the complete benchmark suite for both engines and compare."""
        print("🚀 开始 YOLOv11 性能对比测试")
        print("=" * 60)

        all_results = {
            'pytorch': {},
            'tensorrt': {},
            'comparison': {},
            'timestamp': datetime.now().isoformat(),
            'model_path': self.model_path
        }

        # Test configuration.
        batch_sizes = [1, 2, 4, 8]
        concurrent_counts = [1, 2, 4, 6, 8, 10]

        # --- PyTorch ---
        print("\n📊 测试 PyTorch 引擎")
        print("-" * 40)
        pytorch_engine = PyTorchEngine(self.model_path)
        pytorch_engine.load_model()

        pytorch_single = self.test_single_inference(pytorch_engine, test_duration=30)
        all_results['pytorch']['single_inference'] = asdict(pytorch_single)

        pytorch_batch = self.test_batch_inference(pytorch_engine, batch_sizes, test_duration=20)
        all_results['pytorch']['batch_inference'] = [asdict(r) for r in pytorch_batch]

        pytorch_concurrent = self.test_concurrent_streams(pytorch_engine, concurrent_counts, test_duration=30)
        all_results['pytorch']['concurrent_streams'] = [asdict(r) for r in pytorch_concurrent]

        pytorch_engine.cleanup()

        # --- TensorRT (failures are recorded, not fatal) ---
        print("\n📊 测试 TensorRT 引擎")
        print("-" * 40)
        try:
            tensorrt_engine = TensorRTEngine(self.model_path)
            tensorrt_engine.load_model()

            tensorrt_single = self.test_single_inference(tensorrt_engine, test_duration=30)
            all_results['tensorrt']['single_inference'] = asdict(tensorrt_single)

            tensorrt_batch = self.test_batch_inference(tensorrt_engine, batch_sizes, test_duration=20)
            all_results['tensorrt']['batch_inference'] = [asdict(r) for r in tensorrt_batch]

            tensorrt_concurrent = self.test_concurrent_streams(tensorrt_engine, concurrent_counts, test_duration=30)
            all_results['tensorrt']['concurrent_streams'] = [asdict(r) for r in tensorrt_concurrent]

            tensorrt_engine.cleanup()

            all_results['comparison'] = self._analyze_performance_comparison(
                pytorch_single, tensorrt_single,
                pytorch_batch, tensorrt_batch,
                pytorch_concurrent, tensorrt_concurrent
            )
        except Exception as e:
            print(f"❌ TensorRT 测试失败: {e}")
            all_results['tensorrt']['error'] = str(e)

        return all_results

    @staticmethod
    def _percent_gain(new: float, old: float) -> float:
        """Return (new - old) / old * 100, or 0.0 when old is 0 (avoids ZeroDivisionError)."""
        return (new - old) / old * 100 if old else 0.0

    def _analyze_performance_comparison(self, pytorch_single, tensorrt_single,
                                        pytorch_batch, tensorrt_batch,
                                        pytorch_concurrent, tensorrt_concurrent) -> Dict:
        """Build the PyTorch-vs-TensorRT comparison section of the results."""
        comparison = {}

        # Single-frame comparison.
        fps_improvement = self._percent_gain(tensorrt_single.avg_fps, pytorch_single.avg_fps)
        latency_improvement = self._percent_gain(
            pytorch_single.avg_latency_ms - tensorrt_single.avg_latency_ms + tensorrt_single.avg_latency_ms,
            pytorch_single.avg_latency_ms
        ) - 100 if pytorch_single.avg_latency_ms else 0.0
        # Equivalent to (pt - trt) / pt * 100, guarded against pt == 0.
        if pytorch_single.avg_latency_ms:
            latency_improvement = (pytorch_single.avg_latency_ms - tensorrt_single.avg_latency_ms) \
                / pytorch_single.avg_latency_ms * 100
        else:
            latency_improvement = 0.0

        comparison['single_inference'] = {
            'fps_improvement_percent': fps_improvement,
            'latency_improvement_percent': latency_improvement,
            'pytorch_fps': pytorch_single.avg_fps,
            'tensorrt_fps': tensorrt_single.avg_fps,
            'pytorch_latency_ms': pytorch_single.avg_latency_ms,
            'tensorrt_latency_ms': tensorrt_single.avg_latency_ms
        }

        # Batch comparison, one entry per batch size.
        batch_comparison = []
        for pt_batch, trt_batch in zip(pytorch_batch, tensorrt_batch):
            fps_imp = self._percent_gain(trt_batch.avg_fps, pt_batch.avg_fps)
            if pt_batch.avg_latency_ms:
                latency_imp = (pt_batch.avg_latency_ms - trt_batch.avg_latency_ms) \
                    / pt_batch.avg_latency_ms * 100
            else:
                latency_imp = 0.0
            batch_comparison.append({
                'batch_size': pt_batch.batch_size,
                'fps_improvement_percent': fps_imp,
                'latency_improvement_percent': latency_imp,
                'pytorch_fps': pt_batch.avg_fps,
                'tensorrt_fps': trt_batch.avg_fps
            })
        comparison['batch_inference'] = batch_comparison

        # Concurrency comparison, one entry per stream count.
        concurrent_comparison = []
        for pt_conc, trt_conc in zip(pytorch_concurrent, tensorrt_concurrent):
            fps_imp = self._percent_gain(trt_conc.avg_fps, pt_conc.avg_fps)
            concurrent_comparison.append({
                'concurrent_streams': pt_conc.concurrent_streams,
                'fps_improvement_percent': fps_imp,
                'pytorch_total_fps': pt_conc.avg_fps * pt_conc.concurrent_streams,
                'tensorrt_total_fps': trt_conc.avg_fps * trt_conc.concurrent_streams
            })
        comparison['concurrent_streams'] = concurrent_comparison

        return comparison


def save_results(results: Dict, output_dir: str = "benchmark_results"):
    """Write the full JSON results plus a short text report; return both paths."""
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Full machine-readable results.
    json_file = os.path.join(output_dir, f"benchmark_results_{timestamp}.json")
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"✅ 测试结果已保存: {json_file}")

    # Short human-readable report.
    report_file = os.path.join(output_dir, f"benchmark_report_{timestamp}.txt")
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("YOLOv11 性能对比测试报告\n")
        f.write("=" * 50 + "\n")
        f.write(f"测试时间: {results['timestamp']}\n")
        f.write(f"模型路径: {results['model_path']}\n\n")

        if 'comparison' in results and results['comparison']:
            comp = results['comparison']

            if 'single_inference' in comp:
                single = comp['single_inference']
                f.write("单帧推理性能对比:\n")
                f.write(f"  PyTorch FPS: {single['pytorch_fps']:.1f}\n")
                f.write(f"  TensorRT FPS: {single['tensorrt_fps']:.1f}\n")
                f.write(f"  FPS 提升: {single['fps_improvement_percent']:.1f}%\n")
                f.write(f"  PyTorch 延迟: {single['pytorch_latency_ms']:.1f}ms\n")
                f.write(f"  TensorRT 延迟: {single['tensorrt_latency_ms']:.1f}ms\n")
                f.write(f"  延迟改善: {single['latency_improvement_percent']:.1f}%\n\n")

            if 'batch_inference' in comp:
                f.write("批量推理性能对比:\n")
                for batch in comp['batch_inference']:
                    f.write(f"  批次大小 {batch['batch_size']}: TensorRT FPS提升 {batch['fps_improvement_percent']:.1f}%\n")
                f.write("\n")

            if 'concurrent_streams' in comp:
                f.write("并发性能对比:\n")
                for conc in comp['concurrent_streams']:
                    f.write(f"  {conc['concurrent_streams']}路并发: TensorRT总FPS提升 {conc['fps_improvement_percent']:.1f}%\n")

        f.write("\n详细数据请查看 JSON 文件。\n")

    print(f"✅ 测试报告已保存: {report_file}")
    return json_file, report_file


def print_summary(results: Dict):
    """Print a console summary of the comparison section."""
    print("\n" + "=" * 60)
    print("🎯 性能测试总结")
    print("=" * 60)

    if 'comparison' in results and results['comparison']:
        comp = results['comparison']

        if 'single_inference' in comp:
            single = comp['single_inference']
            print(f"\n📈 单帧推理性能:")
            print(f"   PyTorch:  {single['pytorch_fps']:.1f} FPS, {single['pytorch_latency_ms']:.1f}ms")
            print(f"   TensorRT: {single['tensorrt_fps']:.1f} FPS, {single['tensorrt_latency_ms']:.1f}ms")
            print(f"   🚀 TensorRT FPS 提升: {single['fps_improvement_percent']:.1f}%")
            print(f"   ⚡ TensorRT 延迟改善: {single['latency_improvement_percent']:.1f}%")

        if 'batch_inference' in comp and comp['batch_inference']:
            best_batch = max(comp['batch_inference'], key=lambda x: x['fps_improvement_percent'])
            print(f"\n📦 最佳批量推理 (批次大小 {best_batch['batch_size']}):")
            print(f"   PyTorch:  {best_batch['pytorch_fps']:.1f} FPS")
            print(f"   TensorRT: {best_batch['tensorrt_fps']:.1f} FPS")
            print(f"   🚀 TensorRT FPS 提升: {best_batch['fps_improvement_percent']:.1f}%")

        if 'concurrent_streams' in comp and comp['concurrent_streams']:
            # The last entry is usually the highest stream count tested.
            max_concurrent = comp['concurrent_streams'][-1]
            print(f"\n🔄 最大并发能力 ({max_concurrent['concurrent_streams']}路):")
            print(f"   PyTorch 总FPS:  {max_concurrent['pytorch_total_fps']:.1f}")
            print(f"   TensorRT 总FPS: {max_concurrent['tensorrt_total_fps']:.1f}")
            print(f"   🚀 TensorRT 总FPS 提升: {max_concurrent['fps_improvement_percent']:.1f}%")

    print("\n" + "=" * 60)


def main():
    """Entry point: run the full benchmark, save and summarize results."""
    print("YOLOv11 性能对比测试系统")
    print("PyTorch vs TensorRT 完整性能测试")
    print("=" * 60)

    # Model path: optional first CLI argument, falling back to the
    # original hard-coded default for backward compatibility.
    model_path = sys.argv[1] if len(sys.argv) > 1 else "C:/Users/16337/PycharmProjects/Security/yolo11n.pt"
    if not os.path.exists(model_path):
        print(f"❌ 模型文件不存在: {model_path}")
        return

    tester = PerformanceTester(model_path)

    try:
        results = tester.run_full_benchmark()
        json_file, report_file = save_results(results)
        print_summary(results)

        print(f"\n📁 结果文件:")
        print(f"   JSON: {json_file}")
        print(f"   报告: {report_file}")
    except KeyboardInterrupt:
        print("\n\n⏹️ 测试被用户中断")
    except Exception as e:
        print(f"\n❌ 测试过程中发生错误: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()