Files
security-ai-edge/core/tensorrt_engine.py
16337 98595402c6 fix: 修复10个关键bug提升系统稳定性和性能
1. YOLO11输出解析错误: 移除不存在的objectness行,正确使用class_scores.max()
2. CPU NMS逻辑错误: keep_mask同时标记保留和抑制框导致NMS失效,改用独立suppressed集合
3. 坐标映射缺失: _build_tracks中scale_info未使用,添加revert_boxes还原到ROI裁剪空间
4. batch=1限制: 恢复真正的动态batch推理(1~8),BatchPreprocessor支持多图stack
5. 帧率控制缺失: _read_frame添加time.monotonic()间隔控制,按target_fps跳帧
6. 拉流推理耦合: 新增独立推理线程(InferenceWorker),生产者-消费者模式解耦
7. 攒批形同虚设: 添加50ms攒批窗口+max_batch阈值,替代>=1立即处理
8. LeavePost双重等待: LEAVING确认后直接触发告警,不再进入OFF_DUTY二次等待
9. register_algorithm每帧调用: 添加_registered_keys缓存,O(1)快速路径跳过
10. GPU context线程安全: TensorRT infer()内部加锁,防止多线程CUDA context竞争

附带修复:
- reset_algorithm中未定义algorithm_type变量(NameError)
- update_roi_params中循环变量key覆盖外层key
- AlertInfo缺少bind_id字段(TypeError)
- _logger.log_alert在标准logger上不存在(AttributeError)
- AlarmStateMachine死锁(Lock改为RLock)
- ROICropper.create_mask坐标解析错误
- 更新测试用例适配新API

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-04 16:47:26 +08:00

