"""TensorRT-accelerated YOLO detection with a PyTorch (ultralytics) fallback.

`TensorRTEngine` drives a serialized ``.engine`` file directly through the
TensorRT runtime; `YOLOEngine` wraps it and falls back to a ``.pt`` model via
ultralytics when the engine cannot be loaded.
"""

import os
import time
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
import tensorrt as trt
import torch
from ultralytics import YOLO
from ultralytics.engine.results import Results

from config import get_config


class TensorRTEngine:
    """Runs a serialized TensorRT engine for single-class YOLO-style detection.

    Output tensor layout is assumed to be ``(batch, num_boxes, 6)`` with
    columns ``x1, y1, x2, y2, score, class`` in resized-image coordinates —
    TODO confirm against the exported engine.
    """

    def __init__(self, engine_path: Optional[str] = None, device: int = 0):
        """Load engine/config and allocate device buffers.

        Args:
            engine_path: Path to the ``.engine`` file; defaults to config.
            device: CUDA device ordinal.

        Raises:
            FileNotFoundError: If the engine file does not exist.
        """
        config = get_config()
        self.engine_path = engine_path or config.model.engine_path
        self.device = device
        # NOTE(review): assumed (height, width). cv2.resize below takes
        # (width, height), so non-square sizes would be transposed — confirm
        # the engine input is square or fix the resize call accordingly.
        self.imgsz = tuple(config.model.imgsz)
        self.conf_thresh = config.model.conf_threshold
        self.iou_thresh = config.model.iou_threshold
        self.half = config.model.half
        self.logger = trt.Logger(trt.Logger.INFO)
        self.engine = None
        self.context = None
        self.stream = None
        self.input_name = None  # discovered while scanning IO tensors
        self.input_buffer = None
        self.output_buffers = []
        self._load_engine()

    def _load_engine(self):
        """Deserialize the engine, build the context, pre-bind output buffers."""
        if not os.path.exists(self.engine_path):
            raise FileNotFoundError(f"TensorRT引擎文件不存在: {self.engine_path}")
        with open(self.engine_path, "rb") as f:
            serialized_engine = f.read()
        runtime = trt.Runtime(self.logger)
        self.engine = runtime.deserialize_cuda_engine(serialized_engine)
        self.context = self.engine.create_execution_context()
        self.stream = torch.cuda.Stream(device=self.device)

        for i in range(self.engine.num_io_tensors):
            name = self.engine.get_tensor_name(i)
            dtype = self.engine.get_tensor_dtype(name)
            shape = self.engine.get_tensor_shape(name)
            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                # Record the input name; its address is bound per-call in
                # inference(). (Binding None here, as before, is a TypeError
                # in the TensorRT Python API, which expects an int pointer.)
                self.input_name = name
            else:
                torch_dtype = (
                    torch.float16 if dtype == trt.float16 else torch.float32
                )
                buffer = torch.zeros(
                    tuple(shape), dtype=torch_dtype, device=self.device
                )
                self.output_buffers.append(buffer)
                self.context.set_tensor_address(name, buffer.data_ptr())

        # TensorRT wants the raw cudaStream_t handle, not the torch wrapper.
        self.context.set_optimization_profile_async(0, self.stream.cuda_stream)

        # Pre-allocated single-frame input; kept for interface compatibility
        # (inference() builds its own batch tensor).
        self.input_buffer = torch.zeros(
            (1, 3, self.imgsz[0], self.imgsz[1]),
            dtype=torch.float16 if self.half else torch.float32,
            device=self.device,
        )

    def preprocess(self, frame: np.ndarray) -> torch.Tensor:
        """Convert a BGR uint8 HWC frame to a normalized 1x3xHxW device tensor."""
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, self.imgsz)
        img = img.transpose(2, 0, 1).astype(np.float32) / 255.0
        if self.half:
            img = img.astype(np.float16)
        return torch.from_numpy(img).unsqueeze(0).to(self.device)

    def inference(self, images: List[np.ndarray]) -> List[Results]:
        """Run detection on a batch of BGR frames.

        Returns one `Results` object *per surviving detection* (not per
        image), matching the original behavior.
        """
        batch_size = len(images)
        if batch_size == 0:
            return []

        # Single concatenation instead of the original O(n^2) repeated cat.
        input_tensor = torch.cat(
            [self.preprocess(img) for img in images], dim=0
        ).contiguous()

        self.context.set_tensor_address(
            self.input_name or "input", input_tensor.data_ptr()
        )
        # torch.cuda.synchronize() takes a *device*, not a stream; the
        # original call raised. Drain this stream explicitly instead.
        self.stream.synchronize()
        # torch.cuda.Stream has no .handle attribute; use .cuda_stream.
        self.context.execute_async_v3(self.stream.cuda_stream)
        self.stream.synchronize()

        results: List[Results] = []
        for i in range(batch_size):
            pred = self.output_buffers[0][i].cpu().numpy()
            boxes = pred[:, :4]  # assumed xyxy in resized coords — TODO confirm
            scores = pred[:, 4]
            classes = pred[:, 5].astype(np.int32)

            mask = scores > self.conf_thresh
            boxes = boxes[mask]
            scores = scores[mask]
            classes = classes[mask]

            # cv2.dnn.NMSBoxes expects [x, y, w, h]; the original passed
            # xyxy boxes, corrupting the IoU computation.
            xywh = boxes.copy()
            xywh[:, 2] = boxes[:, 2] - boxes[:, 0]
            xywh[:, 3] = boxes[:, 3] - boxes[:, 1]
            indices = cv2.dnn.NMSBoxes(
                xywh.tolist(),
                scores.tolist(),
                self.conf_thresh,
                self.iou_thresh,
            )
            if len(indices) == 0:
                continue

            orig_h, orig_w = images[i].shape[:2]
            scale_x = orig_w / self.imgsz[1]
            scale_y = orig_h / self.imgsz[0]

            # Older OpenCV returns nested arrays; flatten covers both shapes.
            for idx in np.asarray(indices).flatten():
                x1, y1, x2, y2 = boxes[idx]
                # Store scaled xyxy so Boxes.xyxy really yields corners
                # (the original stored xywh under an xyxy-consuming class).
                row = [
                    int(x1 * scale_x),
                    int(y1 * scale_y),
                    int(x2 * scale_x),
                    int(y2 * scale_y),
                    float(scores[idx]),
                    int(classes[idx]),
                ]
                results.append(
                    Results(
                        orig_img=images[i],
                        path="",
                        names={0: "person"},
                        boxes=Boxes(
                            torch.tensor([row]),
                            orig_shape=(orig_h, orig_w),
                        ),
                    )
                )
        return results

    def inference_single(self, frame: np.ndarray) -> List[Results]:
        """Convenience wrapper for a single frame."""
        return self.inference([frame])

    def warmup(self, num_warmup: int = 10):
        """Run a few dummy inferences to trigger lazy CUDA initialization."""
        dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8)
        for _ in range(num_warmup):
            self.inference_single(dummy_frame)

    def __del__(self):
        # Only the stream needs draining; IExecutionContext has no
        # synchronize() method (the original call raised). getattr guards
        # against a partially-constructed instance (e.g. _load_engine failed).
        stream = getattr(self, "stream", None)
        if stream is not None:
            try:
                stream.synchronize()
            except Exception:
                pass  # best effort — interpreter may be shutting down


