#!/usr/bin/env python3
"""Dynamic-batch TensorRT performance test system.

Systematically evaluates inference performance across different batch sizes,
recording throughput, latency and (approximate) GPU memory usage, then writes
a JSON result file and a human-readable text report.
"""
import os
import time
import json
import numpy as np
import torch
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict


@dataclass
class BatchTestResult:
    """Outcome of one batch-size performance test run."""
    batch_size: int
    avg_fps: float
    avg_latency_ms: float
    avg_throughput: float  # images processed per second
    avg_gpu_util: float
    avg_gpu_memory_mb: float
    max_gpu_memory_mb: float
    test_duration: float
    total_frames: int
    success: bool
    error_message: Optional[str] = None


class DynamicBatchTester:
    """Drives inference benchmarks against a dynamic-batch TensorRT engine."""

    def __init__(self, engine_path: str):
        """Remember the engine path; `load_engine` must be called before use."""
        self.engine_path = engine_path
        self.model = None
        # Initialized by load_engine() when the TensorRT Python API is available.
        self.trt_runtime = None
        self.trt_engine = None
        self.trt_context = None
        self.use_trt_api = False

    def load_engine(self):
        """Load the TensorRT engine, preferring the native TensorRT API.

        Falls back to the Ultralytics YOLO loader when the `tensorrt`
        package cannot be imported.

        Raises:
            FileNotFoundError: if the engine file does not exist.
            RuntimeError: if engine deserialization returns None.
        """
        print(f"📦 加载 TensorRT 引擎: {self.engine_path}")

        if not os.path.exists(self.engine_path):
            raise FileNotFoundError(f"引擎文件不存在: {self.engine_path}")

        try:
            # Preferred path: native TensorRT Python API.
            import tensorrt as trt

            logger = trt.Logger(trt.Logger.WARNING)
            with open(self.engine_path, 'rb') as f:
                self.trt_runtime = trt.Runtime(logger)
                self.trt_engine = self.trt_runtime.deserialize_cuda_engine(f.read())

            if self.trt_engine is None:
                raise RuntimeError("TensorRT 引擎加载失败")

            self.trt_context = self.trt_engine.create_execution_context()
            self.use_trt_api = True
            print("✅ 使用 TensorRT Python API 加载引擎")
        except ImportError:
            # Fallback: let Ultralytics wrap the engine file.
            from ultralytics import YOLO
            self.model = YOLO(self.engine_path)
            self.use_trt_api = False
            print("✅ 使用 Ultralytics 加载引擎")

    def warmup(self, batch_size: int, warmup_iterations: int = 10) -> bool:
        """Run a few throwaway batches so steady-state timing is measured.

        Returns True on success, False if any warmup inference raised.
        """
        print(f"🔥 预热引擎 (批次大小: {batch_size}, 迭代次数: {warmup_iterations})...")

        for _ in range(warmup_iterations):
            # Random uint8 HWC frames stand in for real camera input.
            test_images = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
                           for _ in range(batch_size)]
            try:
                if self.use_trt_api:
                    self._infer_trt_api(test_images)
                else:
                    self.model(test_images, verbose=False)
            except Exception as e:
                print(f"⚠️ 预热失败: {e}")
                return False

        print("✅ 预热完成")
        return True

    def _infer_trt_api(self, images: List[np.ndarray]) -> np.ndarray:
        """Run one batch through the raw TensorRT execution context.

        Args:
            images: list of HWC uint8 frames; resized to 640x640 if needed.

        Returns:
            The raw float32 output tensor copied back from the device.
        """
        import pycuda.driver as cuda
        import pycuda.autoinit  # noqa: F401 -- side effect: creates CUDA context

        batch_size = len(images)

        # Dynamic-batch engines require the input shape to be set per call.
        input_name = self.trt_engine.get_tensor_name(0)
        self.trt_context.set_input_shape(input_name, (batch_size, 3, 640, 640))

        # BUG FIX: the original called cv2.resize without importing cv2.
        # Resize only when needed; generated test frames are already 640x640,
        # so the common path does not touch OpenCV at all.
        prepared = []
        for img in images:
            if img.shape[:2] != (640, 640):
                import cv2  # local import: only required on the resize path
                img = cv2.resize(img, (640, 640))
            prepared.append(img)

        # HWC uint8 -> NCHW float32 in [0, 1]; force C-contiguity for the
        # host-to-device copy.
        input_data = np.stack(prepared)
        input_data = np.ascontiguousarray(
            input_data.transpose(0, 3, 1, 2).astype(np.float32) / 255.0)

        # Output shape is resolved once the input shape has been set above.
        output_shape = self.trt_context.get_tensor_shape(
            self.trt_engine.get_tensor_name(1))
        output_data = np.empty(tuple(output_shape), dtype=np.float32)

        d_input = cuda.mem_alloc(input_data.nbytes)
        d_output = cuda.mem_alloc(output_data.nbytes)
        try:
            cuda.memcpy_htod(d_input, input_data)
            self.trt_context.execute_v2([int(d_input), int(d_output)])
            cuda.memcpy_dtoh(output_data, d_output)
        finally:
            # BUG FIX: release device buffers explicitly instead of leaking
            # them until garbage collection.
            d_input.free()
            d_output.free()

        return output_data

    @staticmethod
    def _failed_result(batch_size: int, message: str) -> BatchTestResult:
        """Build a zeroed BatchTestResult marking a failed test."""
        return BatchTestResult(
            batch_size=batch_size,
            avg_fps=0,
            avg_latency_ms=0,
            avg_throughput=0,
            avg_gpu_util=0,
            avg_gpu_memory_mb=0,
            max_gpu_memory_mb=0,
            test_duration=0,
            total_frames=0,
            success=False,
            error_message=message,
        )

    def test_batch_size(self, batch_size: int, test_duration: int = 20) -> BatchTestResult:
        """Benchmark one batch size for roughly `test_duration` seconds.

        Returns a populated BatchTestResult; on any failure the result has
        success=False and error_message set (this method does not raise).
        """
        print(f"\n🔄 测试批次大小: {batch_size} (测试时长: {test_duration}秒)")

        try:
            if not self.warmup(batch_size, warmup_iterations=5):
                return self._failed_result(batch_size, "预热失败")

            latency_list = []
            gpu_memory_list = []
            batch_count = 0
            start_time = time.time()

            while time.time() - start_time < test_duration:
                test_images = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
                               for _ in range(batch_size)]

                # NOTE(review): torch.cuda only tracks torch allocations, so
                # TensorRT's own device memory is not reflected here.
                if torch.cuda.is_available():
                    gpu_memory_mb = torch.cuda.memory_allocated(0) / 1024 / 1024
                    gpu_memory_list.append(gpu_memory_mb)

                # Time a single batch inference.
                infer_start = time.time()
                if self.use_trt_api:
                    self._infer_trt_api(test_images)
                else:
                    self.model(test_images, verbose=False)
                infer_end = time.time()

                latency_ms = (infer_end - infer_start) * 1000
                latency_list.append(latency_ms)
                batch_count += 1

                # Periodic progress line.
                if batch_count % 10 == 0:
                    elapsed = time.time() - start_time
                    current_fps = (batch_count * batch_size) / elapsed
                    print(f" 进度: {elapsed:.1f}s/{test_duration}s, "
                          f"当前吞吐量: {current_fps:.1f} FPS, "
                          f"延迟: {latency_ms:.1f}ms")

            # Aggregate metrics.
            total_time = time.time() - start_time
            total_frames = batch_count * batch_size
            avg_latency_ms = float(np.mean(latency_list))
            avg_throughput = total_frames / total_time
            avg_fps = avg_throughput  # for batched inference, FPS == throughput

            avg_gpu_memory_mb = float(np.mean(gpu_memory_list)) if gpu_memory_list else 0
            max_gpu_memory_mb = float(np.max(gpu_memory_list)) if gpu_memory_list else 0

            # GPU utilization: single best-effort sample via GPUtil.
            try:
                import GPUtil
                gpus = GPUtil.getGPUs()
                avg_gpu_util = gpus[0].load * 100 if gpus else 0
            except Exception:  # BUG FIX: was a bare `except:`
                avg_gpu_util = 0

            result = BatchTestResult(
                batch_size=batch_size,
                avg_fps=avg_fps,
                avg_latency_ms=avg_latency_ms,
                avg_throughput=avg_throughput,
                avg_gpu_util=avg_gpu_util,
                avg_gpu_memory_mb=avg_gpu_memory_mb,
                max_gpu_memory_mb=max_gpu_memory_mb,
                test_duration=total_time,
                total_frames=total_frames,
                success=True,
            )

            print(f"✅ 批次 {batch_size} 测试完成:")
            print(f" 平均吞吐量: {result.avg_throughput:.1f} FPS")
            print(f" 平均延迟: {result.avg_latency_ms:.1f}ms")
            print(f" GPU 内存: {result.avg_gpu_memory_mb:.1f}MB (峰值: {result.max_gpu_memory_mb:.1f}MB)")

            return result

        except Exception as e:
            print(f"❌ 批次 {batch_size} 测试失败: {e}")
            import traceback
            traceback.print_exc()
            return self._failed_result(batch_size, str(e))

    def run_full_batch_test(self, batch_sizes: List[int], test_duration: int = 20) -> Dict:
        """Run the benchmark for every batch size and summarize the results.

        Returns a dict with per-batch results plus a summary naming the
        best-throughput and lowest-latency batch sizes.
        """
        print("🚀 开始动态批次性能测试")
        print("=" * 60)

        results = {
            'engine_path': self.engine_path,
            'timestamp': datetime.now().isoformat(),
            'batch_tests': [],
            'summary': {},
        }

        successful_tests = []
        for batch_size in batch_sizes:
            result = self.test_batch_size(batch_size, test_duration)
            results['batch_tests'].append(asdict(result))
            if result.success:
                successful_tests.append(result)

        # Summary over successful runs only.
        if successful_tests:
            best_throughput = max(successful_tests, key=lambda x: x.avg_throughput)
            best_latency = min(successful_tests, key=lambda x: x.avg_latency_ms)
            results['summary'] = {
                'total_tests': len(batch_sizes),
                'successful_tests': len(successful_tests),
                'failed_tests': len(batch_sizes) - len(successful_tests),
                'best_throughput': {
                    'batch_size': best_throughput.batch_size,
                    'fps': best_throughput.avg_throughput,
                },
                'best_latency': {
                    'batch_size': best_latency.batch_size,
                    'latency_ms': best_latency.avg_latency_ms,
                },
            }

        return results


