Files
security-ai-edge/core/preprocessor.py
16337 181623428a feat(aiot): 告警冷却时间调整 + 截图本地保留 + 中文路径修复
- 离岗检测冷却时间: 300s → 600s(10分钟)
- 入侵检测冷却时间: 120s → 300s(5分钟)
- 入侵告警级别改为高(alarm_level=3)
- COS 不可用时保留本地截图文件,不再上报后删除
- 修复 cv2.imwrite 中文路径失败,改用 imencode + write_bytes
- 配置订阅在 LOCAL 模式下跳过 Redis 连接

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 09:57:02 +08:00

452 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
图像预处理流水线模块
实现ROI裁剪、Letterbox预处理、Batch打包等功能
"""
import logging
import threading
import time
from typing import Any, Dict, List, Optional, Tuple, Union
import cv2
import numpy as np
from config.settings import get_settings, InferenceConfig
from config.config_models import ROIInfo, ROIType
from utils.logger import get_logger
logger = logging.getLogger(__name__)
class ROICropper:
    """Crop regions of interest (ROIs) out of image frames.

    Rectangular and polygonal regions are supported. Rectangles may be given
    either as a dict of normalized values or as a list of two pixel corners;
    polygons are given as a list of pixel vertices.
    """

    def __init__(self):
        self._logger = get_logger("preprocessor")

    def crop(
        self,
        image: np.ndarray,
        roi: "ROIInfo"
    ) -> Optional[np.ndarray]:
        """Crop the region described by ``roi`` out of ``image``.

        Args:
            image: Source image (expected in BGR order).
            roi: ROI configuration; ``roi.roi_type`` selects the strategy and
                ``roi.coordinates`` supplies the geometry.

        Returns:
            The cropped image, or None when the ROI type is unsupported, the
            region is empty, or cropping raised an exception.
        """
        try:
            if roi.roi_type == ROIType.RECTANGLE:
                return self._crop_rectangle(image, roi.coordinates)
            if roi.roi_type == ROIType.POLYGON:
                return self._crop_polygon(image, roi.coordinates)
            self._logger.warning(f"不支持的ROI类型: {roi.roi_type}")
            return None
        except Exception as e:
            # Best-effort by design: a malformed ROI is logged and reported as
            # None instead of propagating to the caller.
            self._logger.error(f"ROI裁剪失败: {e}")
            return None

    @staticmethod
    def _rectangle_bounds(
        coordinates: Union[List[List[float]], Dict[str, float]],
        width: int,
        height: int
    ) -> Optional[Tuple[int, int, int, int]]:
        """Resolve rectangle coordinates to clamped pixel bounds.

        Args:
            coordinates: Either ``{"x", "y", "w", "h"}`` with normalized (0-1)
                values, or ``[[x1, y1], [x2, y2]]`` in pixels.
            width: Image width in pixels.
            height: Image height in pixels.

        Returns:
            ``(x1, y1, x2, y2)`` with ``x2``/``y2`` exclusive, or None when
            fewer than two corners are given or the rectangle has no area
            inside the image.
        """
        if isinstance(coordinates, dict):
            x1 = int(coordinates["x"] * width)
            y1 = int(coordinates["y"] * height)
            x2 = int((coordinates["x"] + coordinates["w"]) * width)
            y2 = int((coordinates["y"] + coordinates["h"]) * height)
        else:
            if len(coordinates) < 2:
                return None
            x1, y1 = int(coordinates[0][0]), int(coordinates[0][1])
            x2, y2 = int(coordinates[1][0]), int(coordinates[1][1])

        # Accept the two corners in any order.
        x1, x2 = sorted((x1, x2))
        y1, y2 = sorted((y1, y2))

        # Clamp every edge to the image extent. Clamping the start edge to
        # ``width``/``height`` (not ``- 1``) makes a rectangle lying entirely
        # outside the image collapse to an empty region instead of a
        # one-pixel strip along the border.
        x1 = max(0, min(x1, width))
        x2 = max(0, min(x2, width))
        y1 = max(0, min(y1, height))
        y2 = max(0, min(y2, height))

        if x2 <= x1 or y2 <= y1:
            return None
        return x1, y1, x2, y2

    @staticmethod
    def _polygon_points(
        coordinates: List[List[float]],
        width: int,
        height: int
    ) -> np.ndarray:
        """Convert polygon vertices to an int32 array clipped to the image."""
        pts = np.array(coordinates, dtype=np.int32)
        pts[:, 0] = np.clip(pts[:, 0], 0, width - 1)
        pts[:, 1] = np.clip(pts[:, 1], 0, height - 1)
        return pts

    def _crop_rectangle(
        self,
        image: np.ndarray,
        coordinates: Union[List[List[float]], Dict[str, float]]
    ) -> Optional[np.ndarray]:
        """Crop a rectangular region.

        Two coordinate formats are supported:
          1. dict: {"x": float, "y": float, "w": float, "h": float},
             normalized to the 0-1 range.
          2. list: [[x1, y1], [x2, y2]] in pixels (corners in any order).

        Returns:
            A view of ``image`` restricted to the rectangle, or None when the
            rectangle is malformed or has no area inside the image.
        """
        img_h, img_w = image.shape[:2]
        bounds = self._rectangle_bounds(coordinates, img_w, img_h)
        if bounds is None:
            return None
        x1, y1, x2, y2 = bounds
        return image[y1:y2, x1:x2]

    def _crop_polygon(
        self,
        image: np.ndarray,
        coordinates: List[List[float]]
    ) -> Optional[np.ndarray]:
        """Crop a polygonal region.

        Pixels outside the polygon are zeroed, then the result is cut down to
        the polygon's bounding box.

        Returns:
            The masked bounding-box crop, or None when fewer than three
            vertices are given or the crop is empty.
        """
        if len(coordinates) < 3:
            return None
        height, width = image.shape[:2]
        pts = self._polygon_points(coordinates, width, height)

        mask = np.zeros((height, width), dtype=np.uint8)
        cv2.fillPoly(mask, [pts], 255)
        masked_image = cv2.bitwise_and(image, image, mask=mask)

        x1 = int(pts[:, 0].min())
        y1 = int(pts[:, 1].min())
        x2 = int(pts[:, 0].max())
        y2 = int(pts[:, 1].max())
        # The vertices are inclusive pixel positions (fillPoly paints the
        # boundary), so the slice must extend one past the maximum to keep the
        # last filled row and column.
        cropped = masked_image[y1:y2 + 1, x1:x2 + 1]
        return cropped if cropped.size > 0 else None

    def create_mask(
        self,
        image_shape: Tuple[int, int],
        roi: "ROIInfo"
    ) -> np.ndarray:
        """Build a binary mask for an ROI.

        Args:
            image_shape: Image shape as (height, width).
            roi: ROI configuration.

        Returns:
            A uint8 mask of shape (height, width): 255 inside the ROI and 0
            elsewhere. The mask is all zeros when the ROI is malformed, lies
            outside the image, or has an unsupported type.
        """
        height, width = image_shape
        mask = np.zeros((height, width), dtype=np.uint8)
        if roi.roi_type == ROIType.RECTANGLE:
            # Same resolution rules as _crop_rectangle, so both coordinate
            # formats work here and out-of-range corners are clamped instead
            # of wrapping around through negative slice indices.
            bounds = self._rectangle_bounds(roi.coordinates, width, height)
            if bounds is not None:
                x1, y1, x2, y2 = bounds
                mask[y1:y2, x1:x2] = 255
        elif roi.roi_type == ROIType.POLYGON:
            # Mirror _crop_polygon: fewer than three vertices is not a region.
            if len(roi.coordinates) >= 3:
                pts = self._polygon_points(roi.coordinates, width, height)
                cv2.fillPoly(mask, [pts], 255)
        return mask
class LetterboxPreprocessor:
    """Letterbox resizer.

    Scales an image uniformly so it fits inside the target size and pads the
    remainder with gray, preserving the original aspect ratio.
    """

    def __init__(self, target_size: Tuple[int, int] = (480, 480)):
        """Initialize the letterbox processor.

        Args:
            target_size: Output size as (width, height).
        """
        self.target_width, self.target_height = target_size
        self.pad_color = (114, 114, 114)

    def preprocess(self, image: np.ndarray) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
        """Letterbox ``image`` into the target size.

        Args:
            image: Input image (expected to be 3-channel BGR).

        Returns:
            tuple: (padded image of shape (target_height, target_width, 3),
            scale info ``(scale, pad_x, pad_y, scale)``). The scale factor is
            repeated as the fourth element; ``pad_x``/``pad_y`` are the left
            and top padding in pixels.
        """
        original_height, original_width = image.shape[:2]
        # Use the smaller ratio so the resized image fits in both dimensions.
        scale = min(
            self.target_width / original_width,
            self.target_height / original_height
        )
        # For extreme aspect ratios int() can truncate the short side to 0,
        # which cv2.resize rejects; keep at least one pixel.
        new_width = max(1, int(original_width * scale))
        new_height = max(1, int(original_height * scale))
        resized = cv2.resize(
            image,
            (new_width, new_height),
            interpolation=cv2.INTER_LINEAR
        )
        padded = np.full(
            (self.target_height, self.target_width, 3),
            self.pad_color,
            dtype=np.uint8
        )
        # Center the resized image on the gray canvas.
        pad_x = (self.target_width - new_width) // 2
        pad_y = (self.target_height - new_height) // 2
        padded[pad_y:pad_y + new_height, pad_x:pad_x + new_width] = resized
        scale_info = (scale, pad_x, pad_y, scale)
        return padded, scale_info

    def revert_coordinates(
        self,
        box: List[float],
        scale_info: Tuple[float, float, float, float]
    ) -> List[float]:
        """Map a box from letterbox space back to the pre-letterbox image.

        Args:
            box: Box in letterbox space as [x1, y1, x2, y2].
            scale_info: ``(scale, pad_x, pad_y, scale)`` as returned by
                ``preprocess``; the fourth element is ignored.

        Returns:
            The box as [x1, y1, x2, y2] in the coordinates of the image that
            was passed to ``preprocess``.
        """
        scale, pad_x, pad_y, _ = scale_info
        # Undo the padding offset first, then the uniform scaling.
        x1 = (box[0] - pad_x) / scale
        y1 = (box[1] - pad_y) / scale
        x2 = (box[2] - pad_x) / scale
        y2 = (box[3] - pad_y) / scale
        return [x1, y1, x2, y2]
class BatchPreprocessor:
    """Batch preprocessor (intended for dynamic batches of 1 to 8 frames)."""

    MAX_BATCH_SIZE = 8

    def __init__(
        self,
        target_size: Tuple[int, int] = (480, 480),
        fp16_mode: bool = True
    ):
        """Initialize the batch preprocessor.

        Args:
            target_size: Letterbox output size as (width, height).
            fp16_mode: When True, outputs are cast to float16.
        """
        self.target_size = target_size
        self.fp16_mode = fp16_mode
        self.max_batch_size = self.MAX_BATCH_SIZE
        self._logger = get_logger("preprocessor")
        self._logger.info(
            f"Batch预处理器: max_batch={self.max_batch_size}, "
            f"target_size={target_size}, fp16={fp16_mode}"
        )

    @staticmethod
    def _normalize_chw(image: np.ndarray) -> np.ndarray:
        """Scale an HWC image to float32 in [0, 1] and reorder it to CHW."""
        normalized = image.astype(np.float32) / 255.0
        return np.transpose(normalized, (2, 0, 1))

    def preprocess_single(
        self,
        image: np.ndarray
    ) -> np.ndarray:
        """Normalize a single frame into a batch of one.

        No resizing is performed here; the frame is used at its given size.

        Args:
            image: HWC image array.

        Returns:
            np.ndarray: Array of shape [1, C, H, W], float16 when
            ``fp16_mode`` is set and float32 otherwise.
        """
        batched = self._normalize_chw(image)[None, ...]
        if self.fp16_mode:
            batched = batched.astype(np.float16)
        return batched

    def preprocess_batch(
        self,
        images: List[np.ndarray]
    ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]:
        """Letterbox, normalize and stack a list of frames.

        Args:
            images: Frames that have NOT yet been letterboxed; each one is
                letterboxed to ``target_size`` here.

        Returns:
            tuple: (batch array of shape [N, 3, H, W], list of per-frame
            letterbox scale infos in input order).

        Raises:
            ValueError: If ``images`` is empty.
        """
        if not images:
            raise ValueError("Empty images list")
        if len(images) > self.max_batch_size:
            # Not fatal here, but the advertised limit was exceeded; surface it
            # so an oversized batch is visible in the logs.
            self._logger.warning(
                f"Batch大小 {len(images)} 超过最大值 {self.max_batch_size}"
            )

        letterbox = LetterboxPreprocessor(self.target_size)
        batch_frames = []
        scale_infos = []
        for img in images:
            processed, scale_info = letterbox.preprocess(img)
            scale_infos.append(scale_info)
            batch_frames.append(self._normalize_chw(processed))

        # Stack the per-frame CHW arrays into [N, 3, H, W].
        batch_data = np.stack(batch_frames)
        if self.fp16_mode:
            batch_data = batch_data.astype(np.float16)
        return batch_data, scale_infos
class ImagePreprocessor:
    """Main image preprocessing pipeline.

    Combines ROI cropping, letterboxing and batch packing.
    """

    def __init__(self, config: Optional["InferenceConfig"] = None):
        """Initialize the preprocessor.

        Args:
            config: Inference configuration. When None, the ``inference``
                section of the global settings is used.
        """
        if config is None:
            settings = get_settings()
            config = settings.inference
        self.config = config
        self._cropper = ROICropper()
        self._letterbox = LetterboxPreprocessor(
            (config.input_width, config.input_height)
        )
        self._batch_preprocessor = BatchPreprocessor(
            target_size=(config.input_width, config.input_height),
            fp16_mode=config.fp16_mode
        )
        self._logger = get_logger("preprocessor")
        self._logger.info(
            f"图像预处理器初始化完成: "
            f"输入尺寸 {config.input_width}x{config.input_height}, "
            f"最大Batch {self._batch_preprocessor.max_batch_size}, "
            f"FP16模式 {config.fp16_mode}"
        )

    def _crop_or_full(
        self,
        image: np.ndarray,
        roi: Optional["ROIInfo"]
    ) -> np.ndarray:
        """Return the ROI crop of ``image``, or ``image`` itself.

        The full frame is used when no ROI is given or when cropping fails
        (the cropper returns None).
        """
        if roi is None:
            return image
        cropped = self._cropper.crop(image, roi)
        return image if cropped is None else cropped

    def preprocess_single(
        self,
        image: np.ndarray,
        roi: Optional["ROIInfo"] = None
    ) -> Tuple[np.ndarray, Tuple[float, float, float, float]]:
        """Crop (optionally) and letterbox a single image.

        Args:
            image: Source image.
            roi: Optional ROI configuration; when cropping fails the full
                image is used instead.

        Returns:
            tuple: (letterboxed image, letterbox scale info).
        """
        return self._letterbox.preprocess(self._crop_or_full(image, roi))

    def preprocess_batch(
        self,
        images: List[np.ndarray],
        rois: Optional[List[Optional["ROIInfo"]]] = None
    ) -> Tuple[np.ndarray, List[Tuple[float, float, float, float]]]:
        """Crop (optionally), letterbox, normalize and stack a batch.

        Args:
            images: Source images.
            rois: Optional per-image ROI configurations, aligned with
                ``images``; an entry may be None to use the full frame.

        Returns:
            tuple: (batch array of shape [N, 3, H, W], list of per-image
            letterbox scale infos).

        Raises:
            ValueError: If ``rois`` is given with a different length than
                ``images``, or if ``images`` is empty.
        """
        if rois is None:
            rois = [None] * len(images)
        elif len(rois) != len(images):
            # zip() would silently drop the unmatched frames, leaving the
            # outputs shorter than (and misaligned with) the inputs.
            raise ValueError(
                f"images and rois length mismatch: "
                f"{len(images)} != {len(rois)}"
            )

        processed_images = [
            self._crop_or_full(image, roi) for image, roi in zip(images, rois)
        ]
        # BatchPreprocessor performs letterbox + normalize + stack.
        return self._batch_preprocessor.preprocess_batch(processed_images)

    def revert_boxes(
        self,
        boxes: List[List[float]],
        scale_info: Tuple[float, float, float, float]
    ) -> List[List[float]]:
        """Map detection boxes out of letterbox space.

        Only the letterbox padding and scaling are undone. When an ROI crop
        was applied before letterboxing, the result is relative to that crop;
        no crop offset is added back here.

        Args:
            boxes: Boxes in letterbox space, each [x1, y1, x2, y2].
            scale_info: Letterbox scale info for the image the boxes belong to.

        Returns:
            The boxes in the coordinates of the image that was letterboxed.
        """
        return [self._letterbox.revert_coordinates(box, scale_info) for box in boxes]

    def get_statistics(self) -> Dict[str, Any]:
        """Return the preprocessor's configuration summary."""
        return {
            "config": {
                "input_width": self.config.input_width,
                "input_height": self.config.input_height,
                "batch_size": self._batch_preprocessor.max_batch_size,
                "fp16_mode": self.config.fp16_mode,
            },
        }

    def release_resources(self):
        """Release resources (currently only logs that release happened)."""
        self._logger.info("预处理器资源已释放")