"""YOLO inference back-ends (TensorRT, ONNX Runtime, PyTorch) behind one facade."""

import os

# Set before ``import tensorrt`` so the variable is already in the environment
# when the library is loaded.
os.environ["TENSORRT_DISABLE_MYELIN"] = "1"

import time
from typing import Any, Dict, List, Optional, Tuple

import cv2
import numpy as np
import tensorrt as trt
import torch
import onnxruntime as ort
from ultralytics import YOLO
from ultralytics.engine.results import Results, Boxes as UltralyticsBoxes

from config import get_config

# Class-id -> label mapping attached to every Results built in this module.
_CLASS_NAMES = {0: "person"}


def _normalize_imgsz(imgsz) -> Tuple[int, ...]:
    """Return ``imgsz`` as a tuple, expanding a single value to a square size.

    The rescaling code in this module indexes the result as
    ``(height, width)``.
    """
    if isinstance(imgsz, int):
        return (imgsz, imgsz)
    dims = tuple(imgsz)
    if len(dims) == 1:
        return (dims[0], dims[0])
    return dims


def _resize_to_network(frame: np.ndarray, imgsz: Tuple[int, ...]) -> np.ndarray:
    """Convert a BGR frame to a normalised float32 CHW array of network size."""
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # cv2.resize takes (width, height) while imgsz is indexed (height, width).
    resized = cv2.resize(rgb, (imgsz[1], imgsz[0]))
    return resized.transpose(2, 0, 1).astype(np.float32) / 255.0


def _build_result(
    pred: np.ndarray,
    orig_img: np.ndarray,
    imgsz: Tuple[int, ...],
    conf_thresh: float,
    iou_thresh: float,
) -> Results:
    """Decode one raw prediction matrix into a single ``Results`` object.

    Args:
        pred: 2-D array with one column per candidate; it is transposed so
            that each row is one candidate.
        orig_img: The frame the prediction belongs to; boxes are rescaled to
            its size.
        imgsz: Network input size, indexed as ``(height, width)``.
        conf_thresh: Candidates with a score not above this are dropped.
        iou_thresh: IoU threshold handed to non-maximum suppression.

    Returns:
        A ``Results`` whose ``boxes`` hold ``[x1, y1, x2, y2, conf, cls]``
        rows, or a ``Results`` without boxes when nothing survives.
    """
    # NOTE(review): columns 0-3 are treated as corner coordinates
    # (x1, y1, x2, y2) and column 4 as the confidence, as the original code
    # did - confirm against the exported model's output layout.
    rows = np.asarray(pred, dtype=np.float32).T
    boxes = rows[:, :4]
    scores = rows[:, 4]
    if rows.shape[1] > 5:
        classes = rows[:, 5:].argmax(axis=1)
    else:
        classes = np.zeros(len(rows), dtype=np.int32)

    keep = scores > conf_thresh
    boxes, scores, classes = boxes[keep], scores[keep], classes[keep]
    if len(boxes) == 0:
        return Results(orig_img=orig_img, path="", names=dict(_CLASS_NAMES))

    # cv2.dnn.NMSBoxes works on [x, y, w, h] rectangles, not corner pairs.
    nms_rects = boxes.copy()
    nms_rects[:, 2:] -= nms_rects[:, :2]
    picked = cv2.dnn.NMSBoxes(
        nms_rects.tolist(),
        scores.tolist(),
        conf_thresh,
        iou_thresh,
    )
    # Depending on the OpenCV version this is a tuple, an (N,) or an (N, 1)
    # array; flatten so it can be used as an index array.
    picked = np.asarray(picked, dtype=np.int64).reshape(-1)
    if picked.size == 0:
        return Results(orig_img=orig_img, path="", names=dict(_CLASS_NAMES))

    orig_h, orig_w = orig_img.shape[:2]
    scale_x = orig_w / imgsz[1]
    scale_y = orig_h / imgsz[0]
    scale = np.array([scale_x, scale_y, scale_x, scale_y], dtype=np.float32)

    data = np.column_stack(
        [
            boxes[picked] * scale,
            scores[picked],
            classes[picked].astype(np.float32),
        ]
    ).astype(np.float32)

    # Results wraps the raw tensor in its own Boxes object, so it must be
    # given the tensor itself rather than an already-built Boxes.
    return Results(
        orig_img=orig_img,
        path="",
        names=dict(_CLASS_NAMES),
        boxes=torch.from_numpy(data),
    )


