Files
security-ai-edge/core/preprocessor.py

457 lines
13 KiB
Python
Raw Normal View History

2026-01-29 18:33:12 +08:00
"""
Image preprocessing pipeline module.

Implements ROI cropping, letterbox preprocessing, and batch packing.
"""
import logging
import threading
import time
from typing import Any, Dict, List, Optional, Tuple, Union
import cv2
import numpy as np
from config.settings import get_settings, InferenceConfig
from config.config_models import ROIInfo, ROIType
from utils.logger import get_logger
logger = logging.getLogger(__name__)
class ROICropper:
    """Crop regions of interest (ROI) out of an image.

    Two region kinds are handled: rectangles (two corner points) and
    polygons (three or more vertices).
    """

    def __init__(self):
        self._logger = get_logger("preprocessor")

    def crop(
        self,
        image: np.ndarray,
        roi: "ROIInfo",
    ) -> Optional[np.ndarray]:
        """Crop the region described by ``roi`` out of ``image``.

        Args:
            image: Source image (BGR layout).
            roi: ROI configuration; ``roi.roi_type`` selects the strategy and
                ``roi.coordinates`` holds the points.

        Returns:
            The cropped image, or ``None`` when the ROI type is unsupported,
            the region is degenerate, or any error occurs while cropping.
        """
        try:
            if roi.roi_type == ROIType.RECTANGLE:
                return self._crop_rectangle(image, roi.coordinates)
            if roi.roi_type == ROIType.POLYGON:
                return self._crop_polygon(image, roi.coordinates)
            self._logger.warning(f"不支持的ROI类型: {roi.roi_type}")
            return None
        except Exception as e:
            # Best-effort by design: a bad ROI is logged and reported as None
            # instead of propagating to the caller.
            self._logger.error(f"ROI裁剪失败: {e}")
            return None

    def _crop_rectangle(
        self,
        image: np.ndarray,
        coordinates: List[List[float]],
    ) -> Optional[np.ndarray]:
        """Crop an axis-aligned rectangle.

        Args:
            image: Source image.
            coordinates: ``[[x1, y1], [x2, y2]]`` with the first point as the
                top-left corner and the second as the bottom-right corner.

        Returns:
            A view of ``image`` restricted to the rectangle, or ``None`` when
            fewer than two points are given or the clamped rectangle is empty.
        """
        if len(coordinates) < 2:
            return None

        height, width = image.shape[:2]
        x1, y1 = int(coordinates[0][0]), int(coordinates[0][1])
        x2, y2 = int(coordinates[1][0]), int(coordinates[1][1])

        # The start corner must be a valid pixel index; the end corner is an
        # exclusive slice bound and may therefore equal the image size.
        x1 = max(0, min(x1, width - 1))
        y1 = max(0, min(y1, height - 1))
        x2 = max(0, min(x2, width))
        y2 = max(0, min(y2, height))

        if x2 <= x1 or y2 <= y1:
            return None
        return image[y1:y2, x1:x2]

    def _crop_polygon(
        self,
        image: np.ndarray,
        coordinates: List[List[float]],
    ) -> Optional[np.ndarray]:
        """Crop the bounding box of a polygon, blanking pixels outside it.

        Args:
            image: Source image.
            coordinates: Polygon vertices as ``[[x, y], ...]``.

        Returns:
            The masked bounding-box crop, or ``None`` when fewer than three
            vertices are given or the bounding box has no area.
        """
        if len(coordinates) < 3:
            return None

        height, width = image.shape[:2]
        pts = np.array(coordinates, dtype=np.int32)
        pts[:, 0] = np.clip(pts[:, 0], 0, width - 1)
        pts[:, 1] = np.clip(pts[:, 1], 0, height - 1)

        mask = np.zeros((height, width), dtype=np.uint8)
        cv2.fillPoly(mask, [pts], 255)
        masked_image = cv2.bitwise_and(image, image, mask=mask)

        x_min, y_min = int(pts[:, 0].min()), int(pts[:, 1].min())
        x_max, y_max = int(pts[:, 0].max()), int(pts[:, 1].max())
        if x_max <= x_min or y_max <= y_min:
            return None

        # The vertices are inclusive pixel positions, so the slice end must be
        # max + 1; otherwise the last row and column of the polygon are lost.
        cropped = masked_image[y_min:y_max + 1, x_min:x_max + 1]
        return cropped if cropped.size > 0 else None

    def create_mask(
        self,
        image_shape: Tuple[int, int],
        roi: "ROIInfo",
    ) -> np.ndarray:
        """Build a binary mask for the ROI.

        Args:
            image_shape: Image shape as ``(height, width)``.
            roi: ROI configuration.

        Returns:
            A ``uint8`` mask of shape ``(height, width)`` with 255 inside the
            ROI and 0 elsewhere. The mask is all zeros when the ROI type is
            not handled or too few points are given.
        """
        height, width = image_shape
        mask = np.zeros((height, width), dtype=np.uint8)

        if roi.roi_type == ROIType.RECTANGLE:
            if len(roi.coordinates) >= 2:
                # Each coordinate is an [x, y] pair; convert the components
                # individually (int() cannot be applied to the pair itself).
                xa, ya = int(roi.coordinates[0][0]), int(roi.coordinates[0][1])
                xb, yb = int(roi.coordinates[1][0]), int(roi.coordinates[1][1])
                x1, x2 = sorted([xa, xb])
                y1, y2 = sorted([ya, yb])
                # Clamp to the image so negative values do not wrap around as
                # negative slice indices.
                x1, x2 = max(0, min(x1, width)), max(0, min(x2, width))
                y1, y2 = max(0, min(y1, height)), max(0, min(y2, height))
                mask[y1:y2, x1:x2] = 255
        elif roi.roi_type == ROIType.POLYGON:
            # Same minimum vertex count as _crop_polygon; also avoids indexing
            # into an empty array.
            if len(roi.coordinates) >= 3:
                pts = np.array(roi.coordinates, dtype=np.int32)
                pts[:, 0] = np.clip(pts[:, 0], 0, width - 1)
                pts[:, 1] = np.clip(pts[:, 1], 0, height - 1)
                cv2.fillPoly(mask, [pts], 255)

        return mask