399 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
TensorRT推理引擎模块
固定 batch=4, FP16, 3×480×480
工业级实现Buffer Pool、异步推理、性能监控
"""
import logging
import threading
import time
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
try:
import tensorrt as trt
import pycuda.driver as cuda
import pycuda.autoinit
TRT_AVAILABLE = True
except ImportError:
TRT_AVAILABLE = False
trt = None
cuda = None
from config.settings import get_settings, InferenceConfig
from utils.logger import get_logger
logger = logging.getLogger(__name__)
class HostDeviceMem:
    """Paired host/device buffers for one TensorRT binding (buffer-pool entry)."""

    def __init__(self, host_mem, device_mem):
        # host_mem: numpy array (ideally pagelocked); device_mem: CUDA allocation.
        self.host = host_mem
        self.device = device_mem

    def __repr__(self):
        return "Host:{}, Device:{}".format(self.host.shape, int(self.device))
def pad_to_batch4(frames: List[np.ndarray]) -> np.ndarray:
    """Stack frames into a single batch tensor.

    Historically this padded to a fixed batch of 4 by repeating the last
    frame; with dynamic-batch inference it now simply stacks what it is given.

    Args:
        frames: list of [3, 480, 480] numpy arrays.

    Returns:
        np.ndarray of shape [len(frames), 3, 480, 480].

    Raises:
        ValueError: if ``frames`` is empty.
    """
    if not frames:
        raise ValueError("Empty frames list")
    return np.stack(frames)
class TensorRTEngine:
    """TensorRT engine wrapper (dynamic batch 1~8, opt=4, FP16, 3x480x480).

    Features:
    - Buffer pool: bindings are allocated once for MAX_BATCH_SIZE; each
      inference uses only the leading slice it needs.
    - Pinned memory: pagelocked host buffers speed up H2D/D2H copies
      (falls back to plain numpy arrays if pagelocked allocation fails).
    - Async API: CUDA stream + async memcpy + execute_async_v2.
    - Inference lock: load/infer/release are serialized so the CUDA
      context is safe to use from multiple threads.
    """

    # Buffers are sized for MAX_BATCH_SIZE; OPT_BATCH_SIZE is the engine
    # profile's optimal batch (informational in this class — only logged).
    MAX_BATCH_SIZE = 8
    OPT_BATCH_SIZE = 4
    # Expected per-image input tensor layout (C, H, W).
    INPUT_SHAPE = (3, 480, 480)

    def __init__(self, config: Optional[InferenceConfig] = None):
        """Create the wrapper (does not load the engine file yet).

        Args:
            config: inference configuration; when None, the global
                settings' ``inference`` section is used.

        Raises:
            RuntimeError: if the tensorrt/pycuda imports failed.
        """
        if not TRT_AVAILABLE:
            raise RuntimeError("TensorRT 未安装,请先安装 tensorrt 库")
        if config is None:
            settings = get_settings()
            config = settings.inference
        self.config = config
        self._engine = None          # deserialized CUDA engine
        self._context = None         # TensorRT execution context
        self._stream = None          # CUDA stream for async copies/exec
        self._released = False       # makes release() idempotent
        self._cuda_context = None    # pycuda context, pushed/popped per infer
        self._logger = get_logger("tensorrt")
        # Serializes load_engine/infer/release; protects the CUDA context.
        self._lock = threading.Lock()
        self._bindings: List[int] = []           # device pointers, binding order
        self._inputs: List[HostDeviceMem] = []   # host/device pairs (inputs)
        self._outputs: List[HostDeviceMem] = []  # host/device pairs (outputs)
        self._performance_stats = {
            "inference_count": 0,
            "total_inference_time_ms": 0.0,
            "avg_inference_time_ms": 0.0,
            "throughput_fps": 0.0,
            "last_inference_time_ms": 0.0,
        }
        self._logger.info(
            f"TensorRT 引擎初始化: "
            f"{config.model_path}, "
            f"batch=1~{self.MAX_BATCH_SIZE} (opt={self.OPT_BATCH_SIZE}), "
            f"shape={self.INPUT_SHAPE}, "
            f"fp16={config.fp16_mode}"
        )

    def load_engine(self, engine_path: Optional[str] = None) -> bool:
        """Load a serialized TensorRT engine file and allocate buffers.

        Args:
            engine_path: path to the engine file; defaults to
                ``config.model_path``.

        Returns:
            True on success; False on any failure (logged, never raised).
        """
        if engine_path is None:
            engine_path = self.config.model_path
        with self._lock:
            try:
                # Reloading: tear down any previous engine/context first.
                if self._context is not None:
                    self._release_resources()
                self._cuda_context = cuda.Device(0).make_context()
                self._stream = cuda.Stream()
                TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
                with open(engine_path, "rb") as f:
                    runtime = trt.Runtime(TRT_LOGGER)
                    self._engine = runtime.deserialize_cuda_engine(f.read())
                self._context = self._engine.create_execution_context()
                self._allocate_buffers()
                # NOTE(review): log_connection_event is assumed to exist on
                # the project logger from get_logger — confirm in utils.logger.
                self._logger.log_connection_event(
                    "load", "TensorRT", engine_path, True
                )
                self._logger.info(f"TensorRT 引擎加载成功: {engine_path}")
                self._logger.info(f" 输入: {len(self._inputs)}, 输出: {len(self._outputs)}, batch=1~{self.MAX_BATCH_SIZE}")
                return True
            except Exception as e:
                self._logger.error(f"TensorRT 引擎加载失败: {e}")
                return False

    def _allocate_buffers(self) -> None:
        """Buffer pool: allocate one host/device pair per binding, up front.

        A dynamic batch dim (-1) is sized as MAX_BATCH_SIZE; any other
        dynamic dim falls back to 1. infer() later uses only the leading
        slice for the actual batch.

        Raises:
            RuntimeError: if the engine has no input or no output binding.
        """
        self._bindings = []
        self._inputs = []
        self._outputs = []
        for binding_idx in range(self._engine.num_bindings):
            shape = list(self._engine.get_binding_shape(binding_idx))
            dtype = trt.nptype(self._engine.get_binding_dtype(binding_idx))
            if shape[0] == -1:
                # Dynamic batch: allocate for the maximum we will ever run.
                shape[0] = self.MAX_BATCH_SIZE
            shape = tuple(max(1, s) if s < 0 else s for s in shape)
            size = trt.volume(shape)
            try:
                # Pinned host memory enables genuinely async H2D/D2H copies.
                host_mem = cuda.pagelocked_empty(size, dtype)
            except Exception as e:
                self._logger.warning(f"pagelocked memory 分配失败,回退到普通 numpy: {e}")
                host_mem = np.zeros(size, dtype=dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            self._bindings.append(int(device_mem))
            mem_pair = HostDeviceMem(host_mem, device_mem)
            if self._engine.binding_is_input(binding_idx):
                self._inputs.append(mem_pair)
            else:
                self._outputs.append(mem_pair)
        if len(self._inputs) == 0:
            raise RuntimeError("No input bindings found")
        if len(self._outputs) == 0:
            raise RuntimeError("No output bindings found")

    def infer(self, input_batch: np.ndarray) -> Tuple[List[np.ndarray], float]:
        """Run one inference (async pipeline; thread-safe via self._lock).

        Args:
            input_batch: array of shape [batch, 3, 480, 480]; batch may be
                any value in 1~MAX_BATCH_SIZE. dtype float16 per the module
                contract — not verified here.

        Returns:
            tuple of (flat output arrays trimmed to the actual batch,
            inference time in milliseconds).

        Raises:
            RuntimeError: if the engine is not loaded or input buffers
                were never allocated.
        """
        if self._engine is None or self._context is None:
            raise RuntimeError("引擎未加载")
        if len(self._inputs) == 0:
            raise RuntimeError("未分配输入 buffer")
        batch_size = input_batch.shape[0]
        with self._lock:
            start_time = time.perf_counter()
            # Make our CUDA context current on this thread for the call.
            self._cuda_context.push()
            try:
                input_batch = np.ascontiguousarray(input_batch)
                input_name = self._engine.get_binding_name(0)
                actual_shape = list(input_batch.shape)
                # Declare the real batch for this run (dynamic-shape engine).
                self._context.set_input_shape(input_name, actual_shape)
                # Stage into the (pinned) host buffer, then async H2D copy.
                np.copyto(self._inputs[0].host[:input_batch.size], input_batch.ravel())
                cuda.memcpy_htod_async(
                    self._inputs[0].device,
                    self._inputs[0].host,
                    self._stream
                )
                self._context.execute_async_v2(
                    bindings=self._bindings,
                    stream_handle=self._stream.handle
                )
                results = []
                for out in self._outputs:
                    cuda.memcpy_dtoh_async(
                        out.host,
                        out.device,
                        self._stream
                    )
                self._stream.synchronize()
                # Trim each output to the actual batch_size.
                for out in self._outputs:
                    output_data = out.host.copy()
                    # Buffers were allocated for MAX_BATCH_SIZE; keep only the
                    # leading batch_size portion of the flat buffer.
                    per_batch_size = len(output_data) // self.MAX_BATCH_SIZE
                    actual_size = per_batch_size * batch_size
                    results.append(output_data[:actual_size])
                inference_time_ms = (time.perf_counter() - start_time) * 1000
                self._update_performance_stats(inference_time_ms, batch_size)
                return results, inference_time_ms
            finally:
                # Always pop, even on error, to keep the context stack balanced.
                self._cuda_context.pop()

    def _update_performance_stats(self, inference_time_ms: float, batch_size: int) -> None:
        """Accumulate rolling performance counters after one inference.

        NOTE(review): batch_size is currently unused — throughput_fps counts
        inferences per second, not images per second; confirm intent.
        """
        stats = self._performance_stats
        stats["inference_count"] += 1
        stats["total_inference_time_ms"] += inference_time_ms
        stats["last_inference_time_ms"] = inference_time_ms
        stats["avg_inference_time_ms"] = (
            stats["total_inference_time_ms"] / stats["inference_count"]
        )
        total_time = stats["total_inference_time_ms"] / 1000.0
        if total_time > 0:
            stats["throughput_fps"] = stats["inference_count"] / total_time

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return a copy of the counters plus current GPU memory usage."""
        stats = dict(self._performance_stats)
        stats["memory_usage"] = self.get_memory_usage()
        return stats

    def get_memory_usage(self) -> Dict[str, float]:
        """Query device memory via pynvml; best effort — zeros on failure."""
        try:
            import pynvml
            pynvml.nvmlInit()
            handle = pynvml.nvmlDeviceGetHandleByIndex(self.config.device_id)
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
            pynvml.nvmlShutdown()
            return {
                "total_mb": mem_info.total / (1024 ** 2),
                "used_mb": mem_info.used / (1024 ** 2),
                "free_mb": mem_info.free / (1024 ** 2),
            }
        except Exception as e:
            self._logger.warning(f"获取显存信息失败: {e}")
            return {"total_mb": 0, "used_mb": 0, "free_mb": 0}

    def _release_resources(self) -> None:
        """Release CUDA context/stream and drop buffer references.

        Best-effort teardown: errors are swallowed so this is safe to call
        from any state (including a partially failed load).
        """
        if self._cuda_context:
            try:
                self._cuda_context.pop()
                self._cuda_context.detach()
            except Exception:
                pass
            self._cuda_context = None
        if self._stream:
            try:
                self._stream.synchronize()
            except Exception:
                pass
            self._stream = None
        self._context = None
        self._engine = None
        self._bindings = []
        self._inputs = []
        self._outputs = []

    def release(self) -> None:
        """Release engine resources (idempotent; repeat calls are no-ops)."""
        with self._lock:
            if self._released:
                return
            self._released = True
            self._release_resources()
            self._logger.info("TensorRT 引擎资源已释放")

    def __del__(self):
        """Destructor: best-effort release.

        NOTE(review): if __init__ raised before self._lock was assigned,
        release() would itself raise AttributeError here — confirm
        construction paths.
        """
        self.release()
