409 lines
15 KiB
Python
409 lines
15 KiB
Python
#!/usr/bin/env python3
"""
Dynamic-batch TensorRT performance benchmark.

Systematically evaluates inference performance across different batch sizes.
"""
|
||
|
||
import os
|
||
import sys
|
||
import time
|
||
import json
|
||
import numpy as np
|
||
import torch
|
||
import psutil
|
||
from datetime import datetime
|
||
from typing import Dict, List, Optional
|
||
from dataclasses import dataclass, asdict
|
||
|
||
@dataclass
class BatchTestResult:
    """Result of benchmarking one batch size.

    Captures throughput, latency and GPU-memory statistics for a single
    timed run, plus a success flag and optional error description.
    """
    batch_size: int            # batch size under test
    avg_fps: float             # average FPS (equal to avg_throughput for batched inference)
    avg_latency_ms: float      # mean per-batch inference latency, milliseconds
    avg_throughput: float      # images processed per second
    avg_gpu_util: float        # GPU utilization %, sampled once at test end (approximate)
    avg_gpu_memory_mb: float   # mean torch-allocated GPU memory during the run, MB
    max_gpu_memory_mb: float   # peak torch-allocated GPU memory during the run, MB
    test_duration: float       # actual wall-clock duration of the timed loop, seconds
    total_frames: int          # total images processed (batches * batch_size)
    success: bool              # True if the test completed without error
    error_message: Optional[str] = None  # failure description when success is False
|
||
|
||
class DynamicBatchTester:
    """Benchmarks a serialized TensorRT engine across multiple batch sizes."""

    def __init__(self, engine_path: str):
        """Store the engine path; call ``load_engine()`` before running tests.

        Args:
            engine_path: Filesystem path to a serialized TensorRT engine.
        """
        self.engine_path = engine_path
        self.model = None
        # FIX: these attributes were previously set only inside load_engine(),
        # so touching them before a successful load raised AttributeError.
        # Pre-initialize them to safe defaults.
        self.use_trt_api = False
        self.trt_runtime = None
        self.trt_engine = None
        self.trt_context = None