class LetterboxPreprocessor:
    """Letterbox preprocessor.

    Resizes an image while keeping its aspect ratio and pads the remainder
    with a constant gray color so objects keep their original proportions.
    """

    def __init__(self, target_size: Tuple[int, int] = (480, 480)):
        """Initialize the letterbox processor.

        Args:
            target_size: Output size as ``(width, height)``.
        """
        self.target_width, self.target_height = target_size
        self.pad_color = (114, 114, 114)

    def preprocess(
        self,
        image: np.ndarray,
    ) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
        """Letterbox ``image`` into the target size.

        Args:
            image: Input image (BGR layout, 3 channels).

        Returns:
            tuple: ``(padded_image, scale_info)`` where ``scale_info`` is
            ``(scale, pad_x, pad_y, scale)``; the scale is repeated in the
            fourth slot.
        """
        original_height, original_width = image.shape[:2]
        scale = min(
            self.target_width / original_width,
            self.target_height / original_height,
        )

        # Truncation can yield 0 for extremely elongated inputs, which
        # cv2.resize rejects; keep at least one pixel per dimension.
        new_width = max(1, int(original_width * scale))
        new_height = max(1, int(original_height * scale))

        resized = cv2.resize(
            image,
            (new_width, new_height),
            interpolation=cv2.INTER_LINEAR,
        )

        padded = np.full(
            (self.target_height, self.target_width, 3),
            self.pad_color,
            dtype=np.uint8,
        )

        # Center the resized image on the canvas.
        pad_x = (self.target_width - new_width) // 2
        pad_y = (self.target_height - new_height) // 2
        padded[pad_y:pad_y + new_height, pad_x:pad_x + new_width] = resized

        scale_info = (scale, pad_x, pad_y, scale)
        return padded, scale_info

    def revert_coordinates(
        self,
        box: List[float],
        scale_info: Tuple[float, float, float, float],
    ) -> List[float]:
        """Map a box from letterbox space back to the pre-letterbox image.

        Args:
            box: Box in letterbox space as ``[x1, y1, x2, y2]``.
            scale_info: ``(scale, pad_x, pad_y, scale)`` as returned by
                :meth:`preprocess`; the fourth element is ignored.

        Returns:
            The box ``[x1, y1, x2, y2]`` in the coordinates of the image that
            was passed to :meth:`preprocess`.
        """
        scale, pad_x, pad_y, _ = scale_info
        # Undo the padding offset first, then the uniform scaling.
        return [
            (box[0] - pad_x) / scale,
            (box[1] - pad_y) / scale,
            (box[2] - pad_x) / scale,
            (box[3] - pad_y) / scale,
        ]
