245 lines
7.6 KiB
Python
245 lines
7.6 KiB
Python
|
|
import os
|
|||
|
|
import time
|
|||
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|||
|
|
|
|||
|
|
import cv2
|
|||
|
|
import numpy as np
|
|||
|
|
import tensorrt as trt
|
|||
|
|
import torch
|
|||
|
|
from ultralytics import YOLO
|
|||
|
|
from ultralytics.engine.results import Results
|
|||
|
|
|
|||
|
|
from config import get_config
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TensorRTEngine:
    """Thin wrapper around a serialized TensorRT engine for YOLO-style detection.

    Deserializes the engine from disk, allocates GPU-resident I/O buffers, and
    exposes preprocess / inference helpers returning ultralytics ``Results``
    objects (one per surviving detection).
    """

    def __init__(self, engine_path: Optional[str] = None, device: int = 0):
        """Load configuration and initialize the engine.

        Args:
            engine_path: Path to the serialized ``.engine`` file. Falls back
                to the configured ``model.engine_path`` when omitted.
            device: CUDA device ordinal to run on.

        Raises:
            FileNotFoundError: if the engine file does not exist.
        """
        config = get_config()
        self.engine_path = engine_path or config.model.engine_path
        self.device = device
        # imgsz is (height, width): it is used as (H, W) when allocating the
        # NCHW input buffer in _load_engine().
        self.imgsz = tuple(config.model.imgsz)
        self.conf_thresh = config.model.conf_threshold
        self.iou_thresh = config.model.iou_threshold
        self.half = config.model.half

        self.logger = trt.Logger(trt.Logger.INFO)
        self.engine = None
        self.context = None
        self.stream = None
        self.input_buffer = None
        self.output_buffers: List[torch.Tensor] = []

        self._load_engine()

    def _load_engine(self):
        """Deserialize the engine, create the execution context and bind buffers."""
        if not os.path.exists(self.engine_path):
            raise FileNotFoundError(f"TensorRT引擎文件不存在: {self.engine_path}")

        with open(self.engine_path, "rb") as f:
            serialized_engine = f.read()

        runtime = trt.Runtime(self.logger)
        self.engine = runtime.deserialize_cuda_engine(serialized_engine)
        self.context = self.engine.create_execution_context()
        self.stream = torch.cuda.Stream(device=self.device)

        for i in range(self.engine.num_io_tensors):
            name = self.engine.get_tensor_name(i)
            dtype = self.engine.get_tensor_dtype(name)
            shape = self.engine.get_tensor_shape(name)

            if self.engine.get_tensor_mode(name) == trt.TensorIOMode.INPUT:
                # The real input address is bound per-call in inference().
                # Bind a null address (0) for now — the TensorRT Python API
                # expects an integer here; passing None raises a TypeError.
                self.context.set_tensor_address(name, 0)
            else:
                out_dtype = torch.float16 if dtype == trt.float16 else torch.float32
                buffer = torch.zeros(shape, dtype=out_dtype, device=self.device)
                self.output_buffers.append(buffer)
                self.context.set_tensor_address(name, buffer.data_ptr())

        self.context.set_optimization_profile_async(0, self.stream)

        self.input_buffer = torch.zeros(
            (1, 3, self.imgsz[0], self.imgsz[1]),
            dtype=torch.float16 if self.half else torch.float32,
            device=self.device,
        )

    def preprocess(self, frame: np.ndarray) -> torch.Tensor:
        """Convert a BGR uint8 frame into a normalized (1, 3, H, W) GPU tensor."""
        img = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        # cv2.resize takes (width, height); self.imgsz is stored (height, width).
        # The original passed imgsz directly, which is wrong for non-square sizes.
        img = cv2.resize(img, (self.imgsz[1], self.imgsz[0]))

        img = img.transpose(2, 0, 1).astype(np.float32) / 255.0

        if self.half:
            img = img.astype(np.float16)

        return torch.from_numpy(img).unsqueeze(0).to(self.device)

    def inference(self, images: List[np.ndarray]) -> List[Results]:
        """Run batched inference.

        Args:
            images: BGR frames (any sizes; each is resized independently).

        Returns:
            One ``Results`` object per detection that survives confidence
            filtering and NMS, in input order.
        """
        batch_size = len(images)
        if batch_size == 0:
            return []

        # Preprocess each frame once and stack in a single cat, instead of the
        # original repeated torch.cat (quadratic in copies).
        input_tensor = torch.cat(
            [self.preprocess(img) for img in images], dim=0
        ).contiguous()

        self.context.set_tensor_address("input", input_tensor.data_ptr())

        torch.cuda.synchronize(self.stream)
        self.context.execute_async_v3(self.stream.handle)
        torch.cuda.synchronize(self.stream)

        results = []
        for i in range(batch_size):
            # NOTE(review): assumes output layout (num_det, 6) with columns
            # x1, y1, x2, y2, score, class — confirm against the exported model.
            pred = self.output_buffers[0][i].cpu().numpy()
            boxes = pred[:, :4]
            scores = pred[:, 4]
            classes = pred[:, 5].astype(np.int32)

            mask = scores > self.conf_thresh
            boxes = boxes[mask]
            scores = scores[mask]
            classes = classes[mask]

            # cv2.dnn.NMSBoxes expects (x, y, w, h) boxes; the original passed
            # xyxy corners, which corrupts the IoU computation.
            xywh = boxes.copy()
            xywh[:, 2] = boxes[:, 2] - boxes[:, 0]
            xywh[:, 3] = boxes[:, 3] - boxes[:, 1]
            indices = cv2.dnn.NMSBoxes(
                xywh.tolist(),
                scores.tolist(),
                self.conf_thresh,
                self.iou_thresh,
            )

            orig_h, orig_w = images[i].shape[:2]
            scale_x = orig_w / self.imgsz[1]
            scale_y = orig_h / self.imgsz[0]

            if len(indices) > 0:
                # Flatten handles both flat and (N, 1)-shaped index arrays
                # returned by different OpenCV versions.
                for idx in np.asarray(indices).flatten():
                    x1, y1, x2, y2 = boxes[idx]
                    conf = scores[idx]
                    cls = classes[idx]

                    # Boxes.xyxy reads columns 0-3 as corners, so keep corner
                    # format here. The original stored [x, y, w, h], which
                    # broke every downstream consumer of .xyxy.
                    box_orig = [
                        int(x1 * scale_x),
                        int(y1 * scale_y),
                        int(x2 * scale_x),
                        int(y2 * scale_y),
                    ]

                    result = Results(
                        orig_img=images[i],
                        path="",
                        names={0: "person"},
                        boxes=Boxes(
                            torch.tensor([box_orig + [conf, cls]]),
                            orig_shape=(orig_h, orig_w),
                        ),
                    )
                    results.append(result)

        return results

    def inference_single(self, frame: np.ndarray) -> List[Results]:
        """Convenience wrapper: run inference on a single frame."""
        return self.inference([frame])

    def warmup(self, num_warmup: int = 10):
        """Run dummy inferences to amortize CUDA/TensorRT lazy-init cost."""
        dummy_frame = np.zeros((480, 640, 3), dtype=np.uint8)
        for _ in range(num_warmup):
            self.inference_single(dummy_frame)

    def __del__(self):
        # __init__ may have failed part-way through, so guard attribute access
        # instead of assuming self.stream exists. The original also called
        # self.context.synchronize(), which is not a method of
        # IExecutionContext in the TensorRT Python API.
        stream = getattr(self, "stream", None)
        if stream is not None:
            stream.synchronize()
