security-ai-edge/core/tensorrt_engine.py
956bcbbc3e feat: production-grade TensorRT refactor
- Add HostDeviceMem class (buffer pool)
- _allocate_buffers(): allocate all buffers once at init time
- infer() uses the async API + a CUDA stream
- Fallback: plain numpy when pagelocked allocation fails
2026-02-02 14:12:43 +08:00

"""
TensorRT推理引擎模块
工业级实现Buffer Pool、异步推理、性能监控
"""
import ctypes
import logging
import threading
import time
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
try:
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
TRT_AVAILABLE = True
except ImportError:
TRT_AVAILABLE = False
trt = None
cuda = None
from config.settings import get_settings, InferenceConfig
from utils.logger import get_logger
logger = logging.getLogger(__name__)
class HostDeviceMem:
"""Host/Device 内存对(工业级 Buffer Pool"""
def __init__(self, host_mem, device_mem):
self.host = host_mem
self.device = device_mem
def __repr__(self):
return f"Host:{self.host.shape}, Device:{int(self.device)}"
class TensorRTEngine:
"""工业级 TensorRT 引擎
特性:
- Buffer Pool: bindings 只在 init 阶段分配一次
- Pinned Memory: 使用 pagelocked host memory 提升 H2D/D2H 性能
- Async API: CUDA stream + async memcpy + execute_async_v2
"""
def __init__(self, config: Optional[InferenceConfig] = None):
if not TRT_AVAILABLE:
raise RuntimeError("TensorRT 未安装,请先安装 tensorrt 库")
if config is None:
settings = get_settings()
config = settings.inference
self.config = config
self._engine = None
self._context = None
self._stream = None
self._released = False
self._cuda_context = None
self._logger = get_logger("tensorrt")
self._lock = threading.Lock()
self._bindings: List[int] = []
self._inputs: List[HostDeviceMem] = []
self._outputs: List[HostDeviceMem] = []
self._binding_names: Dict[int, str] = {}
self._performance_stats = {
"inference_count": 0,
"total_inference_time_ms": 0.0,
"avg_inference_time_ms": 0.0,
"throughput_fps": 0.0,
"last_inference_time_ms": 0.0,
}
self._logger.info(
f"TensorRT 引擎初始化: "
f"{config.model_path}, "
f"{config.input_width}x{config.input_height}, "
f"batch={config.batch_size}, "
f"fp16={config.fp16_mode}"
)
def load_engine(self, engine_path: Optional[str] = None) -> bool:
"""加载 TensorRT engine 文件"""
if engine_path is None:
engine_path = self.config.model_path
with self._lock:
try:
if self._context is not None:
self._release_resources()
# Use the configured device (get_memory_usage already honors device_id).
self._cuda_context = cuda.Device(self.config.device_id).make_context()
self._stream = cuda.Stream()
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
with open(engine_path, "rb") as f:
runtime = trt.Runtime(TRT_LOGGER)
self._engine = runtime.deserialize_cuda_engine(f.read())
self._context = self._engine.create_execution_context()
self._allocate_buffers()
self._logger.log_connection_event(
"load", "TensorRT", engine_path, True
)
self._logger.info(f"TensorRT 引擎加载成功: {engine_path}")
self._logger.info(f" 输入: {len(self._inputs)}, 输出: {len(self._outputs)}")
return True
except Exception as e:
self._logger.error(f"TensorRT 引擎加载失败: {e}")
return False
def _allocate_buffers(self):
"""Buffer Pool: 初始化阶段一次性分配所有 bindings工业级关键点"""
self._bindings = []
self._inputs = []
self._outputs = []
self._binding_names = {}
for binding_idx in range(self._engine.num_bindings):
name = self._engine.get_binding_name(binding_idx)
dtype = trt.nptype(self._engine.get_binding_dtype(binding_idx))
shape = self._engine.get_binding_shape(binding_idx)
self._binding_names[binding_idx] = name
# Clamp dynamic dims (-1) to 1 so a concrete buffer size can be computed.
shape = tuple(1 if s < 0 else s for s in shape)
size = trt.volume(shape)
try:
host_mem = cuda.pagelocked_empty(size, dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
except Exception as e:
self._logger.warning(f"pagelocked memory 分配失败,回退到普通 numpy: {e}")
host_mem = np.zeros(size, dtype=dtype)
device_mem = cuda.mem_alloc(host_mem.nbytes)
self._bindings.append(int(device_mem))
mem_pair = HostDeviceMem(host_mem, device_mem)
if self._engine.binding_is_input(binding_idx):
self._inputs.append(mem_pair)
else:
self._outputs.append(mem_pair)
if len(self._inputs) == 0:
raise RuntimeError("No input bindings found")
if len(self._outputs) == 0:
raise RuntimeError("No output bindings found")
self._logger.debug(
f"Buffer Pool 分配完成: "
f"inputs={[int(i.device) for i in self._inputs]}, "
f"outputs={[int(o.device) for o in self._outputs]}"
)
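# Note (sketch, TRT 8.x API assumed): for engines built with dynamic
# optimization profiles, a more robust pool would size each buffer for the
# profile's maximum shape, e.g. the third element of
# engine.get_profile_shape(0, binding_idx), instead of clamping -1 dims to 1.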
def _get_output_shape(self, binding_idx: int) -> Tuple[int, ...]:
    """Return the output shape as resolved by the execution context.

    The engine may report -1 for dynamic dimensions; the context resolves
    them once the input shape has been set.
    """
    return tuple(self._context.get_binding_shape(binding_idx))
def infer(self, input_np: np.ndarray) -> Tuple[List[np.ndarray], float]:
"""
执行推理(工业级 async 模式)
Args:
input_np: numpy 输入shape 必须与 engine 一致
Returns:
tuple: (输出列表, 推理耗时ms)
"""
if self._engine is None or self._context is None:
raise RuntimeError("引擎未加载")
if len(self._inputs) == 0:
raise RuntimeError("未分配输入 buffer")
start_time = time.perf_counter()
self._cuda_context.push()
try:
input_np = np.ascontiguousarray(input_np)
# Assumes a single network input at binding index 0.
input_name = self._binding_names[0]
self._context.set_input_shape(input_name, input_np.shape)
np.copyto(self._inputs[0].host, input_np.ravel())
cuda.memcpy_htod_async(
self._inputs[0].device,
self._inputs[0].host,
self._stream
)
self._context.execute_async_v2(
bindings=self._bindings,
stream_handle=self._stream.handle
)
for out in self._outputs:
cuda.memcpy_dtoh_async(
out.host,
out.device,
self._stream
)
self._stream.synchronize()
inference_time_ms = (time.perf_counter() - start_time) * 1000
batch_size = input_np.shape[0]
self._update_performance_stats(inference_time_ms, batch_size)
output_shapes = []
for i in range(len(self._inputs), self._engine.num_bindings):
output_shapes.append(self._get_output_shape(i))
results = []
for idx, out in enumerate(self._outputs):
shape = output_shapes[idx] if idx < len(output_shapes) else out.host.shape
results.append(out.host.reshape(shape))
return results, inference_time_ms
finally:
self._cuda_context.pop()
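# Usage sketch (input shape is illustrative and model-dependent):
#   outputs, ms = engine.infer(np.zeros((1, 3, 640, 640), dtype=np.float32))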
def _update_performance_stats(self, inference_time_ms: float, batch_size: int):
"""更新性能统计"""
stats = self._performance_stats
stats["inference_count"] += 1
stats["total_inference_time_ms"] += inference_time_ms
stats["last_inference_time_ms"] = inference_time_ms
stats["avg_inference_time_ms"] = (
stats["total_inference_time_ms"] / stats["inference_count"]
)
total_time = stats["total_inference_time_ms"] / 1000.0
if total_time > 0:
stats["throughput_fps"] = stats["inference_count"] / total_time
def get_performance_stats(self) -> Dict[str, Any]:
"""获取性能统计"""
stats = dict(self._performance_stats)
stats["memory_usage"] = self.get_memory_usage()
return stats
def get_memory_usage(self) -> Dict[str, float]:
"""获取显存使用情况"""
try:
import pynvml
pynvml.nvmlInit()
handle = pynvml.nvmlDeviceGetHandleByIndex(self.config.device_id)
mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
pynvml.nvmlShutdown()
return {
"total_mb": mem_info.total / (1024 ** 2),
"used_mb": mem_info.used / (1024 ** 2),
"free_mb": mem_info.free / (1024 ** 2),
}
except Exception as e:
self._logger.warning(f"获取显存信息失败: {e}")
return {"total_mb": 0, "used_mb": 0, "free_mb": 0}
def _release_resources(self):
    """Release CUDA/TensorRT resources (drain the stream before dropping the context)."""
    if self._stream:
        try:
            # Wait for any in-flight async work before tearing anything down.
            self._stream.synchronize()
        except Exception:
            pass
        self._stream = None
    self._context = None
    self._engine = None
    self._bindings = []
    self._inputs = []
    self._outputs = []
    if self._cuda_context:
        try:
            self._cuda_context.pop()
            self._cuda_context.detach()
        except Exception:
            pass
        self._cuda_context = None
def release(self):
"""释放引擎资源(幂等调用)"""
with self._lock:
if self._released:
return
self._released = True
self._release_resources()
self._logger.info("TensorRT 引擎资源已释放")
def __del__(self):
    """Destructor: best-effort release; attributes may be missing if __init__ raised."""
    try:
        self.release()
    except Exception:
        pass
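# Sketch (not part of the original API): relying on __del__ is fragile because
# CPython gives no ordering guarantees during interpreter shutdown. A small
# wrapper makes cleanup deterministic; the name EngineScope is illustrative.
class EngineScope:
    """Context manager that releases the wrapped TensorRTEngine on exit."""

    def __init__(self, engine: "TensorRTEngine"):
        self._engine = engine

    def __enter__(self) -> "TensorRTEngine":
        return self._engine

    def __exit__(self, exc_type, exc, tb) -> None:
        self._engine.release()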
class EngineManager:
"""引擎管理器类"""
def __init__(self):
self._engines: Dict[str, TensorRTEngine] = {}
self._logger = get_logger("tensorrt")
self._lock = threading.Lock()
def load_engine(
self,
engine_id: str,
engine_path: str,
config: Optional[InferenceConfig] = None
) -> bool:
"""加载引擎"""
with self._lock:
if engine_id in self._engines:
self._engines[engine_id].release()
engine = TensorRTEngine(config)
if engine.load_engine(engine_path):
self._engines[engine_id] = engine
self._logger.info(f"引擎已加载: {engine_id}")
return True
else:
return False
def get_engine(self, engine_id: str) -> Optional[TensorRTEngine]:
"""获取引擎"""
return self._engines.get(engine_id)
def release_engine(self, engine_id: str):
"""释放引擎"""
with self._lock:
if engine_id in self._engines:
self._engines[engine_id].release()
del self._engines[engine_id]
self._logger.info(f"引擎已释放: {engine_id}")
def release_all(self):
"""释放所有引擎"""
with self._lock:
for engine_id in list(self._engines.keys()):
self._engines[engine_id].release()
self._engines.clear()
self._logger.info("所有引擎已释放")
def get_all_stats(self) -> Dict[str, Any]:
"""获取所有引擎统计"""
with self._lock:
return {
engine_id: engine.get_performance_stats()
for engine_id, engine in self._engines.items()
}
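# Usage sketch for EngineManager (the id and path are illustrative):
#   manager = EngineManager()
#   if manager.load_engine("detector", "/models/detector.engine"):
#       eng = manager.get_engine("detector")
#       outputs, ms = eng.infer(frame_batch)
#   manager.release_all()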
def create_tensorrt_engine(
engine_path: str,
config: Optional[InferenceConfig] = None
) -> TensorRTEngine:
"""创建 TensorRT 引擎的便捷函数"""
engine = TensorRTEngine(config)
if engine.load_engine(engine_path):
return engine
else:
raise RuntimeError(f"无法加载 TensorRT 引擎: {engine_path}")