class BatchPreprocessor:
    """Batch preprocessor.

    Produces batches of a fixed size (``BATCH_SIZE`` = 4); shorter inputs are
    padded up to that size by repeating the last frame.
    """

    BATCH_SIZE = 4

    def __init__(
        self,
        target_size: Tuple[int, int] = (480, 480),
        fp16_mode: bool = True,
    ):
        """Initialize the batch preprocessor.

        Args:
            target_size: Letterbox output size as ``(width, height)``.
            fp16_mode: When true, the final batch is cast to ``float16``.
        """
        self.target_size = target_size
        self.fp16_mode = fp16_mode
        self.batch_size = self.BATCH_SIZE

        self._letterbox = LetterboxPreprocessor(target_size)
        self._logger = get_logger("preprocessor")
        self._logger.info(
            f"Batch预处理器: batch={self.batch_size}, "
            f"target_size={target_size}, fp16={fp16_mode}"
        )

    @staticmethod
    def pad_to_batch4(frames: List[np.ndarray]) -> np.ndarray:
        """Stack ``frames`` into one array, padding up to 4 entries.

        Padding repeats the last frame. The input list is not modified.
        Inputs that already hold 4 or more frames are stacked unchanged.

        Args:
            frames: Non-empty list of equally shaped arrays.

        Returns:
            np.ndarray: The stacked frames with a leading batch axis.

        Raises:
            ValueError: If ``frames`` is empty.
        """
        if len(frames) == 0:
            raise ValueError("Empty frames list")

        # Work on a copy so the caller's list is never extended in place.
        batch = list(frames)
        missing = BatchPreprocessor.BATCH_SIZE - len(batch)
        if missing > 0:
            batch.extend([batch[-1]] * missing)
        return np.stack(batch)

    def preprocess_batch(
        self,
        images: List[np.ndarray],
    ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]:
        """Preprocess a list of images into one padded batch.

        Args:
            images: Non-empty list of images.

        Returns:
            tuple: ``(batch, scale_infos)``. ``batch`` has layout
            ``[batch, channels, H, W]`` and ``scale_infos`` holds one entry
            per batch slot, including padded slots.

        Raises:
            ValueError: If ``images`` is empty.
        """
        batch_data, scale_info_list = self._preprocess_batch(images)
        return batch_data, scale_info_list

    def _preprocess_batch(
        self,
        images: List[np.ndarray],
    ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]:
        """Letterbox each image, pad to the batch size, then normalize."""
        if len(images) == 0:
            raise ValueError("Empty frames list")
        if len(images) > self.batch_size:
            # Only batch_size frames fit; report the drop instead of hiding it.
            self._logger.warning(
                f"输入图像数量 {len(images)} 超过 batch 大小 "
                f"{self.batch_size}, 多余图像将被丢弃"
            )

        # Letterbox BEFORE padding/stacking: raw frames may differ in size
        # (they cannot be stacked), while letterboxed frames share one shape.
        processed_images = []
        scale_info_list = []
        for image in images[:self.batch_size]:
            processed, scale_info = self._letterbox.preprocess(image)
            processed_images.append(processed)
            scale_info_list.append(scale_info)

        # Fill the remaining slots by repeating the last processed frame.
        while len(processed_images) < self.batch_size:
            processed_images.append(processed_images[-1])
            scale_info_list.append(scale_info_list[-1])

        batch_data = self._stack_and_normalize(processed_images)
        return batch_data, scale_info_list

    def _stack_and_normalize(self, images: List[np.ndarray]) -> np.ndarray:
        """Stack HWC images, scale values by 1/255, and reorder to NCHW.

        The result is ``float32``, or ``float16`` when ``fp16_mode`` is set.
        """
        stacked = np.stack(images, axis=0)
        stacked = stacked.astype(np.float32) / 255.0
        # (N, H, W, C) -> (N, C, H, W)
        stacked = np.transpose(stacked, (0, 3, 1, 2))
        if self.fp16_mode:
            stacked = stacked.astype(np.float16)
        return stacked