|||
|
|
class Boxes:
    """Minimal stand-in for the ultralytics ``Boxes`` container.

    Holds one detection per row. Columns 0-3 are x1, y1, x2, y2 corners,
    column 4 is confidence, column 5 is the class id.
    """

    def __init__(
        self,
        data: torch.Tensor,
        orig_shape: Tuple[int, int],
        is_track: bool = False,
    ):
        """Store detection data.

        Args:
            data: (N, 6+) tensor of detections.
            orig_shape: (height, width) of the source image.
            is_track: whether the rows come from a tracker. Kept for API
                compatibility; the original code had identical branches for
                both values, so it does not affect the accessors.
        """
        self.data = data
        self.orig_shape = orig_shape
        self.is_track = is_track

    # NOTE: the original properties each had an ``if self.is_track:`` branch
    # byte-identical to the fall-through — dead code, removed here.

    @property
    def xyxy(self):
        """Corner-format boxes, shape (N, 4)."""
        return self.data[:, :4]

    @property
    def conf(self):
        """Detection confidences, shape (N,)."""
        return self.data[:, 4]

    @property
    def cls(self):
        """Class ids, shape (N,)."""
        return self.data[:, 5]
|||
|
|
class YOLOEngine:
    """Detector facade: TensorRT engine with a PyTorch (ultralytics) fallback."""

    def __init__(
        self,
        model_path: Optional[str] = None,
        device: int = 0,
        use_trt: bool = True,
    ):
        """Build the detector.

        Args:
            model_path: Optional explicit path to a ``.pt`` checkpoint used
                when TensorRT is disabled or fails to load.
            device: CUDA device ordinal.
            use_trt: Try the TensorRT engine first; fall back to PyTorch on
                any load failure.
        """
        self.use_trt = use_trt
        self.device = device
        self.trt_engine = None
        # Always initialize so the attribute exists regardless of which
        # backend ends up active (the original left it undefined on the
        # TensorRT path).
        self.model = None

        if use_trt:
            try:
                self.trt_engine = TensorRTEngine(device=device)
                self.trt_engine.warmup()
            except Exception as e:
                # Best-effort fallback: missing engine file, incompatible
                # TensorRT version, no GPU, etc.
                print(f"TensorRT加载失败,回退到PyTorch: {e}")
                self.use_trt = False
                self._load_pt_model(model_path)
        else:
            self._load_pt_model(model_path)

    def _load_pt_model(self, model_path: Optional[str]):
        """Resolve the .pt checkpoint path and load the ultralytics model.

        The original duplicated this resolution logic verbatim in both the
        fallback and the non-TRT branch; it is factored out here.
        """
        config = get_config()
        if model_path:
            pt_path = model_path
        elif hasattr(config.model, 'pt_model_path'):
            pt_path = config.model.pt_model_path
        else:
            # By convention the checkpoint sits next to the engine file.
            pt_path = config.model.engine_path.replace(".engine", ".pt")
        self.model = YOLO(pt_path)
        self.model.to(self.device)

    def __call__(self, frame: np.ndarray, **kwargs) -> List[Results]:
        """Run detection on a single BGR frame via the active backend."""
        if self.use_trt:
            return self.trt_engine.inference_single(frame)
        return self.model(frame, imgsz=get_config().model.imgsz, **kwargs)

    def __del__(self):
        # Guard against partially-constructed instances: if __init__ raised
        # before self.trt_engine was assigned, the original raised
        # AttributeError here.
        engine = getattr(self, "trt_engine", None)
        if engine is not None:
            del self.trt_engine