class EngineManager:
    """Thread-safe registry mapping engine ids to loaded TensorRTEngine objects."""

    def __init__(self):
        # engine_id -> TensorRTEngine; only successfully loaded engines live here.
        self._engines: Dict[str, TensorRTEngine] = {}
        self._logger = get_logger("tensorrt")
        # Guards every mutation of self._engines.
        self._lock = threading.Lock()

    def load_engine(
        self,
        engine_id: str,
        engine_path: str,
        config: Optional[InferenceConfig] = None
    ) -> bool:
        """Load (or reload) an engine under ``engine_id``.

        Args:
            engine_id: registry key for the engine.
            engine_path: path to the serialized engine file.
            config: optional inference config forwarded to TensorRTEngine.

        Returns:
            True if the engine loaded successfully, False otherwise.
        """
        with self._lock:
            # BUG FIX: deregister the old engine *before* attempting the
            # reload. Previously the old engine was released but left in the
            # dict, so if the new load failed (or the constructor raised),
            # get_engine() kept returning an already-released engine.
            old_engine = self._engines.pop(engine_id, None)
            if old_engine is not None:
                old_engine.release()
            engine = TensorRTEngine(config)
            if engine.load_engine(engine_path):
                self._engines[engine_id] = engine
                self._logger.info(f"引擎已加载: {engine_id}")
                return True
            return False

    def get_engine(self, engine_id: str) -> Optional[TensorRTEngine]:
        """Return the engine registered under ``engine_id``, or None."""
        # Single dict read — atomic under CPython, so no lock taken here.
        return self._engines.get(engine_id)

    def release_engine(self, engine_id: str):
        """Release and deregister one engine (no-op if not registered)."""
        with self._lock:
            if engine_id in self._engines:
                self._engines[engine_id].release()
                del self._engines[engine_id]
                self._logger.info(f"引擎已释放: {engine_id}")

    def release_all(self):
        """Release and deregister every engine."""
        with self._lock:
            for engine_id in list(self._engines.keys()):
                self._engines[engine_id].release()
            self._engines.clear()
            self._logger.info("所有引擎已释放")

    def get_all_stats(self) -> Dict[str, Any]:
        """Snapshot performance stats for all registered engines."""
        with self._lock:
            return {
                engine_id: engine.get_performance_stats()
                for engine_id, engine in self._engines.items()
            }
def create_tensorrt_engine(
    engine_path: str,
    config: Optional[InferenceConfig] = None
) -> TensorRTEngine:
    """Convenience factory: construct a TensorRTEngine and load *engine_path*.

    Args:
        engine_path: path to the serialized engine file.
        config: optional inference configuration.

    Returns:
        The loaded engine.

    Raises:
        RuntimeError: if the engine file cannot be loaded.
    """
    engine = TensorRTEngine(config)
    if not engine.load_engine(engine_path):
        raise RuntimeError(f"无法加载 TensorRT 引擎: {engine_path}")
    return engine