|
||
|
||
def load_engine(self):
|
||
"""加载 TensorRT 引擎"""
|
||
print(f"📦 加载 TensorRT 引擎: {self.engine_path}")
|
||
|
||
if not os.path.exists(self.engine_path):
|
||
raise FileNotFoundError(f"引擎文件不存在: {self.engine_path}")
|
||
|
||
try:
|
||
# 尝试使用 TensorRT Python API 加载
|
||
import tensorrt as trt
|
||
|
||
logger = trt.Logger(trt.Logger.WARNING)
|
||
with open(self.engine_path, 'rb') as f:
|
||
self.trt_runtime = trt.Runtime(logger)
|
||
self.trt_engine = self.trt_runtime.deserialize_cuda_engine(f.read())
|
||
|
||
if self.trt_engine is None:
|
||
raise RuntimeError("TensorRT 引擎加载失败")
|
||
|
||
self.trt_context = self.trt_engine.create_execution_context()
|
||
self.use_trt_api = True
|
||
|
||
print("✅ 使用 TensorRT Python API 加载引擎")
|
||
|
||
except ImportError:
|
||
# 回退到 ultralytics
|
||
from ultralytics import YOLO
|
||
self.model = YOLO(self.engine_path)
|
||
self.use_trt_api = False
|
||
print("✅ 使用 Ultralytics 加载引擎")
|
||
|
||
def warmup(self, batch_size: int, warmup_iterations: int = 10):
|
||
"""预热引擎"""
|
||
print(f"🔥 预热引擎 (批次大小: {batch_size}, 迭代次数: {warmup_iterations})...")
|
||
|
||
for i in range(warmup_iterations):
|
||
# 生成随机测试数据
|
||
test_images = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
|
||
for _ in range(batch_size)]
|
||
|
||
try:
|
||
if self.use_trt_api:
|
||
self._infer_trt_api(test_images)
|
||
else:
|
||
self.model(test_images, verbose=False)
|
||
except Exception as e:
|
||
print(f"⚠️ 预热失败: {e}")
|
||
return False
|
||
|
||
print("✅ 预热完成")
|
||
return True
|
||
|
||
def _infer_trt_api(self, images: List[np.ndarray]):
|
||
"""使用 TensorRT API 进行推理"""
|
||
import tensorrt as trt
|
||
import pycuda.driver as cuda
|
||
import pycuda.autoinit
|
||
|
||
batch_size = len(images)
|
||
|
||
# 设置输入形状
|
||
input_name = self.trt_engine.get_tensor_name(0)
|
||
self.trt_context.set_input_shape(input_name, (batch_size, 3, 640, 640))
|
||
|
||
# 准备输入数据
|
||
input_data = np.stack([cv2.resize(img, (640, 640)) for img in images])
|
||
input_data = input_data.transpose(0, 3, 1, 2).astype(np.float32) / 255.0
|
||
|
||
# 分配 GPU 内存
|
||
d_input = cuda.mem_alloc(input_data.nbytes)
|
||
|
||
# 获取输出形状
|
||
output_shape = self.trt_context.get_tensor_shape(self.trt_engine.get_tensor_name(1))
|
||
output_data = np.empty(output_shape, dtype=np.float32)
|
||
d_output = cuda.mem_alloc(output_data.nbytes)
|
||
|
||
# 复制数据到 GPU
|
||
cuda.memcpy_htod(d_input, input_data)
|
||
|
||
# 执行推理
|
||
self.trt_context.execute_v2([int(d_input), int(d_output)])
|
||
|
||
# 复制结果回 CPU
|
||
cuda.memcpy_dtoh(output_data, d_output)
|
||
|
||
return output_data
|
||
|
||
def test_batch_size(self, batch_size: int, test_duration: int = 20) -> BatchTestResult:
|
||
"""测试特定批次大小的性能"""
|
||
print(f"\n🔄 测试批次大小: {batch_size} (测试时长: {test_duration}秒)")
|
||
|
||
try:
|
||
# 预热
|
||
if not self.warmup(batch_size, warmup_iterations=5):
|
||
return BatchTestResult(
|
||
batch_size=batch_size,
|
||
avg_fps=0, avg_latency_ms=0, avg_throughput=0,
|
||
avg_gpu_util=0, avg_gpu_memory_mb=0, max_gpu_memory_mb=0,
|
||
test_duration=0, total_frames=0,
|
||
success=False,
|
||
error_message="预热失败"
|
||
)
|
||
|
||
# 开始测试
|
||
latency_list = []
|
||
gpu_memory_list = []
|
||
batch_count = 0
|
||
|
||
start_time = time.time()
|
||
|
||
while time.time() - start_time < test_duration:
|
||
# 生成测试数据
|
||
test_images = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
|
||
for _ in range(batch_size)]
|
||
|
||
# 记录 GPU 内存
|
||
if torch.cuda.is_available():
|
||
gpu_memory_mb = torch.cuda.memory_allocated(0) / 1024 / 1024
|
||
gpu_memory_list.append(gpu_memory_mb)
|
||
|
||
# 推理
|
||
infer_start = time.time()
|
||
|
||
if self.use_trt_api:
|
||
self._infer_trt_api(test_images)
|
||
else:
|
||
self.model(test_images, verbose=False)
|
||
|
||
infer_end = time.time()
|
||
|
||
# 记录延迟
|
||
latency_ms = (infer_end - infer_start) * 1000
|
||
latency_list.append(latency_ms)
|
||
|
||
batch_count += 1
|
||
|
||
# 显示进度
|
||
if batch_count % 10 == 0:
|
||
elapsed = time.time() - start_time
|
||
current_fps = (batch_count * batch_size) / elapsed
|
||
print(f" 进度: {elapsed:.1f}s/{test_duration}s, "
|
||
f"当前吞吐量: {current_fps:.1f} FPS, "
|
||
f"延迟: {latency_ms:.1f}ms")
|
||
|
||
# 计算结果
|
||
total_time = time.time() - start_time
|
||
total_frames = batch_count * batch_size
|
||
|
||
avg_latency_ms = np.mean(latency_list)
|
||
avg_throughput = total_frames / total_time
|
||
avg_fps = avg_throughput # 对于批量推理,FPS = 吞吐量
|
||
|
||
# GPU 指标
|
||
avg_gpu_memory_mb = np.mean(gpu_memory_list) if gpu_memory_list else 0
|
||
max_gpu_memory_mb = np.max(gpu_memory_list) if gpu_memory_list else 0
|
||
|
||
# GPU 利用率(简化计算)
|
||
try:
|
||
import GPUtil
|
||
gpus = GPUtil.getGPUs()
|
||
avg_gpu_util = gpus[0].load * 100 if gpus else 0
|
||
except:
|
||
avg_gpu_util = 0
|
||
|
||
result = BatchTestResult(
|
||
batch_size=batch_size,
|
||
avg_fps=avg_fps,
|
||
avg_latency_ms=avg_latency_ms,
|
||
avg_throughput=avg_throughput,
|
||
avg_gpu_util=avg_gpu_util,
|
||
avg_gpu_memory_mb=avg_gpu_memory_mb,
|
||
max_gpu_memory_mb=max_gpu_memory_mb,
|
||
test_duration=total_time,
|
||
total_frames=total_frames,
|
||
success=True
|
||
)
|
||
|
||
print(f"✅ 批次 {batch_size} 测试完成:")
|
||
print(f" 平均吞吐量: {result.avg_throughput:.1f} FPS")
|
||
print(f" 平均延迟: {result.avg_latency_ms:.1f}ms")
|
||
print(f" GPU 内存: {result.avg_gpu_memory_mb:.1f}MB (峰值: {result.max_gpu_memory_mb:.1f}MB)")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(f"❌ 批次 {batch_size} 测试失败: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
return BatchTestResult(
|
||
batch_size=batch_size,
|
||
avg_fps=0, avg_latency_ms=0, avg_throughput=0,
|
||
avg_gpu_util=0, avg_gpu_memory_mb=0, max_gpu_memory_mb=0,
|
||
test_duration=0, total_frames=0,
|
||
success=False,
|
||
error_message=str(e)
|
||
)
|
||
|
||
def run_full_batch_test(self, batch_sizes: List[int], test_duration: int = 20) -> Dict:
|
||
"""运行完整的批次性能测试"""
|
||
print("🚀 开始动态批次性能测试")
|
||
print("=" * 60)
|
||
|
||
results = {
|
||
'engine_path': self.engine_path,
|
||
'timestamp': datetime.now().isoformat(),
|
||
'batch_tests': [],
|
||
'summary': {}
|
||
}
|
||
|
||
successful_tests = []
|
||
|
||
for batch_size in batch_sizes:
|
||
result = self.test_batch_size(batch_size, test_duration)
|
||
results['batch_tests'].append(asdict(result))
|
||
|
||
if result.success:
|
||
successful_tests.append(result)
|
||
|
||
# 生成摘要
|
||
if successful_tests:
|
||
best_throughput = max(successful_tests, key=lambda x: x.avg_throughput)
|
||
best_latency = min(successful_tests, key=lambda x: x.avg_latency_ms)
|
||
|
||
results['summary'] = {
|
||
'total_tests': len(batch_sizes),
|
||
'successful_tests': len(successful_tests),
|
||
'failed_tests': len(batch_sizes) - len(successful_tests),
|
||
'best_throughput': {
|
||
'batch_size': best_throughput.batch_size,
|
||
'fps': best_throughput.avg_throughput
|
||
},
|
||
'best_latency': {
|
||
'batch_size': best_latency.batch_size,
|
||
'latency_ms': best_latency.avg_latency_ms
|
||
}
|
||
}
|
||
|
||
return results
|
||
|
||
def save_results(results: Dict, output_dir: str = "batch_test_results"):
    """Persist benchmark results as JSON plus a human-readable text report.

    Args:
        results: Dict produced by DynamicBatchTester.run_full_batch_test().
        output_dir: Directory (created if missing) to write both files into.

    Returns:
        Tuple (json_path, report_path).
    """
    os.makedirs(output_dir, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Machine-readable JSON dump.
    json_file = os.path.join(output_dir, f"batch_test_results_{stamp}.json")
    with open(json_file, 'w', encoding='utf-8') as fh:
        json.dump(results, fh, indent=2, ensure_ascii=False)
    print(f"\n✅ 测试结果已保存: {json_file}")

    # Human-readable report: assemble all lines first, write once.
    report_lines = [
        "动态批次 TensorRT 性能测试报告\n",
        "=" * 60 + "\n",
        f"测试时间: {results['timestamp']}\n",
        f"引擎路径: {results['engine_path']}\n\n",
        "批次性能测试结果:\n",
        "-" * 60 + "\n",
    ]

    for test in results['batch_tests']:
        if test['success']:
            report_lines.append(f"\n批次大小: {test['batch_size']}\n")
            report_lines.append(f" 平均吞吐量: {test['avg_throughput']:.1f} FPS\n")
            report_lines.append(f" 平均延迟: {test['avg_latency_ms']:.1f}ms\n")
            report_lines.append(f" GPU 利用率: {test['avg_gpu_util']:.1f}%\n")
            report_lines.append(f" GPU 内存: {test['avg_gpu_memory_mb']:.1f}MB (峰值: {test['max_gpu_memory_mb']:.1f}MB)\n")
            report_lines.append(f" 测试时长: {test['test_duration']:.1f}s\n")
            report_lines.append(f" 总帧数: {test['total_frames']}\n")
        else:
            report_lines.append(f"\n批次大小: {test['batch_size']} - 失败\n")
            report_lines.append(f" 错误信息: {test['error_message']}\n")

    if 'summary' in results and results['summary']:
        summary = results['summary']
        report_lines.append(f"\n\n测试摘要:\n")
        report_lines.append("=" * 60 + "\n")
        report_lines.append(f"总测试数: {summary['total_tests']}\n")
        report_lines.append(f"成功测试: {summary['successful_tests']}\n")
        report_lines.append(f"失败测试: {summary['failed_tests']}\n")

        if 'best_throughput' in summary:
            report_lines.append(f"\n最佳吞吐量:\n")
            report_lines.append(f" 批次大小: {summary['best_throughput']['batch_size']}\n")
            report_lines.append(f" 吞吐量: {summary['best_throughput']['fps']:.1f} FPS\n")

        if 'best_latency' in summary:
            report_lines.append(f"\n最低延迟:\n")
            report_lines.append(f" 批次大小: {summary['best_latency']['batch_size']}\n")
            report_lines.append(f" 延迟: {summary['best_latency']['latency_ms']:.1f}ms\n")

    report_file = os.path.join(output_dir, f"batch_test_report_{stamp}.txt")
    with open(report_file, 'w', encoding='utf-8') as fh:
        fh.writelines(report_lines)

    print(f"✅ 测试报告已保存: {report_file}")

    return json_file, report_file
|
||
|
||
def main():
    """Entry point: load the engine, sweep batch sizes, save and summarize.

    The engine path may be passed as the first CLI argument; otherwise the
    historical default path is used (kept for backward compatibility).
    """
    print("动态批次 TensorRT 性能测试系统")
    print("=" * 60)

    # GENERALIZED: accept the engine path on the command line instead of
    # only the hard-coded absolute path (`sys` is already imported at the top).
    default_engine = "C:/Users/16337/PycharmProjects/Security/yolo11n_dynamic.engine"
    engine_path = sys.argv[1] if len(sys.argv) > 1 else default_engine

    # Preconditions: engine file present, CUDA available.
    if not os.path.exists(engine_path):
        print(f"❌ TensorRT 引擎不存在: {engine_path}")
        print("请先运行 dynamic_batch_tensorrt_builder.py 构建动态批次引擎")
        return

    if not torch.cuda.is_available():
        print("❌ CUDA 不可用")
        return

    print(f"✅ CUDA 可用,设备: {torch.cuda.get_device_name(0)}")

    try:
        tester = DynamicBatchTester(engine_path)
        tester.load_engine()

        # Test configuration: batch sizes to sweep and time budget per size.
        batch_sizes = [1, 2, 4, 8, 16, 32]
        test_duration = 20  # seconds per batch size

        print(f"\n📊 测试配置:")
        print(f" 批次大小: {batch_sizes}")
        print(f" 每批次测试时长: {test_duration}秒")

        results = tester.run_full_batch_test(batch_sizes, test_duration)

        json_file, report_file = save_results(results)

        # Console summary of the best configurations.
        if 'summary' in results and results['summary']:
            summary = results['summary']
            print(f"\n🎯 测试摘要:")
            print(f" 成功: {summary['successful_tests']}/{summary['total_tests']}")

            if 'best_throughput' in summary:
                print(f" 最佳吞吐量: 批次 {summary['best_throughput']['batch_size']} "
                      f"({summary['best_throughput']['fps']:.1f} FPS)")

            if 'best_latency' in summary:
                print(f" 最低延迟: 批次 {summary['best_latency']['batch_size']} "
                      f"({summary['best_latency']['latency_ms']:.1f}ms)")

        print(f"\n📁 结果文件:")
        print(f" JSON: {json_file}")
        print(f" 报告: {report_file}")

        print(f"\n🎨 生成可视化图表:")
        print(f" 运行命令: python visualize_batch_results.py")

    except KeyboardInterrupt:
        print("\n\n⏹️ 测试被用户中断")
    except Exception as e:
        print(f"\n❌ 测试过程中发生错误: {e}")
        import traceback
        traceback.print_exc()
|
||
|
||
# Script entry point: run the full benchmark when executed directly.
if __name__ == "__main__":
    main()
|