class Boxes:
    """Minimal stand-in for ultralytics' Boxes container.

    ``data`` rows are ``[x1, y1, x2, y2, conf, cls]``.
    """

    def __init__(
        self,
        data: torch.Tensor,
        orig_shape: Tuple[int, int],
        is_track: bool = False,
    ):
        self.data = data
        self.orig_shape = orig_shape  # (height, width) of the source image
        self.is_track = is_track

    @property
    def xyxy(self):
        """Corner coordinates, shape (N, 4)."""
        # Both original branches returned the same slice; collapsed.
        return self.data[:, :4]

    @property
    def conf(self):
        """Detection confidences, shape (N,)."""
        return self.data[:, 4]

    @property
    def cls(self):
        """Class ids, shape (N,)."""
        return self.data[:, 5]


class YOLOEngine:
    """Facade that prefers TensorRT and transparently falls back to PyTorch."""

    def __init__(
        self,
        model_path: Optional[str] = None,
        device: int = 0,
        use_trt: bool = True,
    ):
        self.use_trt = use_trt
        self.device = device
        self.trt_engine = None

        if not use_trt:
            self._load_pt_model(model_path)
            return

        try:
            self.trt_engine = TensorRTEngine(device=device)
            self.trt_engine.warmup()
        except Exception as e:
            print(f"TensorRT加载失败,回退到PyTorch: {e}")
            self.use_trt = False
            self._load_pt_model(model_path)

    def _load_pt_model(self, model_path: Optional[str]):
        """Resolve a .pt checkpoint path and load it (deduplicates the
        original's copy-pasted fallback branches)."""
        config = get_config()
        if model_path:
            pt_path = model_path
        elif hasattr(config.model, 'pt_model_path'):
            pt_path = config.model.pt_model_path
        else:
            pt_path = config.model.engine_path.replace(".engine", ".pt")
        self.model = YOLO(pt_path)
        self.model.to(self.device)

    def __call__(self, frame: np.ndarray, **kwargs) -> List[Results]:
        """Run detection on one frame via whichever backend is active."""
        if self.use_trt:
            return self.trt_engine.inference_single(frame)
        return self.model(frame, imgsz=get_config().model.imgsz, **kwargs)

    def __del__(self):
        # getattr guard: __init__ may have raised before trt_engine existed.
        if getattr(self, "trt_engine", None):
            del self.trt_engine