Files
Test_AI/run_batch_performance_test.py
2026-01-20 11:14:10 +08:00

409 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
动态批次 TensorRT 性能测试系统
系统性评估不同批次大小下的性能表现
"""
import os
import sys
import time
import json
import numpy as np
import torch
import psutil
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
@dataclass
class BatchTestResult:
    """Result record for one batch-size benchmark run."""
    batch_size: int                     # number of images per inference call
    avg_fps: float                      # average frames per second (== throughput here)
    avg_latency_ms: float               # mean per-batch inference latency, milliseconds
    avg_throughput: float               # images processed per second
    avg_gpu_util: float                 # GPU utilization in percent (0 when unavailable)
    avg_gpu_memory_mb: float            # mean allocated GPU memory during the run, MB
    max_gpu_memory_mb: float            # peak allocated GPU memory during the run, MB
    test_duration: float                # actual wall-clock duration of the run, seconds
    total_frames: int                   # total images pushed through the engine
    success: bool                       # False when warm-up or inference failed
    error_message: Optional[str] = None # populated only when success is False
class DynamicBatchTester:
    """Benchmarks a TensorRT engine across multiple batch sizes.

    Loads the engine either through the native TensorRT Python API
    (preferred) or falls back to Ultralytics YOLO, then measures
    throughput, latency and GPU memory for each requested batch size
    using synthetic 640x640 input images.
    """

    def __init__(self, engine_path: str):
        """
        Args:
            engine_path: Path to the serialized TensorRT engine file.
        """
        self.engine_path = engine_path
        self.model = None
        # Robustness fix: give use_trt_api a safe default so methods that
        # read it never raise AttributeError when load_engine() was not
        # called or failed part-way through.
        self.use_trt_api = False

    def load_engine(self):
        """Load the TensorRT engine.

        Tries the raw TensorRT Python API first; if the ``tensorrt``
        package is not installed, falls back to Ultralytics.

        Raises:
            FileNotFoundError: if the engine file does not exist.
            RuntimeError: if TensorRT fails to deserialize the engine.
        """
        print(f"📦 加载 TensorRT 引擎: {self.engine_path}")
        if not os.path.exists(self.engine_path):
            raise FileNotFoundError(f"引擎文件不存在: {self.engine_path}")
        try:
            # Prefer the native TensorRT Python API.
            import tensorrt as trt
            logger = trt.Logger(trt.Logger.WARNING)
            with open(self.engine_path, 'rb') as f:
                self.trt_runtime = trt.Runtime(logger)
                self.trt_engine = self.trt_runtime.deserialize_cuda_engine(f.read())
            if self.trt_engine is None:
                raise RuntimeError("TensorRT 引擎加载失败")
            self.trt_context = self.trt_engine.create_execution_context()
            self.use_trt_api = True
            print("✅ 使用 TensorRT Python API 加载引擎")
        except ImportError:
            # tensorrt package missing -> fall back to Ultralytics.
            from ultralytics import YOLO
            self.model = YOLO(self.engine_path)
            self.use_trt_api = False
            print("✅ 使用 Ultralytics 加载引擎")

    def warmup(self, batch_size: int, warmup_iterations: int = 10):
        """Warm the engine up with random images before timing.

        Args:
            batch_size: Number of images per warm-up inference.
            warmup_iterations: How many warm-up inferences to run.

        Returns:
            True when every warm-up iteration succeeded, False otherwise.
        """
        print(f"🔥 预热引擎 (批次大小: {batch_size}, 迭代次数: {warmup_iterations})...")
        for _ in range(warmup_iterations):
            # Random uint8 HWC images stand in for real frames.
            test_images = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
                           for _ in range(batch_size)]
            try:
                if self.use_trt_api:
                    self._infer_trt_api(test_images)
                else:
                    self.model(test_images, verbose=False)
            except Exception as e:
                print(f"⚠️ 预热失败: {e}")
                return False
        print("✅ 预热完成")
        return True

    def _infer_trt_api(self, images: List[np.ndarray]):
        """Run one batched inference through the raw TensorRT API.

        Args:
            images: List of HWC uint8 images; resized to 640x640, converted
                to NCHW float32 in [0, 1] before inference.

        Returns:
            The raw output tensor as a numpy array.
        """
        # BUG FIX: cv2 was used below but never imported anywhere in the
        # file -- the TensorRT path raised NameError on every call.
        import cv2
        import pycuda.driver as cuda
        import pycuda.autoinit  # noqa: F401 -- initializes the CUDA context
        batch_size = len(images)
        # Bind the dynamic input shape for this batch.
        input_name = self.trt_engine.get_tensor_name(0)
        self.trt_context.set_input_shape(input_name, (batch_size, 3, 640, 640))
        # HWC uint8 -> NCHW float32 normalized to [0, 1].
        input_data = np.stack([cv2.resize(img, (640, 640)) for img in images])
        input_data = input_data.transpose(0, 3, 1, 2).astype(np.float32) / 255.0
        # BUG FIX: device buffers were never freed, leaking GPU memory on
        # every benchmark iteration; free them deterministically.
        d_input = cuda.mem_alloc(input_data.nbytes)
        try:
            output_shape = self.trt_context.get_tensor_shape(self.trt_engine.get_tensor_name(1))
            output_data = np.empty(output_shape, dtype=np.float32)
            d_output = cuda.mem_alloc(output_data.nbytes)
            try:
                cuda.memcpy_htod(d_input, input_data)
                self.trt_context.execute_v2([int(d_input), int(d_output)])
                cuda.memcpy_dtoh(output_data, d_output)
            finally:
                d_output.free()
        finally:
            d_input.free()
        return output_data

    def test_batch_size(self, batch_size: int, test_duration: int = 20) -> "BatchTestResult":
        """Benchmark a single batch size for roughly ``test_duration`` seconds.

        Args:
            batch_size: Images per inference call.
            test_duration: Target wall-clock duration of the timed loop.

        Returns:
            A BatchTestResult; ``success`` is False when warm-up or the
            timed loop failed, with the error captured in ``error_message``.
        """
        print(f"\n🔄 测试批次大小: {batch_size} (测试时长: {test_duration}秒)")
        try:
            # Short warm-up; bail out with a failed result if it breaks.
            if not self.warmup(batch_size, warmup_iterations=5):
                return BatchTestResult(
                    batch_size=batch_size,
                    avg_fps=0, avg_latency_ms=0, avg_throughput=0,
                    avg_gpu_util=0, avg_gpu_memory_mb=0, max_gpu_memory_mb=0,
                    test_duration=0, total_frames=0,
                    success=False,
                    error_message="预热失败"
                )
            latency_list = []
            gpu_memory_list = []
            batch_count = 0
            start_time = time.time()
            while time.time() - start_time < test_duration:
                test_images = [np.random.randint(0, 255, (640, 640, 3), dtype=np.uint8)
                               for _ in range(batch_size)]
                # Sample allocated GPU memory before each inference.
                if torch.cuda.is_available():
                    gpu_memory_mb = torch.cuda.memory_allocated(0) / 1024 / 1024
                    gpu_memory_list.append(gpu_memory_mb)
                # Timed inference.
                infer_start = time.time()
                if self.use_trt_api:
                    self._infer_trt_api(test_images)
                else:
                    self.model(test_images, verbose=False)
                infer_end = time.time()
                latency_ms = (infer_end - infer_start) * 1000
                latency_list.append(latency_ms)
                batch_count += 1
                # Progress line every 10 batches.
                if batch_count % 10 == 0:
                    elapsed = time.time() - start_time
                    current_fps = (batch_count * batch_size) / elapsed
                    print(f" 进度: {elapsed:.1f}s/{test_duration}s, "
                          f"当前吞吐量: {current_fps:.1f} FPS, "
                          f"延迟: {latency_ms:.1f}ms")
            # Aggregate the measurements.
            total_time = time.time() - start_time
            total_frames = batch_count * batch_size
            avg_latency_ms = np.mean(latency_list)
            avg_throughput = total_frames / total_time
            avg_fps = avg_throughput  # for batched inference, FPS == throughput
            avg_gpu_memory_mb = np.mean(gpu_memory_list) if gpu_memory_list else 0
            max_gpu_memory_mb = np.max(gpu_memory_list) if gpu_memory_list else 0
            # GPU utilization via GPUtil, best effort.
            try:
                import GPUtil
                gpus = GPUtil.getGPUs()
                avg_gpu_util = gpus[0].load * 100 if gpus else 0
            # BUG FIX: bare `except:` also swallowed SystemExit/KeyboardInterrupt.
            except Exception:
                avg_gpu_util = 0
            result = BatchTestResult(
                batch_size=batch_size,
                avg_fps=avg_fps,
                avg_latency_ms=avg_latency_ms,
                avg_throughput=avg_throughput,
                avg_gpu_util=avg_gpu_util,
                avg_gpu_memory_mb=avg_gpu_memory_mb,
                max_gpu_memory_mb=max_gpu_memory_mb,
                test_duration=total_time,
                total_frames=total_frames,
                success=True
            )
            print(f"✅ 批次 {batch_size} 测试完成:")
            print(f" 平均吞吐量: {result.avg_throughput:.1f} FPS")
            print(f" 平均延迟: {result.avg_latency_ms:.1f}ms")
            print(f" GPU 内存: {result.avg_gpu_memory_mb:.1f}MB (峰值: {result.max_gpu_memory_mb:.1f}MB)")
            return result
        except Exception as e:
            print(f"❌ 批次 {batch_size} 测试失败: {e}")
            import traceback
            traceback.print_exc()
            return BatchTestResult(
                batch_size=batch_size,
                avg_fps=0, avg_latency_ms=0, avg_throughput=0,
                avg_gpu_util=0, avg_gpu_memory_mb=0, max_gpu_memory_mb=0,
                test_duration=0, total_frames=0,
                success=False,
                error_message=str(e)
            )

    def run_full_batch_test(self, batch_sizes: List[int], test_duration: int = 20) -> Dict:
        """Benchmark every batch size in ``batch_sizes`` and build a summary.

        Args:
            batch_sizes: Batch sizes to test, in order.
            test_duration: Duration of each per-batch test, seconds.

        Returns:
            A dict with per-batch results ('batch_tests', as plain dicts)
            and, when at least one test succeeded, a 'summary' section with
            best-throughput and best-latency entries.
        """
        print("🚀 开始动态批次性能测试")
        print("=" * 60)
        results = {
            'engine_path': self.engine_path,
            'timestamp': datetime.now().isoformat(),
            'batch_tests': [],
            'summary': {}
        }
        successful_tests = []
        for batch_size in batch_sizes:
            result = self.test_batch_size(batch_size, test_duration)
            results['batch_tests'].append(asdict(result))
            if result.success:
                successful_tests.append(result)
        # Summarize only over successful runs.
        if successful_tests:
            best_throughput = max(successful_tests, key=lambda x: x.avg_throughput)
            best_latency = min(successful_tests, key=lambda x: x.avg_latency_ms)
            results['summary'] = {
                'total_tests': len(batch_sizes),
                'successful_tests': len(successful_tests),
                'failed_tests': len(batch_sizes) - len(successful_tests),
                'best_throughput': {
                    'batch_size': best_throughput.batch_size,
                    'fps': best_throughput.avg_throughput
                },
                'best_latency': {
                    'batch_size': best_latency.batch_size,
                    'latency_ms': best_latency.avg_latency_ms
                }
            }
        return results
def save_results(results: Dict, output_dir: str = "batch_test_results"):
    """Persist benchmark results as a JSON dump plus a plain-text report.

    Args:
        results: Results dict produced by run_full_batch_test().
        output_dir: Directory for the output files (created if missing).

    Returns:
        Tuple of (json_path, report_path) for the two files written.
    """
    os.makedirs(output_dir, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Machine-readable JSON dump.
    json_path = os.path.join(output_dir, f"batch_test_results_{stamp}.json")
    with open(json_path, 'w', encoding='utf-8') as fh:
        json.dump(results, fh, indent=2, ensure_ascii=False)
    print(f"\n✅ 测试结果已保存: {json_path}")

    # Human-readable report, assembled as a list of lines then written once.
    report_lines = [
        "动态批次 TensorRT 性能测试报告\n",
        "=" * 60 + "\n",
        f"测试时间: {results['timestamp']}\n",
        f"引擎路径: {results['engine_path']}\n\n",
        "批次性能测试结果:\n",
        "-" * 60 + "\n",
    ]
    for entry in results['batch_tests']:
        if entry['success']:
            report_lines += [
                f"\n批次大小: {entry['batch_size']}\n",
                f" 平均吞吐量: {entry['avg_throughput']:.1f} FPS\n",
                f" 平均延迟: {entry['avg_latency_ms']:.1f}ms\n",
                f" GPU 利用率: {entry['avg_gpu_util']:.1f}%\n",
                f" GPU 内存: {entry['avg_gpu_memory_mb']:.1f}MB (峰值: {entry['max_gpu_memory_mb']:.1f}MB)\n",
                f" 测试时长: {entry['test_duration']:.1f}s\n",
                f" 总帧数: {entry['total_frames']}\n",
            ]
        else:
            report_lines += [
                f"\n批次大小: {entry['batch_size']} - 失败\n",
                f" 错误信息: {entry['error_message']}\n",
            ]
    summary = results.get('summary')
    if summary:
        report_lines += [
            "\n\n测试摘要:\n",
            "=" * 60 + "\n",
            f"总测试数: {summary['total_tests']}\n",
            f"成功测试: {summary['successful_tests']}\n",
            f"失败测试: {summary['failed_tests']}\n",
        ]
        if 'best_throughput' in summary:
            report_lines += [
                "\n最佳吞吐量:\n",
                f" 批次大小: {summary['best_throughput']['batch_size']}\n",
                f" 吞吐量: {summary['best_throughput']['fps']:.1f} FPS\n",
            ]
        if 'best_latency' in summary:
            report_lines += [
                "\n最低延迟:\n",
                f" 批次大小: {summary['best_latency']['batch_size']}\n",
                f" 延迟: {summary['best_latency']['latency_ms']:.1f}ms\n",
            ]
    report_path = os.path.join(output_dir, f"batch_test_report_{stamp}.txt")
    with open(report_path, 'w', encoding='utf-8') as fh:
        fh.writelines(report_lines)
    print(f"✅ 测试报告已保存: {report_path}")
    return json_path, report_path
def main(engine_path: Optional[str] = None):
    """Entry point: run the full dynamic-batch benchmark suite.

    Args:
        engine_path: Optional path to the TensorRT engine.  When omitted,
            the historical hard-coded default path is used, so the existing
            zero-argument call keeps working unchanged.
    """
    print("动态批次 TensorRT 性能测试系统")
    print("=" * 60)
    # Generalization: the engine path used to be hard-coded; it is now an
    # optional parameter that defaults to the old path.
    if engine_path is None:
        engine_path = "C:/Users/16337/PycharmProjects/Security/yolo11n_dynamic.engine"
    # The engine must exist before anything else is attempted.
    if not os.path.exists(engine_path):
        print(f"❌ TensorRT 引擎不存在: {engine_path}")
        print("请先运行 dynamic_batch_tensorrt_builder.py 构建动态批次引擎")
        return
    # A TensorRT engine requires a CUDA-capable GPU.
    if not torch.cuda.is_available():
        print("❌ CUDA 不可用")
        return
    print(f"✅ CUDA 可用,设备: {torch.cuda.get_device_name(0)}")
    try:
        # Build the tester and load the engine.
        tester = DynamicBatchTester(engine_path)
        tester.load_engine()
        # Batch sizes to sweep and per-batch test duration (20 s each).
        batch_sizes = [1, 2, 4, 8, 16, 32]
        test_duration = 20
        print(f"\n📊 测试配置:")
        print(f" 批次大小: {batch_sizes}")
        print(f" 每批次测试时长: {test_duration}")
        # Run the full sweep and persist the results.
        results = tester.run_full_batch_test(batch_sizes, test_duration)
        json_file, report_file = save_results(results)
        # Print a short summary when at least one test succeeded.
        if 'summary' in results and results['summary']:
            summary = results['summary']
            print(f"\n🎯 测试摘要:")
            print(f" 成功: {summary['successful_tests']}/{summary['total_tests']}")
            if 'best_throughput' in summary:
                print(f" 最佳吞吐量: 批次 {summary['best_throughput']['batch_size']} "
                      f"({summary['best_throughput']['fps']:.1f} FPS)")
            if 'best_latency' in summary:
                print(f" 最低延迟: 批次 {summary['best_latency']['batch_size']} "
                      f"({summary['best_latency']['latency_ms']:.1f}ms)")
        print(f"\n📁 结果文件:")
        print(f" JSON: {json_file}")
        print(f" 报告: {report_file}")
        print(f"\n🎨 生成可视化图表:")
        print(f" 运行命令: python visualize_batch_results.py")
    except KeyboardInterrupt:
        print("\n\n⏹️ 测试被用户中断")
    except Exception as e:
        print(f"\n❌ 测试过程中发生错误: {e}")
        import traceback
        traceback.print_exc()
# Run the benchmark only when executed as a script (safe to import).
if __name__ == "__main__":
    main()