class ONNXEngine:
    """Detector back-end running an ONNX model through ONNX Runtime."""

    def __init__(self, onnx_path: Optional[str] = None, device: int = 0):
        """Read thresholds from the project config and load the model.

        Args:
            onnx_path: Model file; falls back to ``config.model.onnx_path``.
            device: A negative value restricts execution to the CPU provider.
        """
        config = get_config()
        self.onnx_path = onnx_path or config.model.onnx_path
        self.device = device
        self.imgsz = _normalize_imgsz(config.model.imgsz)
        self.conf_thresh = config.model.conf_threshold
        self.iou_thresh = config.model.iou_threshold
        self.session = None
        self.input_names = None
        self.output_names = None
        self.load_model()

    def load_model(self):
        """Create the inference session and cache its input/output names.

        Raises:
            FileNotFoundError: If ``self.onnx_path`` does not exist.
        """
        if not os.path.exists(self.onnx_path):
            raise FileNotFoundError(f"ONNX模型文件不存在: {self.onnx_path}")
        if self.device >= 0:
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        else:
            providers = ['CPUExecutionProvider']
        self.session = ort.InferenceSession(self.onnx_path, providers=providers)
        self.input_names = [inp.name for inp in self.session.get_inputs()]
        self.output_names = [out.name for out in self.session.get_outputs()]

    def preprocess(self, frame: np.ndarray) -> np.ndarray:
        """Return ``frame`` as a normalised float32 CHW array of network size."""
        return _resize_to_network(frame, self.imgsz)

    def postprocess(self, output: np.ndarray, orig_img: np.ndarray) -> List[Results]:
        """Decode the raw prediction of one image into a one-element list."""
        return [
            _build_result(
                output, orig_img, self.imgsz, self.conf_thresh, self.iou_thresh
            )
        ]

    def inference(self, images: List[np.ndarray]) -> List[Results]:
        """Run the model on a batch and return one ``Results`` per image."""
        if not images:
            return []
        batch = np.stack([self.preprocess(frame) for frame in images], axis=0)
        outputs = self.session.run(self.output_names, {self.input_names[0]: batch})
        results: List[Results] = []
        for pred, frame in zip(outputs[0], images):
            results.extend(self.postprocess(pred, frame))
        return results

    def inference_single(self, frame: np.ndarray) -> List[Results]:
        """Run the model on a single frame."""
        return self.inference([frame])

    def warmup(self, num_warmup: int = 10):
        """Run ``num_warmup`` inferences on a black 640x640 frame."""
        dummy_frame = np.zeros((640, 640, 3), dtype=np.uint8)
        for _ in range(num_warmup):
            self.inference_single(dummy_frame)

    def __del__(self):
        # getattr: __init__ may have raised before ``session`` was assigned.
        session = getattr(self, "session", None)
        if session:
            try:
                session.end_profiling()
            except Exception:
                # Best-effort cleanup; never raise from a finalizer.
                pass