class ImagePreprocessor:
    """Main class of the image preprocessing pipeline.

    Combines ROI cropping, letterboxing, and batch packing.
    """

    def __init__(self, config: Optional["InferenceConfig"] = None):
        """Initialize the preprocessor.

        Args:
            config: Inference configuration. When ``None``, the ``inference``
                section of ``get_settings()`` is used.
        """
        if config is None:
            settings = get_settings()
            config = settings.inference
        self.config = config

        self._cropper = ROICropper()
        self._letterbox = LetterboxPreprocessor(
            (config.input_width, config.input_height)
        )
        self._batch_preprocessor = BatchPreprocessor(
            target_size=(config.input_width, config.input_height),
            fp16_mode=config.fp16_mode,
        )

        self._logger = get_logger("preprocessor")
        self._logger.info(
            f"图像预处理器初始化完成: "
            f"输入尺寸 {config.input_width}x{config.input_height}, "
            f"Batch大小 {self._batch_preprocessor.batch_size}, "
            f"FP16模式 {config.fp16_mode}"
        )

    def preprocess_single(
        self,
        image: np.ndarray,
        roi: Optional["ROIInfo"] = None,
    ) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
        """Preprocess a single image.

        Args:
            image: Original image.
            roi: Optional ROI configuration. When cropping fails (the cropper
                returns ``None``), the full image is used instead.

        Returns:
            tuple: ``(letterboxed_image, scale_info)``. ``scale_info`` is
            relative to the image handed to the letterbox step, i.e. the ROI
            crop when one was applied.
        """
        cropped = image
        if roi is not None:
            roi_crop = self._cropper.crop(image, roi)
            if roi_crop is not None:
                cropped = roi_crop

        processed, scale_info = self._letterbox.preprocess(cropped)
        return processed, scale_info

    def preprocess_batch(
        self,
        images: List[np.ndarray],
        rois: Optional[List[Optional["ROIInfo"]]] = None,
    ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]:
        """Preprocess a list of images into one padded batch.

        Args:
            images: Original images.
            rois: Optional per-image ROI configurations, paired with
                ``images`` by position. ``None`` means no ROI for any image.

        Returns:
            tuple: ``(batch, scale_infos)``. ``batch`` is the array produced
            by the batch preprocessor; ``scale_infos`` holds one entry per
            image/ROI pair that was processed (padded batch slots have no
            entry here).
        """
        if rois is None:
            rois = [None] * len(images)

        processed_images = []
        scale_info_list = []
        for image, roi in zip(images, rois):
            processed, scale_info = self.preprocess_single(image, roi)
            processed_images.append(processed)
            scale_info_list.append(scale_info)

        # The batch preprocessor returns (batch, scale_infos). Only the batch
        # array is wanted here: its scale infos describe the second, identity
        # letterbox pass, whereas the ones collected above map back to the
        # image handed to preprocess_single.
        batch_data, _ = self._batch_preprocessor.preprocess_batch(processed_images)
        return batch_data, scale_info_list

    def revert_boxes(
        self,
        boxes: List[List[float]],
        scale_info: Tuple[float, float, float, float],
    ) -> List[List[float]]:
        """Map detection boxes from letterbox space back to image space.

        Args:
            boxes: Boxes in letterbox space, each ``[x1, y1, x2, y2]``.
            scale_info: Scale information returned for the image.

        Returns:
            The boxes in the coordinates of the pre-letterbox image.
        """
        return [self._letterbox.revert_coordinates(box, scale_info) for box in boxes]

    def get_statistics(self) -> Dict[str, Any]:
        """Return the preprocessor's configuration summary."""
        return {
            "config": {
                "input_width": self.config.input_width,
                "input_height": self.config.input_height,
                "batch_size": self._batch_preprocessor.batch_size,
                "fp16_mode": self.config.fp16_mode,
            },
        }

    def release_resources(self):
        """Log that the preprocessor's resources have been released."""
        self._logger.info("预处理器资源已释放")