""" 图像预处理流水线模块 实现ROI裁剪、Letterbox预处理、Batch打包等功能 """ import logging import threading import time from typing import Any, Dict, List, Optional, Tuple, Union import cv2 import numpy as np from config.settings import get_settings, InferenceConfig from config.config_models import ROIInfo, ROIType from utils.logger import get_logger logger = logging.getLogger(__name__) class ROICropper: """ROI裁剪器类 支持多边形和矩形两种区域的裁剪 """ def __init__(self): self._logger = get_logger("preprocessor") def crop( self, image: np.ndarray, roi: ROIInfo ) -> Optional[np.ndarray]: """ 裁剪ROI区域 Args: image: 原始图像 (BGR格式) roi: ROI配置信息 Returns: 裁剪后的图像,失败返回None """ try: if roi.roi_type == ROIType.RECTANGLE: return self._crop_rectangle(image, roi.coordinates) elif roi.roi_type == ROIType.POLYGON: return self._crop_polygon(image, roi.coordinates) else: self._logger.warning(f"不支持的ROI类型: {roi.roi_type}") return None except Exception as e: self._logger.error(f"ROI裁剪失败: {e}") return None def _crop_rectangle( self, image: np.ndarray, coordinates: Union[List[List[float]], Dict[str, float]] ) -> Optional[np.ndarray]: """裁剪矩形区域 支持两种坐标格式: 1. dict: {"x": float, "y": float, "w": float, "h": float} — 归一化坐标(0-1) 2. list: [[x1,y1],[x2,y2]] — 像素坐标 """ img_h, img_w = image.shape[:2] if isinstance(coordinates, dict): x1 = int(coordinates["x"] * img_w) y1 = int(coordinates["y"] * img_h) x2 = int((coordinates["x"] + coordinates["w"]) * img_w) y2 = int((coordinates["y"] + coordinates["h"]) * img_h) else: if len(coordinates) < 2: return None x1, y1 = int(coordinates[0][0]), int(coordinates[0][1]) x2, y2 = int(coordinates[1][0]), int(coordinates[1][1]) x1 = max(0, min(x1, image.shape[1] - 1)) y1 = max(0, min(y1, image.shape[0] - 1)) x2 = max(0, min(x2, image.shape[1])) y2 = max(0, min(y2, image.shape[0])) if x2 <= x1 or y2 <= y1: return None return image[y1:y2, x1:x2] def _crop_polygon( self, image: np.ndarray, coordinates: List[List[float]] ) -> Optional[np.ndarray]: """裁剪多边形区域""" if len(coordinates) < 3: return None height, width = image.shape[:2] pts = np.array(coordinates, dtype=np.int32) pts[:, 0] = np.clip(pts[:, 0], 0, width - 1) pts[:, 1] = np.clip(pts[:, 1], 0, height - 1) mask = np.zeros((height, width), dtype=np.uint8) cv2.fillPoly(mask, [pts], 255) masked_image = cv2.bitwise_and(image, image, mask=mask) x1 = np.min(pts[:, 0]) y1 = np.min(pts[:, 1]) x2 = np.max(pts[:, 0]) y2 = np.max(pts[:, 1]) cropped = masked_image[y1:y2, x1:x2] return cropped if cropped.size > 0 else None def create_mask( self, image_shape: Tuple[int, int], roi: ROIInfo ) -> np.ndarray: """ 创建ROI掩码 Args: image_shape: 图像形状 (height, width) roi: ROI配置信息 Returns: 掩码图像 """ height, width = image_shape mask = np.zeros((height, width), dtype=np.uint8) if roi.roi_type == ROIType.RECTANGLE: if len(roi.coordinates) >= 2: x1, y1 = int(roi.coordinates[0][0]), int(roi.coordinates[0][1]) x2, y2 = int(roi.coordinates[1][0]), int(roi.coordinates[1][1]) x1, x2 = sorted([x1, x2]) y1, y2 = sorted([y1, y2]) mask[y1:y2, x1:x2] = 255 elif roi.roi_type == ROIType.POLYGON: pts = np.array(roi.coordinates, dtype=np.int32) pts[:, 0] = np.clip(pts[:, 0], 0, width - 1) pts[:, 1] = np.clip(pts[:, 1], 0, height - 1) cv2.fillPoly(mask, [pts], 255) return mask class LetterboxPreprocessor: """Letterbox预处理器类 实现等比例缩放,灰色填充,保持物体原始比例 """ def __init__(self, target_size: Tuple[int, int] = (480, 480)): """ 初始化Letterbox处理器 Args: target_size: 目标尺寸 (width, height) """ self.target_width, self.target_height = target_size self.pad_color = (114, 114, 114) def preprocess(self, image: np.ndarray) -> Tuple[np.ndarray, Tuple[float, float, float, float]]: """ Letterbox预处理 Args: image: 输入图像 (BGR格式) Returns: tuple: (处理后的图像, 缩放信息 (scale, pad_x, pad_y)) """ original_height, original_width = image.shape[:2] scale = min( self.target_width / original_width, self.target_height / original_height ) new_width = int(original_width * scale) new_height = int(original_height * scale) resized = cv2.resize( image, (new_width, new_height), interpolation=cv2.INTER_LINEAR ) padded = np.full( (self.target_height, self.target_width, 3), self.pad_color, dtype=np.uint8 ) pad_x = (self.target_width - new_width) // 2 pad_y = (self.target_height - new_height) // 2 padded[pad_y:pad_y + new_height, pad_x:pad_x + new_width] = resized scale_info = (scale, pad_x, pad_y, scale) return padded, scale_info def revert_coordinates( self, box: List[float], scale_info: Tuple[float, float, float, float] ) -> List[float]: """ 将坐标从Letterbox空间还原到原始空间 Args: box: Letterbox空间中的坐标 [x1, y1, x2, y2] scale_info: 缩放信息 (scale, pad_x, pad_y, scale) Returns: 原始空间中的坐标 """ scale, pad_x, pad_y, _ = scale_info x1 = (box[0] - pad_x) / scale y1 = (box[1] - pad_y) / scale x2 = (box[2] - pad_x) / scale y2 = (box[3] - pad_y) / scale return [x1, y1, x2, y2] class BatchPreprocessor: """Batch预处理器类 (支持动态 batch 1~8)""" MAX_BATCH_SIZE = 8 def __init__( self, target_size: Tuple[int, int] = (480, 480), fp16_mode: bool = True ): self.target_size = target_size self.fp16_mode = fp16_mode self.max_batch_size = self.MAX_BATCH_SIZE self._logger = get_logger("preprocessor") self._logger.info( f"Batch预处理器: max_batch={self.max_batch_size}, " f"target_size={target_size}, fp16={fp16_mode}" ) def preprocess_single( self, image: np.ndarray ) -> np.ndarray: """ 预处理单帧图像 Args: image: numpy 数组 Returns: np.ndarray: [1, 3, H, W] """ normalized = image.astype(np.float32) / 255.0 transposed = np.transpose(normalized, (2, 0, 1)) batched = transposed[None, ...] if self.fp16_mode: batched = batched.astype(np.float16) return batched def preprocess_batch( self, images: List[np.ndarray] ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]: """ 预处理批次图像 (支持动态 batch) Args: images: 已经过 letterbox 的图像列表 Returns: tuple: (批次数据 [N, 3, H, W], 缩放信息列表) """ if not images: raise ValueError("Empty images list") letterbox = LetterboxPreprocessor(self.target_size) processed_list = [] scale_infos = [] for img in images: processed, scale_info = letterbox.preprocess(img) processed_list.append(processed) scale_infos.append(scale_info) # 逐帧 normalize + transpose,然后 stack 成 [N, 3, H, W] batch_frames = [] for processed in processed_list: normalized = processed.astype(np.float32) / 255.0 transposed = np.transpose(normalized, (2, 0, 1)) batch_frames.append(transposed) batch_data = np.stack(batch_frames) if self.fp16_mode: batch_data = batch_data.astype(np.float16) return batch_data, scale_infos class ImagePreprocessor: """图像预处理流水线主类 整合ROI裁剪、Letterbox、Batch打包等功能 """ def __init__(self, config: Optional[InferenceConfig] = None): """ 初始化预处理器 Args: config: 推理配置 """ if config is None: settings = get_settings() config = settings.inference self.config = config self._cropper = ROICropper() self._letterbox = LetterboxPreprocessor( (config.input_width, config.input_height) ) self._batch_preprocessor = BatchPreprocessor( target_size=(config.input_width, config.input_height), fp16_mode=config.fp16_mode ) self._logger = get_logger("preprocessor") self._logger.info( f"图像预处理器初始化完成: " f"输入尺寸 {config.input_width}x{config.input_height}, " f"最大Batch {self._batch_preprocessor.max_batch_size}, " f"FP16模式 {config.fp16_mode}" ) def preprocess_single( self, image: np.ndarray, roi: Optional[ROIInfo] = None ) -> Tuple[np.ndarray, Tuple[float, float, float, float]]: """ 预处理单张图像 Args: image: 原始图像 roi: 可选的ROI配置 Returns: tuple: (预处理后的图像, 缩放信息) """ if roi is not None: cropped = self._cropper.crop(image, roi) if cropped is None: cropped = image else: cropped = image processed, scale_info = self._letterbox.preprocess(cropped) return processed, scale_info def preprocess_batch( self, images: List[np.ndarray], rois: Optional[List[Optional[ROIInfo]]] = None ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]: """ 预处理批次图像 Args: images: 原始图像列表 rois: 可选的ROI配置列表 Returns: tuple: (批次数据 [N, 3, H, W], 缩放信息列表) """ if rois is None: rois = [None] * len(images) processed_images = [] scale_info_list = [] for image, roi in zip(images, rois): if roi is not None: cropped = self._cropper.crop(image, roi) if cropped is None: cropped = image else: cropped = image processed_images.append(cropped) # BatchPreprocessor 处理 letterbox + normalize + stack batch_data, batch_scale_infos = self._batch_preprocessor.preprocess_batch( processed_images ) return batch_data, batch_scale_infos def revert_boxes( self, boxes: List[List[float]], scale_info: Tuple[float, float, float, float] ) -> List[List[float]]: """ 将检测框坐标还原到原始图像空间 Args: boxes: Letterbox空间中的检测框 scale_info: 缩放信息 Returns: 原始空间中的检测框 """ return [self._letterbox.revert_coordinates(box, scale_info) for box in boxes] def get_statistics(self) -> Dict[str, Any]: """获取预处理器统计信息""" return { "config": { "input_width": self.config.input_width, "input_height": self.config.input_height, "batch_size": self._batch_preprocessor.max_batch_size, "fp16_mode": self.config.fp16_mode, }, } def release_resources(self): """释放资源""" self._logger.info("预处理器资源已释放")