class TensorRTEngine:
    """Detector back-end running a serialized TensorRT engine."""

    def __init__(self, engine_path: Optional[str] = None, device: int = 0):
        """Read settings from the project config and load the engine.

        Args:
            engine_path: Engine file; falls back to ``config.model.engine_path``.
            device: CUDA device index used for buffers and the stream.
        """
        config = get_config()
        self.engine_path = engine_path or config.model.engine_path
        self.device = device
        self.imgsz = _normalize_imgsz(config.model.imgsz)
        self.conf_thresh = config.model.conf_threshold
        self.iou_thresh = config.model.iou_threshold
        self.half = config.model.half
        self.logger = trt.Logger(trt.Logger.INFO)
        self.engine = None
        self.context = None
        self.stream = None
        self.input_buffer = None
        self.output_buffers = []
        self.output_names = []
        self.input_name = None
        self.output_name = None
        self.batch_size = 1
        self._load_engine()

    def _load_engine(self):
        """Deserialize the engine and bind one device buffer per I/O tensor.

        Raises:
            FileNotFoundError: If ``self.engine_path`` does not exist.
            RuntimeError: If the engine cannot be deserialized.
        """
        if not os.path.exists(self.engine_path):
            raise FileNotFoundError(f"TensorRT引擎文件不存在: {self.engine_path}")
        with open(self.engine_path, "rb") as f:
            serialized_engine = f.read()

        # Keep a reference so the runtime is not collected while the engine
        # created from it is still in use.
        self._runtime = trt.Runtime(self.logger)
        self.engine = self._runtime.deserialize_cuda_engine(serialized_engine)
        if self.engine is None:
            raise RuntimeError(f"TensorRT引擎反序列化失败: {self.engine_path}")
        self.context = self.engine.create_execution_context()
        self.stream = torch.cuda.Stream(device=self.device)
        self.batch_size = 1
        self.output_buffers = []
        self.output_names = []

        for i in range(self.engine.num_io_tensors):
            name = self.engine.get_tensor_name(i)
            if self.engine.get_tensor_dtype(name) == trt.float16:
                torch_dtype = torch.float16
            else:
                torch_dtype = torch.float32
            # Dynamic dimensions (-1) are sized for the initial batch size.
            shape = [
                self.batch_size if d == -1 else d
                for d in self.engine.get_tensor_shape(name)
            ]
            buffer = torch.zeros(shape, dtype=torch_dtype, device=self.device)
            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                self.input_buffer = buffer
                self.input_name = name
            else:
                self.output_buffers.append(buffer)
                self.output_names.append(name)
                if self.output_name is None:
                    self.output_name = name
            self.context.set_tensor_address(name, buffer.data_ptr())

        stream_handle = torch.cuda.current_stream(self.device).cuda_stream
        self.context.set_optimization_profile_async(0, stream_handle)

    def _sync_output_buffers(self):
        """Reallocate output buffers whose shape no longer matches the context.

        Called after the input shape is set, so that a batch larger than the
        one the buffers were created for does not overrun them.
        """
        for idx, name in enumerate(self.output_names):
            shape = tuple(self.context.get_tensor_shape(name))
            if -1 in shape:
                # Shape not resolved by the context; keep the existing buffer.
                continue
            current = self.output_buffers[idx]
            if tuple(current.shape) != shape:
                current = torch.zeros(shape, dtype=current.dtype, device=self.device)
                self.output_buffers[idx] = current
                self.context.set_tensor_address(name, current.data_ptr())

    def preprocess(self, frame: np.ndarray) -> torch.Tensor:
        """Return ``frame`` as a 1xCxHxW tensor on ``self.device``.

        The tensor is float16 when ``self.half`` is set, float32 otherwise.
        """
        img = _resize_to_network(frame, self.imgsz)
        if self.half:
            img = img.astype(np.float16)
        return torch.from_numpy(img).unsqueeze(0).to(self.device)

    def inference(self, images: List[np.ndarray]) -> List[Results]:
        """Run the engine on a batch and return one ``Results`` per image."""
        batch_size = len(images)
        if batch_size == 0:
            return []

        batch = torch.cat([self.preprocess(frame) for frame in images], dim=0)
        # Match the dtype the input binding was allocated with, whatever
        # ``self.half`` says; ``batch`` stays referenced until execution ends.
        batch = batch.to(self.input_buffer.dtype).contiguous()

        self.context.set_input_shape(self.input_name, list(batch.shape))
        self.context.set_tensor_address(self.input_name, batch.data_ptr())
        self._sync_output_buffers()

        # The input was produced on the current stream; make the execution
        # stream wait for it, then wait for the execution itself.
        self.stream.wait_stream(torch.cuda.current_stream(self.device))
        self.context.execute_async_v3(self.stream.cuda_stream)
        self.stream.synchronize()

        raw = self.output_buffers[0][:batch_size].float().cpu().numpy()
        return [
            _build_result(
                raw[i], images[i], self.imgsz, self.conf_thresh, self.iou_thresh
            )
            for i in range(batch_size)
        ]

    def inference_single(self, frame: np.ndarray) -> List[Results]:
        """Run the engine on a single frame."""
        return self.inference([frame])

    def warmup(self, num_warmup: int = 10):
        """Run ``num_warmup`` inferences on a black 640x480 frame."""
        dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8)
        for _ in range(num_warmup):
            self.inference_single(dummy_frame)

    def __del__(self):
        # getattr: __init__ may have raised before ``stream`` was assigned.
        stream = getattr(self, "stream", None)
        if stream is not None:
            try:
                stream.synchronize()
            except Exception:
                # Best-effort cleanup; never raise from a finalizer.
                pass


