Files
security-ai-edge/core/preprocessor.py
16337 b0ddb6ee1a feat(project): move edge_inference_service contents to root and update paths
- Moved all project files and directories (config, core, models, etc.) from
  edge_inference_service/ to the repository root ai_edge/
- Updated model path in config/settings.py to reflect new structure
- Revised usage paths in __init__.py documentation
2026-01-29 18:43:19 +08:00

476 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
图像预处理流水线模块
实现ROI裁剪、Letterbox预处理、Batch打包等功能
"""
import logging
import threading
import time
from typing import Any, Dict, List, Optional, Tuple, Union
import cv2
import numpy as np
from config.settings import get_settings, InferenceConfig
from config.config_models import ROIInfo, ROIType
from utils.logger import get_logger
logger = logging.getLogger(__name__)
class ROICropper:
    """Crop regions of interest (ROI) out of an image.

    Two ROI shapes are supported: axis-aligned rectangles (given by two
    corner points) and polygons (given by three or more vertices).
    """

    def __init__(self):
        self._logger = get_logger("preprocessor")

    def crop(
        self,
        image: np.ndarray,
        roi: "ROIInfo"
    ) -> Optional[np.ndarray]:
        """Crop the ROI region from an image.

        Args:
            image: Source image (BGR).
            roi: ROI configuration; ``roi_type`` selects the crop strategy
                and ``coordinates`` holds the points.

        Returns:
            The cropped image, or None when the ROI type is unsupported,
            the coordinates are degenerate, or any error occurs.
        """
        # Best-effort boundary: a bad ROI must not take the pipeline down,
        # so every failure is logged and reported as None.
        try:
            if roi.roi_type == ROIType.RECTANGLE:
                return self._crop_rectangle(image, roi.coordinates)
            elif roi.roi_type == ROIType.POLYGON:
                return self._crop_polygon(image, roi.coordinates)
            else:
                self._logger.warning(f"不支持的ROI类型: {roi.roi_type}")
                return None
        except Exception as e:
            self._logger.error(f"ROI裁剪失败: {e}")
            return None

    @staticmethod
    def _rectangle_corners(
        coordinates: List[List[float]]
    ) -> Tuple[int, int, int, int]:
        """Return ``(x1, y1, x2, y2)`` from the first two points.

        The values are truncated to int and ordered so that ``x1 <= x2``
        and ``y1 <= y2``, regardless of which corner was listed first.
        """
        x1, x2 = sorted((int(coordinates[0][0]), int(coordinates[1][0])))
        y1, y2 = sorted((int(coordinates[0][1]), int(coordinates[1][1])))
        return x1, y1, x2, y2

    def _crop_rectangle(
        self,
        image: np.ndarray,
        coordinates: List[List[float]]
    ) -> Optional[np.ndarray]:
        """Crop a rectangular region.

        Args:
            image: Source image.
            coordinates: At least two points; the first two are taken as
                opposite corners (in either order).

        Returns:
            A view into ``image`` covering the rectangle clamped to the
            image bounds, or None if fewer than two points are given or
            the clamped rectangle is empty.
        """
        if len(coordinates) < 2:
            return None
        height, width = image.shape[:2]
        x1, y1, x2, y2 = self._rectangle_corners(coordinates)
        # The start corner must address an existing pixel; the end corner is
        # an exclusive slice bound and may therefore equal the image size.
        x1 = max(0, min(x1, width - 1))
        y1 = max(0, min(y1, height - 1))
        x2 = max(0, min(x2, width))
        y2 = max(0, min(y2, height))
        if x2 <= x1 or y2 <= y1:
            return None
        return image[y1:y2, x1:x2]

    def _crop_polygon(
        self,
        image: np.ndarray,
        coordinates: List[List[float]]
    ) -> Optional[np.ndarray]:
        """Crop a polygonal region.

        Pixels outside the polygon are zeroed, then the polygon's bounding
        box is cut out of the masked image.

        Args:
            image: Source image.
            coordinates: At least three polygon vertices.

        Returns:
            The masked bounding-box crop, or None if fewer than three
            vertices are given or the bounding box has no extent in x or y.
        """
        if len(coordinates) < 3:
            return None
        height, width = image.shape[:2]
        pts = np.array(coordinates, dtype=np.int32)
        pts[:, 0] = np.clip(pts[:, 0], 0, width - 1)
        pts[:, 1] = np.clip(pts[:, 1], 0, height - 1)
        mask = np.zeros((height, width), dtype=np.uint8)
        cv2.fillPoly(mask, [pts], 255)
        masked_image = cv2.bitwise_and(image, image, mask=mask)
        x1, x2 = int(pts[:, 0].min()), int(pts[:, 0].max())
        y1, y2 = int(pts[:, 1].min()), int(pts[:, 1].max())
        if x2 <= x1 or y2 <= y1:
            return None
        # The vertex pixels themselves are filled, so the bounding box is
        # inclusive of x2 / y2; +1 keeps the last row and column.
        return masked_image[y1:y2 + 1, x1:x2 + 1]

    def create_mask(
        self,
        image_shape: Tuple[int, int],
        roi: "ROIInfo"
    ) -> np.ndarray:
        """Create a binary mask for the ROI.

        Args:
            image_shape: Image shape as (height, width).
            roi: ROI configuration.

        Returns:
            A uint8 mask of shape (height, width) with 255 inside the ROI
            and 0 elsewhere. The mask is all zeros when the ROI type is
            unsupported or too few points are given (two for a rectangle,
            three for a polygon).
        """
        height, width = image_shape
        mask = np.zeros((height, width), dtype=np.uint8)
        if roi.roi_type == ROIType.RECTANGLE:
            if len(roi.coordinates) >= 2:
                x1, y1, x2, y2 = self._rectangle_corners(roi.coordinates)
                # Negative indices would wrap around in a slice; clamp them.
                # Upper bounds need no clamp: slicing past the end is safe.
                x1, x2 = max(0, x1), max(0, x2)
                y1, y2 = max(0, y1), max(0, y2)
                mask[y1:y2, x1:x2] = 255
        elif roi.roi_type == ROIType.POLYGON:
            if len(roi.coordinates) >= 3:
                pts = np.array(roi.coordinates, dtype=np.int32)
                pts[:, 0] = np.clip(pts[:, 0], 0, width - 1)
                pts[:, 1] = np.clip(pts[:, 1], 0, height - 1)
                cv2.fillPoly(mask, [pts], 255)
        return mask
class LetterboxPreprocessor:
    """Letterbox preprocessor.

    Resizes an image to fit inside a target canvas while keeping its aspect
    ratio, and fills the remaining area with a gray padding colour.
    """

    def __init__(self, target_size: Tuple[int, int] = (480, 480)):
        """Initialise the letterbox processor.

        Args:
            target_size: Target canvas size as (width, height).
        """
        self.target_width, self.target_height = target_size
        self.pad_color = (114, 114, 114)

    def preprocess(self, image: np.ndarray) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
        """Letterbox an image onto the target canvas.

        Args:
            image: Input image (BGR). The canvas it is pasted into has three
                channels, so a three-channel image is expected.

        Returns:
            tuple: (padded image of shape (target_height, target_width, 3),
            scale info ``(scale, pad_x, pad_y, scale)``). The scale is
            repeated in the last slot; ``revert_coordinates`` ignores it.

        Raises:
            ValueError: If the image has zero height or width.
        """
        original_height, original_width = image.shape[:2]
        if original_height <= 0 or original_width <= 0:
            raise ValueError(
                f"cannot letterbox an empty image of shape {image.shape}"
            )
        scale = min(
            self.target_width / original_width,
            self.target_height / original_height
        )
        # int() truncation can yield 0 for extreme aspect ratios, which is
        # not a valid resize target; keep at least one pixel per axis.
        new_width = max(1, int(original_width * scale))
        new_height = max(1, int(original_height * scale))
        resized = cv2.resize(
            image,
            (new_width, new_height),
            interpolation=cv2.INTER_LINEAR
        )
        padded = np.full(
            (self.target_height, self.target_width, 3),
            self.pad_color,
            dtype=np.uint8
        )
        # Centre the resized image; any odd leftover pixel goes to the
        # bottom/right side because of the floor division.
        pad_x = (self.target_width - new_width) // 2
        pad_y = (self.target_height - new_height) // 2
        padded[pad_y:pad_y + new_height, pad_x:pad_x + new_width] = resized
        scale_info = (scale, pad_x, pad_y, scale)
        return padded, scale_info

    def revert_coordinates(
        self,
        box: List[float],
        scale_info: Tuple[float, float, float, float]
    ) -> List[float]:
        """Map a box from letterbox space back to the source image space.

        Args:
            box: Box in letterbox space as [x1, y1, x2, y2].
            scale_info: Scale info ``(scale, pad_x, pad_y, scale)`` as
                returned by ``preprocess``.

        Returns:
            The box as [x1, y1, x2, y2] in the coordinate space of the image
            that was passed to ``preprocess``.
        """
        scale, pad_x, pad_y, _ = scale_info
        # Undo the padding offset first, then the uniform scaling.
        x1 = (box[0] - pad_x) / scale
        y1 = (box[1] - pad_y) / scale
        x2 = (box[2] - pad_x) / scale
        y2 = (box[3] - pad_y) / scale
        return [x1, y1, x2, y2]
class BatchPreprocessor:
    """Batch preprocessor.

    Letterboxes a list of images and packs them into one NCHW array scaled
    to [0, 1], as float16 or float32. Also keeps a small pool of
    preallocated batch buffers.
    """

    def __init__(
        self,
        target_size: Tuple[int, int] = (480, 480),
        max_batch_size: int = 8,
        fp16_mode: bool = True
    ):
        """Initialise the batch preprocessor.

        Args:
            target_size: Target size as (width, height).
            max_batch_size: Maximum number of images per batch.
            fp16_mode: Emit float16 data when True, float32 otherwise.
        """
        self.target_size = target_size
        self.max_batch_size = max_batch_size
        self.fp16_mode = fp16_mode
        self._letterbox = LetterboxPreprocessor(target_size)
        self._logger = get_logger("preprocessor")
        self._lock = threading.Lock()
        self._memory_pool: List[np.ndarray] = []
        self._preallocated_size = max_batch_size

    def preprocess_batch(
        self,
        images: List[np.ndarray]
    ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]:
        """Preprocess one batch of images.

        Only the first ``max_batch_size`` images are processed; a warning is
        logged when the input is longer so the drop is not silent.

        Args:
            images: List of images.

        Returns:
            tuple: (batch data in NCHW layout, list of per-image scale
            info). Both cover only the images that were processed.
        """
        if len(images) > self.max_batch_size:
            self._logger.warning(
                f"批次图像数量 {len(images)} 超过最大Batch大小 "
                f"{self.max_batch_size}, 多余图像将被丢弃"
            )
        processed_images = []
        scale_info_list = []
        for image in images[:self.max_batch_size]:
            processed, scale_info = self._letterbox.preprocess(image)
            processed_images.append(processed)
            scale_info_list.append(scale_info)
        batch_data = self._stack_and_normalize(processed_images)
        return batch_data, scale_info_list

    def _stack_and_normalize(self, images: List[np.ndarray]) -> np.ndarray:
        """Stack HWC images into one normalised NCHW batch.

        Args:
            images: Equally shaped HWC uint8 images.

        Returns:
            Array of shape (N, C, H, W) with values divided by 255, as
            float16 when ``fp16_mode`` is set and float32 otherwise. An
            empty input yields an empty (0, 3, height, width) batch.
        """
        dtype = np.float16 if self.fp16_mode else np.float32
        if len(images) == 0:
            # np.stack rejects an empty sequence; return an empty batch
            # shaped like a real one instead.
            width, height = self.target_size
            return np.zeros((0, 3, height, width), dtype=dtype)
        stacked = np.stack(images, axis=0)
        stacked = stacked.astype(np.float32) / 255.0
        stacked = np.transpose(stacked, (0, 3, 1, 2))
        if self.fp16_mode:
            stacked = stacked.astype(np.float16)
        return stacked

    def allocate_batch_memory(self, batch_size: int) -> np.ndarray:
        """Return a pooled batch buffer, allocating it on first use.

        Buffers are pooled by batch size, so repeated calls with the same
        size return the same array object.

        Args:
            batch_size: Requested batch size; capped at ``max_batch_size``.

        Returns:
            A zero-initialised (on first allocation) array of shape
            (batch_size, 3, height, width), float16 or float32 depending on
            ``fp16_mode``.
        """
        batch_size = min(batch_size, self.max_batch_size)
        with self._lock:
            for mem in self._memory_pool:
                if mem.shape[0] == batch_size:
                    return mem
            # target_size is (width, height); NCHW needs height before width.
            width, height = self.target_size
            shape = (batch_size, 3, height, width)
            dtype = np.float16 if self.fp16_mode else np.float32
            mem = np.zeros(shape, dtype=dtype)
            self._memory_pool.append(mem)
            return mem

    def release_memory(self):
        """Drop every pooled buffer and log the release."""
        with self._lock:
            self._memory_pool.clear()
        self._logger.info("预处理内存池已释放")

    def get_memory_usage(self) -> Dict[str, Union[int, float]]:
        """Report the memory held by the buffer pool.

        Returns:
            Dict with ``total_bytes`` (int), ``total_mb`` (float) and
            ``block_count`` (int).
        """
        with self._lock:
            total_bytes = sum(
                mem.nbytes for mem in self._memory_pool
            )
            return {
                "total_bytes": total_bytes,
                "total_mb": total_bytes / (1024 ** 2),
                "block_count": len(self._memory_pool)
            }
class ImagePreprocessor:
    """Main image preprocessing pipeline.

    Combines ROI cropping, letterboxing and batch packing.
    """

    def __init__(self, config: Optional["InferenceConfig"] = None):
        """Initialise the preprocessor.

        Args:
            config: Inference configuration. When None, the ``inference``
                section of ``get_settings()`` is used.
        """
        if config is None:
            settings = get_settings()
            config = settings.inference
        self.config = config
        self._cropper = ROICropper()
        self._letterbox = LetterboxPreprocessor(
            (config.input_width, config.input_height)
        )
        self._batch_preprocessor = BatchPreprocessor(
            target_size=(config.input_width, config.input_height),
            max_batch_size=config.max_batch_size,
            fp16_mode=config.fp16_mode
        )
        self._logger = get_logger("preprocessor")
        self._logger.info(
            f"图像预处理器初始化完成: "
            f"输入尺寸 {config.input_width}x{config.input_height}, "
            f"Batch大小 {config.batch_size}-{config.max_batch_size}, "
            f"FP16模式 {config.fp16_mode}"
        )

    def preprocess_single(
        self,
        image: np.ndarray,
        roi: Optional["ROIInfo"] = None
    ) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
        """Preprocess a single image.

        Args:
            image: Source image.
            roi: Optional ROI configuration. When given, the image is
                cropped to the ROI before letterboxing.

        Returns:
            tuple: (letterboxed image, scale info).
        """
        cropped = None
        if roi is not None:
            cropped = self._cropper.crop(image, roi)
        if cropped is None:
            # No ROI, or the crop failed: fall back to the full frame.
            cropped = image
        processed, scale_info = self._letterbox.preprocess(cropped)
        return processed, scale_info

    def preprocess_batch(
        self,
        images: List[np.ndarray],
        rois: Optional[List[Optional["ROIInfo"]]] = None
    ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]:
        """Preprocess a batch of images.

        Args:
            images: List of source images.
            rois: Optional list of ROI configurations, one entry (possibly
                None) per image. When None, no image is cropped.

        Returns:
            tuple: (batch data, list of per-image scale info).

        Raises:
            ValueError: If ``rois`` is given and its length differs from
                ``images``.
        """
        if rois is None:
            rois = [None] * len(images)
        elif len(rois) != len(images):
            # zip() would silently drop the unmatched tail and misalign the
            # returned scale info with the caller's image list.
            raise ValueError(
                f"images and rois must have the same length, "
                f"got {len(images)} and {len(rois)}"
            )
        processed_images = []
        scale_info_list = []
        for image, roi in zip(images, rois):
            processed, scale_info = self.preprocess_single(image, roi)
            processed_images.append(processed)
            scale_info_list.append(scale_info)
        batch_data = self._batch_preprocessor._stack_and_normalize(processed_images)
        return batch_data, scale_info_list

    def revert_boxes(
        self,
        boxes: List[List[float]],
        scale_info: Tuple[float, float, float, float]
    ) -> List[List[float]]:
        """Map detection boxes back out of letterbox space.

        Only the letterbox scale and padding are undone. When the image was
        cropped to an ROI first, the result is relative to that crop; the
        crop offset is not added back here.

        Args:
            boxes: Detection boxes in letterbox space.
            scale_info: Scale info returned for the corresponding image.

        Returns:
            The boxes in the coordinate space of the letterboxed input.
        """
        return [self._letterbox.revert_coordinates(box, scale_info) for box in boxes]

    def get_statistics(self) -> Dict[str, Any]:
        """Return the active configuration and buffer-pool memory usage."""
        return {
            "config": {
                "input_width": self.config.input_width,
                "input_height": self.config.input_height,
                "batch_size": self.config.batch_size,
                "max_batch_size": self.config.max_batch_size,
                "fp16_mode": self.config.fp16_mode,
            },
            "memory": self._batch_preprocessor.get_memory_usage(),
        }

    def release_resources(self):
        """Release the batch preprocessor's buffer pool."""
        self._batch_preprocessor.release_memory()