def save_results(results: Dict, output_dir: str = "batch_test_results"):
    """Persist the results dict as JSON plus a human-readable text report.

    Returns:
        (json_file, report_file) paths of the two written files.
    """
    os.makedirs(output_dir, exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # JSON dump of the raw results.
    json_file = os.path.join(output_dir, f"batch_test_results_{timestamp}.json")
    with open(json_file, 'w', encoding='utf-8') as f:
        json.dump(results, f, indent=2, ensure_ascii=False)
    print(f"\n✅ 测试结果已保存: {json_file}")

    # Plain-text report.
    report_file = os.path.join(output_dir, f"batch_test_report_{timestamp}.txt")
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("动态批次 TensorRT 性能测试报告\n")
        f.write("=" * 60 + "\n")
        f.write(f"测试时间: {results['timestamp']}\n")
        f.write(f"引擎路径: {results['engine_path']}\n\n")
        f.write("批次性能测试结果:\n")
        f.write("-" * 60 + "\n")

        for test in results['batch_tests']:
            if test['success']:
                f.write(f"\n批次大小: {test['batch_size']}\n")
                f.write(f" 平均吞吐量: {test['avg_throughput']:.1f} FPS\n")
                f.write(f" 平均延迟: {test['avg_latency_ms']:.1f}ms\n")
                f.write(f" GPU 利用率: {test['avg_gpu_util']:.1f}%\n")
                f.write(f" GPU 内存: {test['avg_gpu_memory_mb']:.1f}MB (峰值: {test['max_gpu_memory_mb']:.1f}MB)\n")
                f.write(f" 测试时长: {test['test_duration']:.1f}s\n")
                f.write(f" 总帧数: {test['total_frames']}\n")
            else:
                f.write(f"\n批次大小: {test['batch_size']} - 失败\n")
                f.write(f" 错误信息: {test['error_message']}\n")

        if 'summary' in results and results['summary']:
            summary = results['summary']
            f.write(f"\n\n测试摘要:\n")
            f.write("=" * 60 + "\n")
            f.write(f"总测试数: {summary['total_tests']}\n")
            f.write(f"成功测试: {summary['successful_tests']}\n")
            f.write(f"失败测试: {summary['failed_tests']}\n")
            if 'best_throughput' in summary:
                f.write(f"\n最佳吞吐量:\n")
                f.write(f" 批次大小: {summary['best_throughput']['batch_size']}\n")
                f.write(f" 吞吐量: {summary['best_throughput']['fps']:.1f} FPS\n")
            if 'best_latency' in summary:
                f.write(f"\n最低延迟:\n")
                f.write(f" 批次大小: {summary['best_latency']['batch_size']}\n")
                f.write(f" 延迟: {summary['best_latency']['latency_ms']:.1f}ms\n")

    print(f"✅ 测试报告已保存: {report_file}")
    return json_file, report_file


def main():
    """Entry point: validate environment, run the full sweep, save results."""
    print("动态批次 TensorRT 性能测试系统")
    print("=" * 60)

    engine_path = "C:/Users/16337/PycharmProjects/Security/yolo11n_dynamic.engine"

    if not os.path.exists(engine_path):
        print(f"❌ TensorRT 引擎不存在: {engine_path}")
        print("请先运行 dynamic_batch_tensorrt_builder.py 构建动态批次引擎")
        return

    if not torch.cuda.is_available():
        print("❌ CUDA 不可用")
        return
    print(f"✅ CUDA 可用,设备: {torch.cuda.get_device_name(0)}")

    try:
        tester = DynamicBatchTester(engine_path)
        tester.load_engine()

        batch_sizes = [1, 2, 4, 8, 16, 32]
        test_duration = 20  # seconds per batch size

        print(f"\n📊 测试配置:")
        print(f" 批次大小: {batch_sizes}")
        print(f" 每批次测试时长: {test_duration}秒")

        results = tester.run_full_batch_test(batch_sizes, test_duration)
        json_file, report_file = save_results(results)

        # Console summary mirrors the report.
        if 'summary' in results and results['summary']:
            summary = results['summary']
            print(f"\n🎯 测试摘要:")
            print(f" 成功: {summary['successful_tests']}/{summary['total_tests']}")
            if 'best_throughput' in summary:
                print(f" 最佳吞吐量: 批次 {summary['best_throughput']['batch_size']} "
                      f"({summary['best_throughput']['fps']:.1f} FPS)")
            if 'best_latency' in summary:
                print(f" 最低延迟: 批次 {summary['best_latency']['batch_size']} "
                      f"({summary['best_latency']['latency_ms']:.1f}ms)")

        print(f"\n📁 结果文件:")
        print(f" JSON: {json_file}")
        print(f" 报告: {report_file}")
        print(f"\n🎨 生成可视化图表:")
        print(f" 运行命令: python visualize_batch_results.py")

    except KeyboardInterrupt:
        print("\n\n⏹️ 测试被用户中断")
    except Exception as e:
        print(f"\n❌ 测试过程中发生错误: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()