class Boxes:
    """Minimal column-view wrapper around an ``(N, >=6)`` detection tensor."""

    def __init__(
        self,
        data: torch.Tensor,
        orig_shape: Tuple[int, int],
        is_track: bool = False,
    ):
        self.data = data
        self.orig_shape = orig_shape
        # Stored for callers; the column layout below does not depend on it.
        self.is_track = is_track

    @property
    def xyxy(self):
        """First four columns of ``data``."""
        return self.data[:, :4]

    @property
    def conf(self):
        """Column 4 of ``data``."""
        return self.data[:, 4]

    @property
    def cls(self):
        """Column 5 of ``data``."""
        return self.data[:, 5]


def _check_pt_file_valid(pt_path: str) -> bool:
    """Return True when at least 10 bytes can be read from ``pt_path``."""
    try:
        with open(pt_path, 'rb') as f:
            header = f.read(10)
    except OSError:
        return False
    return len(header) == 10


class YOLOEngine:
    """Detector facade that tries TensorRT, then ONNX Runtime, then PyTorch."""

    def __init__(
        self,
        model_path: Optional[str] = None,
        device: int = 0,
        use_trt: bool = True,
    ):
        """Load the first back-end that initialises successfully.

        Args:
            model_path: PyTorch weights; falls back to
                ``config.model.pt_model_path``.
            device: Device index handed to every back-end.
            use_trt: Skip the TensorRT attempt when False.

        Raises:
            RuntimeError: If no back-end could be loaded.
        """
        self.use_trt = False
        self.onnx_engine = None
        self.trt_engine = None
        # Always defined so that __call__ can test it on every path.
        self.model = None
        self.device = device
        config = get_config()

        if use_trt:
            try:
                trt_engine = TensorRTEngine(device=device)
                trt_engine.warmup()
            except Exception as e:
                print(f"TensorRT加载失败: {e}")
            else:
                self.trt_engine = trt_engine
                self.use_trt = True
                print("TensorRT引擎加载成功")
                return

        try:
            onnx_path = config.model.onnx_path
            if os.path.exists(onnx_path):
                onnx_engine = ONNXEngine(device=device)
                onnx_engine.warmup()
                self.onnx_engine = onnx_engine
                print("ONNX引擎加载成功")
                return
            print(f"ONNX模型不存在: {onnx_path}")
        except Exception as e:
            print(f"ONNX加载失败: {e}")

        try:
            pt_path = model_path or config.model.pt_model_path
            if not (os.path.exists(pt_path) and _check_pt_file_valid(pt_path)):
                raise FileNotFoundError(f"PT文件无效或不存在: {pt_path}")
            self.model = YOLO(pt_path)
            self.model.to(device)
            print(f"PyTorch模型加载成功: {pt_path}")
        except Exception as e:
            print(f"PyTorch加载失败: {e}")
            raise RuntimeError("所有模型加载方式均失败") from e

    def __call__(self, frame: np.ndarray, **kwargs) -> List[Results]:
        """Detect on ``frame`` with the active back-end.

        A TensorRT failure disables that back-end for later calls and falls
        through to ONNX Runtime, then PyTorch. ``kwargs`` are forwarded to
        the PyTorch model only. Returns an empty list when no back-end is
        available.
        """
        if self.use_trt and self.trt_engine:
            try:
                return self.trt_engine.inference_single(frame)
            except Exception as e:
                print(f"TensorRT推理失败,切换到ONNX: {e}")
                self.use_trt = False

        if self.onnx_engine:
            return self.onnx_engine.inference_single(frame)
        if self.model is not None:
            return self.model(frame, imgsz=get_config().model.imgsz, **kwargs)
        return []

    def __del__(self):
        # Drop the references; assignment is safe on a half-built instance.
        self.trt_engine = None
        self.onnx_engine = None