Compare commits

...

13 Commits

Author SHA1 Message Date
003c2885b9 重构: 模型类别配置动态化,适配 v2+ (5 类) 模型
- settings.py: 删除 80 类 COCO_CLASS_NAMES 硬编码,改为 MODEL_CLASS_NAMES
  + 新增 MODEL_NUM_CLASSES / MODEL_OUTPUT_CHANNELS 辅助常量
  + 新增 ALGO_INTENT_CLASSES 字典 + get_algo_target_classes() 辅助函数
  + COCO_CLASS_NAMES 保留名称向后兼容,指向 MODEL_CLASS_NAMES

- postprocessor.py: 4 处硬编码 84 → MODEL_OUTPUT_CHANNELS
  + 支持不同类别数模型切换不改代码

- algorithms.py: 4 处硬编码 target_classes 默认值 → get_algo_target_classes()
  + IllegalParkingAlgorithm / VehicleCongestionAlgorithm /
    NonMotorVehicleParkingAlgorithm / GarbageDetectionAlgorithm
  + 自动过滤当前模型不支持的类(truck/bus 等)

以后换模型只需修改 settings.py 一处 MODEL_CLASS_NAMES。
2026-04-21 14:46:53 +08:00
a891deba00 新增:垃圾检测算法 GarbageDetectionAlgorithm v1.0
Edge 端实现:
- algorithms.py 新增 GarbageDetectionAlgorithm 类
  状态机:IDLE → CONFIRMING_GARBAGE → ALARMED → CONFIRMING_CLEAR → IDLE
  默认参数:confirm_garbage_sec=60, confirm_clear_sec=60, cooldown_sec=1800
  target_classes=['garbage'], alarm_level=2(普通)
  与 IllegalParking 同构但去掉 PARKED_COUNTDOWN 阶段
- AlgorithmManager 6 处集成:
  _PARAM_TYPES、default_params、load_bind_from_redis(热更新)、
  update_algorithm_params、register_algorithm、get_algorithm_status

测试:test_garbage_algorithm.py 覆盖 8 个场景,全部通过
- 无垃圾保持 IDLE
- 持续 60s 有垃圾 → 触发告警
- 冷却期内不重复触发
- 清理后发 resolve → IDLE
- 清理期内垃圾再出现 → 回 ALARMED
- reset() 清空状态
- 多目标计数
- 非 target_class 忽略

WVP 后端/前端改动方案预留在 docs/garbage_algorithm_backend_frontend_plan.md
(后续 ROI 绑定时再实施,本次只改 Edge 端)
2026-04-17 14:57:19 +08:00
bfe6a559d2 修复: .gitignore 添加 engine/pt 模型文件保护,防止 git stash 误删 2026-04-15 11:36:43 +08:00
f5077a25a8 修复: 移除未实现的 debug_http_server 模块引用
debug_http_server.py 文件不存在导致启动报 ModuleNotFoundError,
该调试功能非必要,直接删除相关导入和方法调用。
2026-04-14 10:12:46 +08:00
9c73efe1eb 修复: 参数类型强制转换 + camelCase 防御性转换 2026-04-13 15:48:43 +08:00
bf5ddb3e7a 基础设施: 统一依赖版本 + 新增 Docker 部署配置
- requirements.txt: GPU 依赖从注释改为正式声明,统一版本
  PyTorch 2.1.2+cu121, TensorRT 8.6.1.6, ultralytics 8.3.5
  NumPy 1.24→1.26.4, OpenCV 4.8.0.74→76, 新增 onnx/Pillow 等
- Dockerfile: 基于 nvcr.io/nvidia/tensorrt:23.08-py3
  (CUDA 12.1 + cuDNN 8.9 + TRT 8.6)
- docker-compose.yml: GPU 访问、host 网络、卷挂载、日志限制
- .dockerignore: 排除模型/数据/日志等大文件

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 14:59:55 +08:00
56f39f1be7 修复: 全局参数线程安全 + copy保护 + 回调只清除受影响算法缓存 2026-04-13 10:21:19 +08:00
3266241064 适配: Edge 全局参数解析 + AlgorithmManager 三级参数合并 2026-04-09 17:04:11 +08:00
c6d8430867 新增: 非机动车违停检测算法(non_motor_vehicle_parking)+ 修复 illegal_parking 参数不一致 2026-04-09 10:34:55 +08:00
5a0265de52 修复:P0+P1 生产稳定性和性能优化(6项)
P0 稳定性修复:
- 告警去重字典添加惰性清理机制,防止长时间运行内存溢出
- Redis 连接断开时显式 close() 后再置 None,防止文件描述符泄漏
- 截图消息 ACK 移至成功路径,失败消息留在 pending list 自动重试

P1 性能优化:
- GPU NMS 添加 torch.no_grad() + 显式释放临时张量,减少显存碎片
- 截图存储改为 Redis 原始 bytes,去掉 Base64 编解码开销(兼容旧格式)
- ROI 配置查询 N+1 改为 get_all_bindings() 单次 JOIN 查询

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 14:10:27 +08:00
a9a5457583 优化:算法模块全面重构 — 提取基类、常量化阈值、性能优化
- 提取 BaseAlgorithm 基类,四个算法共享 ROI 检查、目标类过滤、告警 ID 管理
- 硬编码比率阈值提取为类常量(RATIO_ON_DUTY_CONFIRM 等)
- 滑动窗口添加 maxlen=1000 防内存溢出
- tracks 合并遍历 _scan_tracks() 减少重复遍历
- 比率/均值缓存,process() 入口计算一次
- 拥堵消散比例可配置(dissipation_ratio 参数)
- 入侵 CONFIRMING_CLEAR 逻辑拆分为独立方法
- 补齐 AlgorithmType 枚举(illegal_parking、vehicle_congestion)
- 修复 _leave_start_time None guard 防 TypeError
- 修复 AlgorithmManager 默认参数与构造函数不一致
- 热更新补充支持 illegal_parking 和 vehicle_congestion

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 14:10:27 +08:00
1fad88ae0c 调整:移除推理性能汇总日志输出 2026-03-31 16:18:51 +08:00
714361b57f 优化:推理性能日志改为周期汇总输出 2026-03-31 16:08:53 +08:00
20 changed files with 3123 additions and 334 deletions

29
.dockerignore Normal file
View File

@@ -0,0 +1,29 @@
.git
__pycache__
*.pyc
*.pyo
.idea
.vscode
.env
.env.*
!.env.example
# 模型和数据通过卷挂载
models/
data/
logs/
# 测试文件
tests/
test_*.py
pytest.ini
# 文档
docs/
*.md
!CLAUDE.md
# 临时文件
*.engine
*.onnx
*.pt

4
.gitignore vendored
View File

@@ -15,8 +15,10 @@ build/
logs/
*.log
# 模型文件(忽略中间产物
# 模型文件(二进制大文件,不入库,防止 git stash --include-untracked 误删
models/*.onnx
models/*.engine
models/*.pt
# 环境配置
.env

51
Dockerfile Normal file
View File

@@ -0,0 +1,51 @@
# ============================================================
# 基础镜像NVIDIA TensorRT 23.08
# 内含CUDA 12.1.1 | cuDNN 8.9.3 | TensorRT 8.6.1.6 | Python 3.10
# ============================================================
FROM nvcr.io/nvidia/tensorrt:23.08-py3
LABEL maintainer="AI Edge Architecture Team"
LABEL description="Edge AI Inference Service - YOLOv11n + TensorRT"
# 设置时区
ENV TZ=Asia/Shanghai
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
# 系统依赖视频解码、OpenCV 运行时)
RUN apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
libsm6 \
libxext6 \
libgl1-mesa-glx \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /app
# 先复制依赖文件,利用 Docker 层缓存
COPY requirements.txt .
# 安装 PyTorchCUDA 12.1 版本)+ 其余依赖
RUN pip install --no-cache-dir \
torch==2.1.2 torchvision==0.16.2 \
--index-url https://download.pytorch.org/whl/cu121 \
&& pip install --no-cache-dir -r requirements.txt
# 复制项目代码
COPY __init__.py .
COPY main.py .
COPY algorithms.py .
COPY build_engine.py .
COPY config/ ./config/
COPY core/ ./core/
COPY utils/ ./utils/
# 模型和数据通过卷挂载,不打入镜像
# -v /path/to/models:/app/models
# -v /path/to/data:/app/data
# 日志目录
RUN mkdir -p /app/logs /app/data
EXPOSE 9001
CMD ["python", "main.py"]

File diff suppressed because it is too large Load Diff

View File

@@ -20,6 +20,9 @@ class AlgorithmType(str, Enum):
"""算法类型枚举"""
LEAVE_POST = "leave_post"
INTRUSION = "intrusion"
ILLEGAL_PARKING = "illegal_parking"
VEHICLE_CONGESTION = "vehicle_congestion"
NON_MOTOR_VEHICLE_PARKING = "non_motor_vehicle_parking"
CROWD_DETECTION = "crowd_detection"
FACE_RECOGNITION = "face_recognition"

View File

@@ -259,6 +259,15 @@ class SQLiteManager:
except Exception:
pass # 列已存在,忽略
# 算法全局参数表
cursor.execute("""
CREATE TABLE IF NOT EXISTS algo_global_params (
algo_code TEXT PRIMARY KEY,
params TEXT NOT NULL DEFAULT '{}',
updated_at TEXT
)
""")
self._init_default_algorithms()
def _init_default_algorithms(self):
@@ -885,6 +894,37 @@ class SQLiteManager:
logger.error(f"获取摄像头算法绑定失败: {e}")
return []
def get_all_bindings(self) -> List[Dict[str, Any]]:
"""获取所有启用的算法绑定(一次查询,避免 N+1"""
try:
cursor = self._conn.cursor()
cursor.execute("""
SELECT b.bind_id, b.roi_id, b.algo_code, b.params, b.priority,
b.enabled, b.created_at, b.updated_at,
a.algo_name, a.target_class
FROM roi_algo_bind b
LEFT JOIN algorithm_registry a ON b.algo_code = a.algo_code
WHERE b.enabled = 1
ORDER BY b.priority DESC
""")
results = []
for row in cursor.fetchall():
result = dict(zip(
['bind_id', 'roi_id', 'algo_code', 'params', 'priority',
'enabled', 'created_at', 'updated_at', 'algo_name', 'target_class'],
row
))
if result.get('params') and isinstance(result['params'], str):
try:
result['params'] = json.loads(result['params'])
except (json.JSONDecodeError, TypeError):
pass
results.append(result)
return results
except Exception as e:
logger.error(f"获取所有算法绑定失败: {e}")
return []
def delete_roi_algo_bind(self, bind_id: str) -> bool:
"""删除ROI算法绑定"""
try:
@@ -917,6 +957,39 @@ class SQLiteManager:
logger.error(f"获取所有绑定ID失败: {e}")
return []
def save_global_params(self, algo_code: str, params_dict: Dict[str, Any]) -> bool:
"""保存算法全局参数INSERT OR REPLACE"""
try:
cursor = self._conn.cursor()
now = datetime.now().isoformat()
cursor.execute("""
INSERT OR REPLACE INTO algo_global_params (algo_code, params, updated_at)
VALUES (?, ?, ?)
""", (algo_code, json.dumps(params_dict, ensure_ascii=False), now))
self._conn.commit()
return True
except Exception as e:
logger.error(f"保存算法全局参数失败: {e}")
return False
def get_all_global_params(self) -> Dict[str, Dict[str, Any]]:
"""获取所有算法全局参数,返回 {algo_code: params_dict}"""
result: Dict[str, Dict[str, Any]] = {}
try:
cursor = self._conn.cursor()
cursor.execute("SELECT algo_code, params FROM algo_global_params")
for row in cursor.fetchall():
algo_code = row[0]
params_str = row[1]
try:
result[algo_code] = json.loads(params_str) if params_str else {}
except (json.JSONDecodeError, TypeError):
result[algo_code] = {}
return result
except Exception as e:
logger.error(f"获取算法全局参数失败: {e}")
return result
def log_config_update(
self,
config_type: str,

View File

@@ -143,19 +143,40 @@ class DebugConfig:
local_config_path: str = "./config/local_config.json"
# COCO 数据集类别名称YOLO 模型使用
COCO_CLASS_NAMES = [
"person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck", "boat",
"traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat",
"dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", "backpack",
"umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard", "sports ball",
"kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket",
"bottle", "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
"sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake",
"chair", "couch", "potted plant", "bed", "dining table", "toilet", "tv", "laptop",
"mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
"refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush"
]
# 模型类别配置(支持不同模型切换,修改此处即可
# 当前: yolo11s_v2plus_20260421 (5 类定制模型)
# 历史: yolo11n 官方 (80 类 COCO),列表见 git 历史
MODEL_CLASS_NAMES = ["garbage", "person", "car", "bicycle", "motorcycle"]
MODEL_NUM_CLASSES = len(MODEL_CLASS_NAMES) # 模型类别数
MODEL_OUTPUT_CHANNELS = 4 + MODEL_NUM_CLASSES # YOLO 输出通道 = 4(xywh) + nc
# 向后兼容:保留 COCO_CLASS_NAMES 名称,指向当前模型类别
COCO_CLASS_NAMES = MODEL_CLASS_NAMES
# 各算法的业务关注类别(全集,不限定当前模型是否支持)
# 部署时通过 get_algo_target_classes() 自动过滤出当前模型支持的子集
ALGO_INTENT_CLASSES = {
"leave_post": ["person"],
"intrusion": ["person"],
"illegal_parking": ["car", "truck", "bus"],
"vehicle_congestion": ["car", "truck", "bus", "motorcycle"],
"non_motor_vehicle_parking": ["bicycle", "motorcycle"],
"garbage": ["garbage"],
}
def get_algo_target_classes(algo_code: str) -> list:
"""获取算法的目标类别,自动过滤出当前模型支持的类
Args:
algo_code: 算法代码 (leave_post / intrusion / illegal_parking / ...)
Returns:
目标类名列表,保证每个类都在 MODEL_CLASS_NAMES 中
"""
intent = ALGO_INTENT_CLASSES.get(algo_code, [])
return [c for c in intent if c in MODEL_CLASS_NAMES]
@dataclass

View File

@@ -47,6 +47,7 @@ class AlarmUploadWorker:
self._logger = logging.getLogger("alarm_upload_worker")
self._redis: Optional[redis.Redis] = None
self._redis_binary: Optional[redis.Redis] = None # 用于读取截图 bytes
self._cos_client = None # 懒初始化
self._thread: Optional[threading.Thread] = None
@@ -80,6 +81,16 @@ class AlarmUploadWorker:
)
self._redis.ping()
self._logger.info(f"Worker Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}")
# 二进制 Redis 连接(用于读取截图 bytes不做 decode
self._redis_binary = redis.Redis(
host=redis_cfg.host,
port=redis_cfg.port,
db=redis_cfg.db,
password=redis_cfg.password,
decode_responses=False,
socket_connect_timeout=5,
)
except Exception as e:
self._logger.error(f"Worker Redis 连接失败: {e}")
return
@@ -136,6 +147,12 @@ class AlarmUploadWorker:
except Exception:
pass
if self._redis_binary:
try:
self._redis_binary.close()
except Exception:
pass
self._logger.info("AlarmUploadWorker 已停止")
def _worker_loop(self):
@@ -184,21 +201,43 @@ class AlarmUploadWorker:
self._logger.info(f"开始处理告警: {alarm_id} (retry={retry_count})")
# Step 1: 上传截图到 COS(从 base64 解码后直接上传字节流)
# Step 1: 上传截图到 COS
snapshot_key = (alarm_data.get("ext_data") or {}).get("_snapshot_key")
snapshot_b64 = alarm_data.get("snapshot_b64")
object_key = None
if snapshot_b64:
if snapshot_key:
# 新格式:从独立 Redis key 获取原始 bytes
try:
image_bytes = self._redis_binary.get(snapshot_key) if self._redis_binary else None
if image_bytes is None:
self._logger.warning(f"截图 key 已过期: {snapshot_key}, 无截图继续上报")
else:
object_key = self._upload_snapshot_to_cos(
image_bytes, alarm_id, alarm_data.get("device_id", "unknown")
)
if object_key is None:
self._handle_retry(alarm_json, "COS 上传失败")
return
# 上传成功后删除临时 key
try:
if self._redis_binary:
self._redis_binary.delete(snapshot_key)
except Exception:
pass
except Exception as e:
self._logger.error(f"截图获取/上传失败: {e}")
self._handle_retry(alarm_json, f"截图处理失败: {e}")
return
elif snapshot_b64:
# 兼容旧格式 (Base64)
try:
import base64
image_bytes = base64.b64decode(snapshot_b64)
object_key = self._upload_snapshot_to_cos(
image_bytes,
alarm_id,
alarm_data.get("device_id", "unknown"),
image_bytes, alarm_id, alarm_data.get("device_id", "unknown")
)
if object_key is None:
# COS 上传失败,进入重试
self._handle_retry(alarm_json, "COS 上传失败")
return
except Exception as e:

View File

@@ -15,6 +15,7 @@ import json
import logging
import os
import platform
import re
import socket
# 禁用系统代理Clash 等代理工具会干扰 Redis TCP 长连接)
@@ -53,6 +54,12 @@ def _build_keepalive_options():
return opts
def _camel_to_snake(name: str) -> str:
"""将 camelCase 转换为 snake_case"""
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()
# ==================== Redis Key 常量 ====================
# 云端 Redis Keys
@@ -215,6 +222,15 @@ class ConfigSyncManager:
logger.error(f"本地 Redis 连接失败: {e}")
self._local_redis = None
def _safe_close_cloud_redis(self):
"""安全关闭云端 Redis 连接"""
if self._cloud_redis is not None:
try:
self._cloud_redis.close()
except Exception:
pass
self._cloud_redis = None
def _init_cloud_redis(self):
"""初始化云端 Redis 连接"""
try:
@@ -238,7 +254,7 @@ class ConfigSyncManager:
except Exception as e:
logger.warning(f"云端 Redis 连接失败(将使用本地缓存运行): {e}")
self._cloud_redis = None
self._safe_close_cloud_redis()
def _init_database(self):
"""初始化 SQLite 数据库连接"""
@@ -311,9 +327,7 @@ class ConfigSyncManager:
try:
cameras = self._db_manager.get_all_camera_configs()
rois = self._db_manager.get_all_roi_configs()
binds = []
for roi in rois:
binds.extend(self._db_manager.get_bindings_by_roi(roi["roi_id"]))
binds = self._db_manager.get_all_bindings()
logger.info(f"[EDGE] Loading config from local db ({source})...")
logger.info(f"[EDGE] Camera count = {len(cameras)}")
logger.info(f"[EDGE] ROI count = {len(rois)}")
@@ -378,7 +392,7 @@ class ConfigSyncManager:
if self._stop_event.is_set():
return
logger.warning(f"云端 Redis 连接断开: {e}, {backoff}s 后重连...")
self._cloud_redis = None
self._safe_close_cloud_redis()
self._stop_event.wait(backoff)
backoff = min(backoff * 2, max_backoff)
@@ -636,6 +650,21 @@ class ConfigSyncManager:
# 清理 SQLite 中不在本次推送列表中的旧数据
self._cleanup_stale_records(incoming_camera_ids, incoming_roi_ids, incoming_bind_ids)
# 同步全局参数
global_params = config_data.get("global_params") or config_data.get("globalParams") or {}
if global_params and isinstance(global_params, dict):
for algo_code, params_dict in global_params.items():
if isinstance(params_dict, dict):
# 防御性转换camelCase → snake_case
params_dict = {_camel_to_snake(k): v for k, v in params_dict.items()}
self._db_manager.save_global_params(algo_code, params_dict)
logger.info(f"全局参数同步完成: {list(global_params.keys())}")
# 通知全局参数更新回调
self._notify_callbacks("global_params_update", {
"global_params": global_params,
})
except Exception as e:
logger.error(f"配置同步到 SQLite 失败: {e}")
@@ -776,10 +805,7 @@ class ConfigSyncManager:
bindings_list = self._db_manager.get_bindings_by_camera(camera_id)
else:
roi_configs = self._db_manager.get_all_roi_configs()
bindings_list = []
for roi in roi_configs:
bindings = self._db_manager.get_bindings_by_roi(roi['roi_id'])
bindings_list.extend(bindings)
bindings_list = self._db_manager.get_all_bindings()
roi_dict = {r['roi_id']: r for r in roi_configs}
bindings_dict: Dict[str, list] = {}
@@ -857,8 +883,7 @@ class ConfigSyncManager:
binds: List[Dict[str, Any]] = []
rois = self._db_manager.get_all_roi_configs()
for roi in rois:
binds.extend(self._db_manager.get_bindings_by_roi(roi["roi_id"]))
binds = self._db_manager.get_all_bindings()
return binds
def get_algo_bind_from_redis(self, bind_id: str) -> Optional[Dict[str, Any]]:

View File

@@ -20,7 +20,7 @@ try:
except ImportError:
_HAS_TORCH = False
from config.settings import get_settings
from config.settings import get_settings, MODEL_OUTPUT_CHANNELS, MODEL_NUM_CLASSES
from config.config_models import ROIInfo, ROIType, AlertInfo, AlertLevel
from utils.logger import get_logger
from utils.common import generate_unique_id
@@ -78,22 +78,24 @@ class NMSProcessor:
max_output_size: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""GPU 加速 NMS"""
boxes_t = torch.from_numpy(boxes).cuda()
scores_t = torch.from_numpy(scores).cuda()
with torch.no_grad():
boxes_t = torch.from_numpy(boxes).cuda()
scores_t = torch.from_numpy(scores).cuda()
keep = torch_nms(boxes_t, scores_t, iou_threshold=self.nms_threshold)
keep = torch_nms(boxes_t, scores_t, iou_threshold=self.nms_threshold)
keep_np = keep.cpu().numpy()
keep_np = keep.cpu().numpy()
del boxes_t, scores_t, keep
if len(keep_np) > max_output_size:
top_k = np.argsort(scores[keep_np])[::-1][:max_output_size]
keep_np = keep_np[top_k]
if len(keep_np) > max_output_size:
top_k = np.argsort(scores[keep_np])[::-1][:max_output_size]
keep_np = keep_np[top_k]
return (
keep_np.astype(np.int32),
scores[keep_np],
class_ids[keep_np] if class_ids is not None else np.array([])
)
return (
keep_np.astype(np.int32),
scores[keep_np],
class_ids[keep_np] if class_ids is not None else np.array([])
)
def _process_cpu(
self,
@@ -665,14 +667,14 @@ class PostProcessor:
if output.ndim != 2:
return np.array([]), np.array([]), np.array([])
if output.shape[0] != 84:
if output.shape[0] != MODEL_OUTPUT_CHANNELS:
return np.array([]), np.array([]), np.array([])
boxes_xywh = output[0:4, :].T
# YOLO11 输出格式: [4+num_classes, 8400]
# 前4行是 xywh80行是各类别分数,没有单独的 objectness 行
class_scores = output[4:, :] # [num_classes, 8400]
# YOLO11 输出格式: [4+num_classes, num_anchors]
# 前4行是 xywh num_classes 行是各类别分数,没有单独的 objectness 行
class_scores = output[4:, :] # [num_classes, num_anchors]
scores = class_scores.max(axis=0) # 取各类别最大分数
class_ids = class_scores.argmax(axis=0) # 对应类别ID
@@ -732,15 +734,15 @@ class PostProcessor:
first_output = batch_outputs[0]
if isinstance(first_output, np.ndarray):
if first_output.ndim == 3 and first_output.shape[0] == batch_size:
# 已经是 (batch, 84, anchors) 格式
# 已经是 (batch, 4+nc, anchors) 格式
outputs_array = first_output
elif first_output.ndim == 1:
# TensorRT 返回扁平 1D 数组,需要 reshape 为 (batch, 84, anchors)
# TensorRT 返回扁平 1D 数组,需要 reshape 为 (batch, 4+nc, anchors)
per_image = first_output.shape[0] // batch_size
num_anchors = per_image // 84
outputs_array = first_output.reshape(batch_size, 84, num_anchors)
num_anchors = per_image // MODEL_OUTPUT_CHANNELS
outputs_array = first_output.reshape(batch_size, MODEL_OUTPUT_CHANNELS, num_anchors)
elif first_output.ndim == 2:
# (84, anchors) 单张图的输出
# (4+nc, anchors) 单张图的输出
outputs_array = first_output.reshape(1, first_output.shape[0], first_output.shape[1])
else:
outputs_array = first_output
@@ -769,7 +771,7 @@ class PostProcessor:
if output.ndim == 3:
output = output[0]
if output.shape[0] != 84:
if output.shape[0] != MODEL_OUTPUT_CHANNELS:
results.append((np.array([]), np.array([]), np.array([])))
continue

View File

@@ -112,9 +112,20 @@ class ResultReporter:
self._logger.info(
f"Redis 连接成功: {redis_cfg.host}:{redis_cfg.port}/{redis_cfg.db}"
)
# 二进制 Redis 连接(用于存储截图 bytes不做 decode
self._redis_binary = redis.Redis(
host=redis_cfg.host,
port=redis_cfg.port,
db=redis_cfg.db,
password=redis_cfg.password,
decode_responses=False,
socket_connect_timeout=5,
)
except Exception as e:
self._logger.error(f"Redis 连接失败: {e}")
self._redis = None
self._redis_binary = None
def report_alarm(self, alarm_info: AlarmInfo, screenshot: Optional[np.ndarray] = None) -> bool:
"""
@@ -129,13 +140,22 @@ class ResultReporter:
"""
self._performance_stats["alerts_generated"] += 1
# 将截图编码为 JPEG base64直接通过 Redis 传递给 Worker 上传 COS
# 将截图编码为 JPEG,直接存储 bytes 到独立 Redis key避免 Base64 开销
if screenshot is not None:
try:
import cv2
import base64
success, buffer = cv2.imencode('.jpg', screenshot, [cv2.IMWRITE_JPEG_QUALITY, 85])
if success:
if success and self._redis_binary is not None:
snapshot_key = f"local:alarm:snapshot:{alarm_info.alarm_id}"
# 直接存储 JPEG bytes避免 Base64 编解码开销
self._redis_binary.set(snapshot_key, buffer.tobytes(), ex=3600)
alarm_info.snapshot_b64 = None
if alarm_info.ext_data is None:
alarm_info.ext_data = {}
alarm_info.ext_data["_snapshot_key"] = snapshot_key
elif success:
# 降级:无二进制 Redis 连接时使用 Base64
import base64
alarm_info.snapshot_b64 = base64.b64encode(buffer.tobytes()).decode('ascii')
else:
self._logger.warning("截图 JPEG 编码失败")
@@ -211,6 +231,12 @@ class ResultReporter:
except Exception:
pass
if hasattr(self, '_redis_binary') and self._redis_binary:
try:
self._redis_binary.close()
except Exception:
pass
self._logger.info("ResultReporter 清理完成")
def cleanup(self):

View File

@@ -59,6 +59,7 @@ class ScreenshotHandler:
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._last_pending_check = 0.0
# ==================== 生命周期 ====================
@@ -180,20 +181,26 @@ class ScreenshotHandler:
backoff = 5 # 重置退避
# 每 60 秒检查一次 pending 消息
import time as _time
if _time.time() - self._last_pending_check > 60:
self._last_pending_check = _time.time()
self._cleanup_pending_messages()
for stream_name, messages in results:
for msg_id, fields in messages:
try:
self._handle_request(fields)
except Exception as e:
logger.error("[截图] 处理请求失败: %s", e)
finally:
# ACK 消息
# 处理成功才 ACK
try:
self._cloud_redis.xack(
SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id
)
except Exception:
pass
except Exception as e:
logger.error("[截图] 处理请求失败 (msg_id=%s): %s", msg_id, e)
# 不 ACK消息留在 pending list 等待重试
except redis.ConnectionError as e:
if self._stop_event.is_set():
@@ -409,3 +416,38 @@ class ScreenshotHandler:
logger.info("[截图] 降级写 Redis 成功: request_id=%s", request_id)
except Exception as e:
logger.error("[截图] 降级写 Redis 也失败: %s", e)
# ==================== Pending 消息清理 ====================
_MAX_RETRY_COUNT = 3
_PENDING_IDLE_MS = 30000 # 消息 pending 超过 30 秒才处理
def _cleanup_pending_messages(self):
"""清理 pending list 中重试次数过多的消息"""
try:
pending = self._cloud_redis.xpending_range(
SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP,
min="-", max="+", count=50,
consumername=self._consumer_name
)
for entry in pending:
msg_id = entry['message_id']
delivery_count = entry['times_delivered']
idle_ms = entry['time_since_delivered']
if idle_ms < self._PENDING_IDLE_MS:
continue
if delivery_count > self._MAX_RETRY_COUNT:
logger.warning(
"[截图] 消息超过最大重试次数,丢弃: msg_id=%s, retries=%d",
msg_id, delivery_count
)
try:
self._cloud_redis.xack(
SNAP_REQUEST_STREAM, SNAP_CONSUMER_GROUP, msg_id
)
except Exception:
pass
except Exception as e:
logger.debug("[截图] 检查 pending list: %s", e)

46
docker-compose.yml Normal file
View File

@@ -0,0 +1,46 @@
version: "3.8"
services:
edge-inference:
build: .
image: edge-inference:latest
container_name: edge-inference
restart: always
# GPU 访问
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
# 环境变量
env_file:
- .env
# 卷挂载
volumes:
- ./models:/app/models # TensorRT 引擎文件
- ./data:/app/data # SQLite + 截图缓存
- ./logs:/app/logs # 运行日志
- ./.env:/app/.env # 环境配置
# 网络(需要访问摄像头 RTSP + 云端 API + Redis
network_mode: host
# 健康检查
healthcheck:
test: ["CMD", "python", "-c", "import os; assert os.path.exists('/app/main.py')"]
interval: 30s
timeout: 10s
retries: 3
start_period: 30s
# 日志限制
logging:
driver: json-file
options:
max-size: "50m"
max-file: "5"

646
docs/code_review_report.md Normal file
View File

@@ -0,0 +1,646 @@
# algorithms.py 代码审查报告
> 审查日期: 2026-04-02
> 审查文件: algorithms.py (1733行)
> 审查范围: LeavePostAlgorithm, IntrusionAlgorithm, IllegalParkingAlgorithm, VehicleCongestionAlgorithm, AlgorithmManager
---
## 1. 功能基线清单
### 1.1 LeavePostAlgorithm (离岗检测)
**状态定义 (7个):**
| 状态 | 常量 | 含义 |
|------|------|------|
| INIT | `STATE_INIT` | 初始化,等待检测到人 |
| CONFIRMING_ON_DUTY | `STATE_CONFIRMING_ON_DUTY` | 上岗确认中(需持续检测到人) |
| ON_DUTY | `STATE_ON_DUTY` | 已确认在岗 |
| CONFIRMING_OFF_DUTY | `STATE_CONFIRMING_OFF_DUTY` | 离岗确认中(持续未检测到人) |
| OFF_DUTY_COUNTDOWN | `STATE_OFF_DUTY_COUNTDOWN` | 离岗倒计时(确认离岗后等待告警) |
| ALARMED | `STATE_ALARMED` | 已告警(等待回岗) |
| NON_WORK_TIME | `STATE_NON_WORK_TIME` | 非工作时间 |
**状态转换矩阵:**
| 从 \ 到 | INIT | CONFIRMING_ON_DUTY | ON_DUTY | CONFIRMING_OFF_DUTY | OFF_DUTY_COUNTDOWN | ALARMED | NON_WORK_TIME |
|---------|------|--------------------|---------|--------------------|-------------------|---------|--------------|
| INIT | - | roi_has_person==True | - | - | - | - | not in_working_hours |
| CONFIRMING_ON_DUTY | detection_ratio==0 | - | elapsed>=confirm_on_duty_sec AND ratio>=0.6 | - | - | - | not in_working_hours |
| ON_DUTY | - | - | - | detection_ratio<0.2 | - | - | not in_working_hours |
| CONFIRMING_OFF_DUTY | - | - | detection_ratio>=0.5 | - | elapsed>=confirm_off_duty_sec AND ratio<0.2 | - | not in_working_hours |
| OFF_DUTY_COUNTDOWN | - | - | roi_has_person==True | - | - | elapsed>=leave_countdown_sec AND cooldown ok | not in_working_hours |
| ALARMED | - | roi_has_person==True | - | - | - | - | not in_working_hours |
| NON_WORK_TIME | in_working_hours | - | - | - | - | - | - |
**关键行为:**
- 使用滑动窗口10秒平滑检测结果计算 detection_ratio
- ALARMED -> CONFIRMING_ON_DUTY 复用上岗确认状态(不是独立的回岗确认状态)
- 进入 ON_DUTY 状态时,若存在 `_last_alarm_id`,自动发送 `alarm_resolve` 事件
- 进入 NON_WORK_TIME 时,若有未结束告警,发送 resolve_type="non_work_time" 的 resolve 事件
- 冷却期检查使用 `cooldown_key = f"{camera_id}_{roi_id}"`
- `_last_alarm_id` 由外部 main.py 通过 `set_last_alarm_id()` 回填
- `_leave_start_time` 在进入 OFF_DUTY_COUNTDOWN 时记录(值等于 state_start_time
**构造函数参数:**
- `confirm_on_duty_sec`: int = 10 (上岗确认窗口)
- `confirm_off_duty_sec`: int = 30 (离岗确认窗口)
- `confirm_return_sec`: int = 10 (回岗确认窗口 -- 注意: 代码中实际未使用此参数)
- `leave_countdown_sec`: int = 300 (离岗倒计时)
- `cooldown_sec`: int = 600 (告警冷却期)
- `working_hours`: Optional[List[Dict]] = None
- `target_class`: Optional[str] = "person"
- `alarm_level`: Optional[int] = None (默认2)
- `confirm_leave_sec`: Optional[int] = None (向后兼容旧参数名)
### 1.2 IntrusionAlgorithm (周界入侵)
**状态定义 (4个):**
| 状态 | 常量 | 含义 |
|------|------|------|
| IDLE | `STATE_IDLE` | 空闲,无入侵 |
| CONFIRMING_INTRUSION | `STATE_CONFIRMING_INTRUSION` | 入侵确认中 |
| ALARMED | `STATE_ALARMED` | 已告警(等待入侵消失) |
| CONFIRMING_CLEAR | `STATE_CONFIRMING_CLEAR` | 入侵消失确认中 |
**状态转换矩阵:**
| 从 \ 到 | IDLE | CONFIRMING_INTRUSION | ALARMED | CONFIRMING_CLEAR |
|---------|------|---------------------|---------|-----------------|
| IDLE | - | roi_has_person==True | - | - |
| CONFIRMING_INTRUSION | not roi_has_person OR cooldown内 OR state_start_time==None | - | elapsed>=confirm_intrusion_seconds AND cooldown ok | - |
| ALARMED | - | - | - | not roi_has_person |
| CONFIRMING_CLEAR | elapsed>=confirm_clear_seconds AND no person | - | person_elapsed>=confirm_intrusion_seconds (持续有人) OR state_start_time==None | - |
**关键行为:**
- 不使用滑动窗口,直接使用当前帧的 `roi_has_person` 判断
- CONFIRMING_CLEAR 有子状态追踪: `_person_detected_in_clear_time` 用于判断短暂有人 vs 持续有人
- 冷却期内入侵确认直接回到 IDLE不触发告警
- 包含防御性编程: state_start_time==None 时重置到 IDLE
- `_check_target_class` 允许 target_class 为 None匹配所有类别
**构造函数参数:**
- `cooldown_seconds`: int = 300
- `confirm_seconds`: int = 5 (向后兼容)
- `confirm_intrusion_seconds`: Optional[int] = None (默认使用 confirm_seconds)
- `confirm_clear_seconds`: Optional[int] = None (默认180)
- `target_class`: Optional[str] = None
- `alarm_level`: Optional[int] = None (默认1)
### 1.3 IllegalParkingAlgorithm (车辆违停)
**状态定义 (5个):**
| 状态 | 常量 | 含义 |
|------|------|------|
| IDLE | `STATE_IDLE` | 空闲 |
| CONFIRMING_VEHICLE | `STATE_CONFIRMING_VEHICLE` | 车辆确认中 |
| PARKED_COUNTDOWN | `STATE_PARKED_COUNTDOWN` | 违停倒计时 |
| ALARMED | `STATE_ALARMED` | 已告警 |
| CONFIRMING_CLEAR | `STATE_CONFIRMING_CLEAR` | 消失确认中 |
**状态转换矩阵:**
| 从 \ 到 | IDLE | CONFIRMING_VEHICLE | PARKED_COUNTDOWN | ALARMED | CONFIRMING_CLEAR |
|---------|------|-------------------|-----------------|---------|-----------------|
| IDLE | - | roi_has_vehicle | - | - | - |
| CONFIRMING_VEHICLE | ratio<0.3 OR state_start_time==None | - | elapsed>=confirm_vehicle_sec AND ratio>=0.6 | - | - |
| PARKED_COUNTDOWN | ratio<0.2 (车离开) OR cooldown内 OR state_start_time==None | - | - | elapsed>=parking_countdown_sec AND cooldown ok | - |
| ALARMED | - | - | - | - | ratio<0.15 |
| CONFIRMING_CLEAR | elapsed>=confirm_clear_sec AND ratio<0.2 OR state_start_time==None | - | - | ratio>=0.5 | - |
**关键行为:**
- 使用滑动窗口WINDOW_SIZE_SEC=10秒
- 支持多类车辆: target_classes 默认 ["car", "truck", "bus", "motorcycle"]
- 告警字段包含 `confidence``duration_minutes`
- CONFIRMING_CLEAR -> IDLE 时清除 alert_cooldowns新车违停可正常告警
- ALARMED 进入 CONFIRMING_CLEAR 的阈值(0.15)比其他算法更严格
**构造函数参数:**
- `confirm_vehicle_sec`: int = 15
- `parking_countdown_sec`: int = 300
- `confirm_clear_sec`: int = 120
- `cooldown_sec`: int = 1800
- `target_classes`: Optional[List[str]] = None (默认 ["car", "truck", "bus", "motorcycle"])
- `alarm_level`: Optional[int] = None (默认1)
### 1.4 VehicleCongestionAlgorithm (车辆拥堵)
**状态定义 (4个):**
| 状态 | 常量 | 含义 |
|------|------|------|
| NORMAL | `STATE_NORMAL` | 正常 |
| CONFIRMING_CONGESTION | `STATE_CONFIRMING_CONGESTION` | 拥堵确认中 |
| CONGESTED | `STATE_CONGESTED` | 拥堵中 |
| CONFIRMING_CLEAR | `STATE_CONFIRMING_CLEAR` | 消散确认中 |
**状态转换矩阵:**
| 从 \ 到 | NORMAL | CONFIRMING_CONGESTION | CONGESTED | CONFIRMING_CLEAR |
|---------|--------|----------------------|-----------|-----------------|
| NORMAL | - | avg_count >= count_threshold | - | - |
| CONFIRMING_CONGESTION | avg_count < count_threshold OR cooldown内 OR state_start_time==None | - | elapsed >= confirm_congestion_sec AND cooldown ok | - |
| CONGESTED | - | - | - | avg_count < count_threshold * 0.5 |
| CONFIRMING_CLEAR | elapsed >= confirm_clear_sec OR state_start_time==None | - | avg_count >= count_threshold | - |
**关键行为:**
- 使用滑动窗口WINDOW_SIZE_SEC=10秒存储车辆计数取平均值判断
- 消散需车辆数降到阈值的 **50%** 以下才开始确认(避免抖动)
- CONFIRMING_CLEAR -> NORMAL 时清除 alert_cooldowns
- 告警字段包含 `vehicle_count``confidence`
**构造函数参数:**
- `count_threshold`: int = 5
- `confirm_congestion_sec`: int = 60
- `confirm_clear_sec`: int = 180
- `cooldown_sec`: int = 1800
- `target_classes`: Optional[List[str]] = None (默认 ["car", "truck", "bus", "motorcycle"])
- `alarm_level`: Optional[int] = None (默认2)
### 1.5 AlgorithmManager
**数据结构:**
```
self.algorithms: Dict[str, Dict[str, Dict[str, Algorithm]]]
结构: { roi_id: { "{roi_id}_{bind_id}": { algo_type: algo_instance } } }
```
**公开方法:**
- `start_config_subscription()` - 启动 Redis 配置订阅
- `stop_config_subscription()` - 停止配置订阅
- `load_bind_from_redis(bind_id)` - 从 Redis 加载单个绑定配置
- `reload_bind_algorithm(bind_id)` - 重载单个绑定
- `reload_algorithm(roi_id)` - 重载单个 ROI 的所有算法
- `update_algorithm_params(roi_id, bind_id, bind_config)` - 仅更新参数,保留状态
- `reload_all_algorithms(preserve_state=True)` - 重载全部算法
- `register_algorithm(roi_id, bind_id, algorithm_type, params)` - 注册算法(带缓存)
- `process(roi_id, bind_id, camera_id, algorithm_type, tracks, current_time)` - 处理检测结果
- `update_roi_params(roi_id, bind_id, algorithm_type, params)` - 更新参数
- `reset_algorithm(roi_id, bind_id=None)` - 重置算法状态
- `reset_all()` - 重置所有算法
- `remove_roi(roi_id)` - 移除 ROI
- `remove_bind(roi_id, bind_id)` - 移除绑定
- `get_status(roi_id)` - 获取状态
---
## 2. 接口契约清单
### 2.1 process() 方法统一签名
所有四个算法的 `process()` 方法具有相同签名:
```python
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]
```
**tracks 输入格式 (每个元素):**
```python
{
"track_id": str, # 跟踪ID
"class": str, # 检测类别 ("person", "car", "truck", ...)
"confidence": float, # 置信度
"bbox": List[float], # 边界框 [x1, y1, x2, y2]
"matched_rois": [ # 匹配的ROI列表
{"roi_id": str}
],
}
```
### 2.2 告警输出格式
#### LeavePostAlgorithm 告警:
```python
{
"track_id": str, # 等于 roi_id
"camera_id": str,
"bbox": List[float], # 可能为空 []
"alert_type": "leave_post",
"alarm_level": int, # 默认 2
"message": "人员离岗告警",
"first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S'
}
```
注意: leave_post 告警使用 `track_id` 而非 `roi_id` 字段名(与其他算法不同)。
#### IntrusionAlgorithm 告警:
```python
{
"roi_id": str,
"camera_id": str,
"bbox": List[float],
"alert_type": "intrusion",
"alarm_level": int, # 默认 1
"message": "检测到周界入侵",
"first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S'
}
```
#### IllegalParkingAlgorithm 告警:
```python
{
"roi_id": str,
"camera_id": str,
"bbox": List[float],
"alert_type": "illegal_parking",
"alarm_level": int, # 默认 1
"confidence": float,
"message": str, # 动态生成,包含停留分钟数
"first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S',可能为 None
"duration_minutes": float,
}
```
#### VehicleCongestionAlgorithm 告警:
```python
{
"roi_id": str,
"camera_id": str,
"bbox": List[float],
"alert_type": "vehicle_congestion",
"alarm_level": int, # 默认 2
"confidence": float,
"message": str, # 动态生成,包含平均车辆数和持续秒数
"first_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S',可能为 None
"vehicle_count": int,
}
```
#### alarm_resolve 事件 (所有算法统一格式):
```python
{
"alert_type": "alarm_resolve",
"resolve_alarm_id": str,
"duration_ms": int,
"last_frame_time": str, # 格式: '%Y-%m-%d %H:%M:%S'
"resolve_type": str, # "person_returned" | "non_work_time" | "intrusion_cleared" | "vehicle_left" | "congestion_cleared"
}
```
### 2.3 AlgorithmManager.process() 签名
```python
def process(
self,
roi_id: str,
bind_id: str,
camera_id: str,
algorithm_type: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]
```
### 2.4 AlgorithmManager.register_algorithm() 签名
```python
def register_algorithm(
self,
roi_id: str,
bind_id: str,
algorithm_type: str, # "leave_post" | "intrusion" | "illegal_parking" | "vehicle_congestion"
params: Optional[Dict[str, Any]] = None,
)
```
---
## 3. 已发现的潜在问题
### 3.1 Critical (必须修复)
**[C1] LeavePostAlgorithm: `confirm_return_sec` 参数声明但从未使用**
- 位置: 第53行声明但状态机中 ALARMED -> CONFIRMING_ON_DUTY -> ON_DUTY 的转换直接复用 `confirm_on_duty_sec`
- 影响: 使用者设置 `confirm_return_sec` 以为可以独立控制回岗确认时长,但实际无效
- 建议: 文档中声明此参数复用 `confirm_on_duty_sec`,或实现独立的回岗确认逻辑
**[C2] LeavePostAlgorithm: resolve 事件的 duration_ms 计算依赖 `_leave_start_time`,但该值可能为 None**
- 位置: 第325行 `duration_ms = int((current_time - self._leave_start_time).total_seconds() * 1000)`
- 场景: 如果算法在 OFF_DUTY_COUNTDOWN 之前(即 _leave_start_time 赋值之前)因某种异常跳到 ON_DUTY 且 _last_alarm_id 非空,会抛出 TypeError
- 风险: 低概率但会导致该帧整个 process 调用抛异常
**[C3] LeavePostAlgorithm: `_leave_start_time` 赋值时机问题**
- 位置: 第277行 `self._leave_start_time = self.state_start_time`
- `state_start_time` 此时是 CONFIRMING_OFF_DUTY 的开始时间(非 OFF_DUTY_COUNTDOWN 的开始时间,因为 state_start_time 在下一行第276行才被更新为 current_time
- 实际效果: `_leave_start_time` 记录的是**离岗确认开始时间**,不是**倒计时开始时间**
- 审查结论: 这是有意设计,离开时间应该从人离开被确认开始计算,但代码注释"记录离开时间"可能造成误解
### 3.2 Important (应该修复)
**[I1] LeavePostAlgorithm 告警字典使用 `track_id` 而非 `roi_id`**
- 位置: 第299行 `"track_id": roi_id`
- 其他三个算法统一使用 `"roi_id": roi_id`
- 影响: main.py 中的 `_handle_detections` 不直接使用此字段(它有自己的 roi_id所以不影响功能但接口不一致
**[I2] AlgorithmManager 缺乏线程安全**
- `process()` 方法未加锁第1637-1651行`register_algorithm()` 也未加锁
- `_update_lock` 仅在 `load_bind_from_redis``reload_all_algorithms` 中使用
- 风险: 如果 config_update_worker 线程调用 `reload_all_algorithms` 同时主线程调用 `process`,可能读到不一致的 `self.algorithms` 字典
- 缓解: Python GIL 在字典读操作上提供了一定程度的原子性保护,实际崩溃概率很低
**[I3] AlgorithmManager.default_params 中 illegal_parking 的 confirm_clear_sec 默认值(30) 与 IllegalParkingAlgorithm 构造函数默认值(120) 不一致**
- 位置: 第1233行 vs 第751行
- 影响: 通过 AlgorithmManager 创建的 illegal_parking 算法 confirm_clear_sec 为 30直接创建为 120
**[I4] AlgorithmManager.default_params 中 vehicle_congestion 的 count_threshold 默认值(3) 与 VehicleCongestionAlgorithm 构造函数默认值(5) 不一致**
- 位置: 第1237行 vs 第1004行
- 影响: 通过 AlgorithmManager 创建的算法阈值为 3直接创建为 5
**[I5] `update_algorithm_params` 仅支持 leave_post 和 intrusion**
- 位置: 第1461-1496行
- 缺少 illegal_parking 和 vehicle_congestion 的参数更新逻辑第1494行注释 "其他算法类型可以在此添加"
- 影响: `reload_all_algorithms(preserve_state=True)` 对 illegal_parking/vehicle_congestion 会回退到 `load_bind_from_redis`,会重置算法状态
**[I6] `load_bind_from_redis` 仅支持 leave_post 和 intrusion**
- 位置: 第1322-1403行
- 缺少 illegal_parking 和 vehicle_congestion 的 Redis 加载逻辑
- 影响: 从 Redis 热更新配置时,这两种算法无法被加载
**[I7] IntrusionAlgorithm: `_get_latest_bbox` 不检查 target_class**
- 位置: 第456-460行
- 与 LeavePostAlgorithm 不同第139-142行会检查 target_class
- 影响: 可能返回非目标类别的 bbox
**[I8] `_is_in_working_hours` 不支持跨午夜时间段**
- 位置: 第112行 `if start_minutes <= current_minutes < end_minutes`
- 如果 working_hours 配置为 `{"start": "22:00", "end": "06:00"}`,则无法正确判断
- 影响: 夜班场景可能不工作
### 3.3 Suggestions (建议改进)
**[S1] 滑动窗口的 window_size_sec 硬编码为 10 秒**
- LeavePostAlgorithm 第80行: `self.window_size_sec = 10`
- IllegalParkingAlgorithm 第744行: `WINDOW_SIZE_SEC = 10`
- VehicleCongestionAlgorithm 第1000行: `WINDOW_SIZE_SEC = 10`
- 建议: 提取为可配置参数
**[S2] LeavePostAlgorithm._update_detection_window 与 IllegalParkingAlgorithm._update_window 实现逻辑相同但代码重复**
- 可提取为基类方法或工具函数
**[S3] LeavePostAlgorithm.get_state() 使用 datetime.now() 而非参数传入的 current_time**
- 位置: 第367-372行
- 在测试场景中会导致状态信息不准确(不影响核心逻辑,仅影响监控展示)
**[S4] AlgorithmManager.get_status() 中 leave_post 分支访问 `alarm_sent` 属性**
- 位置: 第1724行
- LeavePostAlgorithm 实际上没有 `alarm_sent` 属性getattr 返回 False
- 这是旧版本残留代码
**[S5] AlgorithmManager.remove_roi() 中 bind_id 解析逻辑脆弱**
- 位置: 第1703行 `key.split("_")[-1]`
- 如果 bind_id 本身包含下划线,解析会出错
- key 格式为 `"{roi_id}_{bind_id}"`,应该用 `key[len(roi_id)+1:]` 提取 bind_id
**[S6] `_is_in_working_hours` 中的 bare `except:` (第99行, 第124行)**
- 应该至少 except Exception 或更具体的异常类型
---
## 4. 测试覆盖分析
### 4.1 test_leave_post_full_workflow.py 覆盖的场景
| 场景 | 状态路径 | 覆盖 |
|------|----------|------|
| 上岗确认成功 | INIT -> CONFIRMING_ON_DUTY -> ON_DUTY | YES |
| 离岗确认 | ON_DUTY -> CONFIRMING_OFF_DUTY -> OFF_DUTY_COUNTDOWN | YES |
| 倒计时触发告警 | OFF_DUTY_COUNTDOWN -> ALARMED | YES |
| 回岗 resolve | ALARMED -> CONFIRMING_ON_DUTY -> ON_DUTY (+ resolve) | YES |
| 告警字段验证 | 无 duration_minutes, 有 first_frame_time | YES |
| resolve 字段验证 | duration_ms, resolve_alarm_id, resolve_type | YES |
| set_last_alarm_id 回填 | - | YES |
### 4.2 test_vehicle_algorithms.py 覆盖的场景
**IllegalParkingAlgorithm:**
| 场景 | 覆盖 |
|------|------|
| 完整生命周期 IDLE->CONFIRMING->COUNTDOWN->ALARMED->CLEAR->IDLE | YES |
| 车辆短暂路过不触发 | YES |
| 多类车辆检测 (truck, bus) | YES |
| person 不触发违停 | YES |
| 冷却期内不重复告警 | YES |
| resolve 事件发送 | YES |
**VehicleCongestionAlgorithm:**
| 场景 | 覆盖 |
|------|------|
| 完整生命周期 NORMAL->CONFIRMING->CONGESTED->CLEAR->NORMAL | YES |
| 少于阈值不触发 | YES |
| 短暂拥堵不触发 | YES |
| resolve 事件发送 | YES |
**AlgorithmManager:**
| 场景 | 覆盖 |
|------|------|
| 注册 illegal_parking | YES |
| 注册 vehicle_congestion | YES |
| process 调用 | YES |
| get_status 调用 | YES |
| 重复注册走缓存 | YES |
| reset_algorithm | YES |
### 4.3 测试覆盖缺口
**LeavePostAlgorithm 未覆盖:**
- [ ] CONFIRMING_ON_DUTY -> INIT (人消失)
- [ ] CONFIRMING_OFF_DUTY -> ON_DUTY (人回来ratio>=0.5)
- [ ] OFF_DUTY_COUNTDOWN -> ON_DUTY (倒计时期间回来)
- [ ] 非工作时间自动 resolve
- [ ] NON_WORK_TIME -> INIT (工作时间恢复)
- [ ] 冷却期内不重复告警
- [ ] 空 tracks 输入
- [ ] working_hours 配置解析(字符串格式)
**IntrusionAlgorithm 完全未测试:**
- [ ] 完整生命周期 IDLE->CONFIRMING->ALARMED->CLEAR->IDLE
- [ ] 入侵确认中人消失
- [ ] CONFIRMING_CLEAR 中短暂有人 vs 持续有人
- [ ] 冷却期
- [ ] resolve 事件
- [ ] target_class=None 匹配所有类别
- [ ] state_start_time==None 的防御性代码分支
**IllegalParkingAlgorithm 未覆盖:**
- [ ] state_start_time==None 的防御性代码分支 (CONFIRMING_VEHICLE, PARKED_COUNTDOWN, CONFIRMING_CLEAR)
- [ ] CONFIRMING_CLEAR -> ALARMED (车辆又出现, ratio>=0.5)
- [ ] ALARMED 状态下 ratio 在 0.15-0.5 之间(维持 ALARMED
**VehicleCongestionAlgorithm 未覆盖:**
- [ ] state_start_time==None 的防御性代码分支
- [ ] CONFIRMING_CLEAR -> CONGESTED (又拥堵了)
- [ ] 消散阈值 0.5*count_threshold 的边界值
- [ ] 冷却期测试
**AlgorithmManager 未覆盖:**
- [ ] start_config_subscription / stop_config_subscription
- [ ] load_bind_from_redis
- [ ] reload_all_algorithms (含孤立实例清理)
- [ ] update_algorithm_params
- [ ] remove_roi / remove_bind
- [ ] 并发调用安全性
- [ ] register_algorithm 的 leave_post 和 intrusion 类型
---
## 5. 优化安全边界
### 5.1 不可修改区域 (功能合约)
以下代码是外部依赖的契约,修改会破坏 main.py 或其他模块:
1. **所有算法的 `process()` 方法签名** -- main.py 的 `_handle_detections` 直接调用
2. **告警字典的字段名和类型** -- main.py 的 `_handle_detections` 依赖 `alert_type`, `alarm_level`, `confidence`, `bbox`, `message`, `first_frame_time`, `duration_minutes`, `vehicle_count`
3. **`alarm_resolve` 事件格式** -- main.py 的 resolve 逻辑依赖 `resolve_alarm_id`, `duration_ms`, `last_frame_time`, `resolve_type`
4. **`set_last_alarm_id(alarm_id)` 方法** -- main.py 回填 alarm_id
5. **`reset()` 方法** -- AlgorithmManager 调用
6. **`get_state()` 方法** -- AlgorithmManager.get_status() 调用
7. **AlgorithmManager.process() 签名和返回值** -- main.py 直接调用
8. **AlgorithmManager.register_algorithm() 签名** -- main.py 直接调用
9. **AlgorithmManager.algorithms 的三层字典结构** -- main.py 直接访问内部实例来获取 `_leave_start_time` 等属性 (第905-911行)
### 5.2 可安全优化区域
以下代码修改不会影响外部行为:
1. **滑动窗口实现** -- `_update_detection_window`, `_update_window`, `_update_count_window` 的内部实现可以优化,只要 `_get_detection_ratio()`, `_get_window_ratio()`, `_get_avg_count()` 的语义不变
2. **`_check_detection_in_roi` / `_check_target_class` / `_check_target_classes`** -- 内部实现可优化,接口不变即可
3. **`_get_latest_bbox` / `_get_max_confidence`** -- 辅助方法,内部实现可优化
4. **`_is_in_working_hours` / `_parse_time_to_minutes`** -- 内部实现可优化(建议修复跨午夜问题)
5. **AlgorithmManager 的 Redis 相关方法** -- `load_bind_from_redis`, `reload_*`, `_config_update_worker` 可以修改,不影响算法核心逻辑
6. **日志输出** -- 所有 logger.* 调用可以调整
7. **`default_params` 字典** -- 可以修正默认值不一致的问题
### 5.3 高风险修改区域 (需要完整回归测试)
1. **状态转换条件 (ratio 阈值)** -- 任何 detection_ratio, window_ratio 的阈值变更都可能影响告警灵敏度
- LeavePostAlgorithm: 0.6 (上岗), 0.2 (离岗开始), 0.5 (离岗恢复), 0.2 (离岗确认)
- IllegalParkingAlgorithm: 0.3 (放弃确认), 0.6 (确认有车), 0.2 (车离开), 0.15 (开始消失确认), 0.5 (车又来), 0.2 (消失确认)
- VehicleCongestionAlgorithm: count_threshold (开始确认), 0.5*count_threshold (开始消散)
2. **时间比较逻辑** -- `elapsed >= xxx_sec` 的方向(大于等于 vs 大于)
3. **冷却期检查** -- `cooldown_key` 的构造方式和比较逻辑
4. **resolve 事件触发逻辑** -- LeavePostAlgorithm 的 "进入 ON_DUTY 且 _last_alarm_id 存在" 的检查
### 5.4 重复代码可提取区域
以下方法在多个算法中重复实现,可以提取为基类或 mixin
| 方法 | 出现在 | 可提取 |
|------|--------|--------|
| `_check_detection_in_roi` | 全部4个 | YES |
| `_check_target_class` | LeavePost, Intrusion | YES |
| `_check_target_classes` | IllegalParking, VehicleCongestion | YES |
| `_get_latest_bbox` | 全部4个 | YES (注意 Intrusion 不检查 target_class) |
| `_get_max_confidence` | IllegalParking, VehicleCongestion | YES |
| `set_last_alarm_id` | 全部4个 | YES |
| 滑动窗口逻辑 | LeavePost, IllegalParking, VehicleCongestion | YES |
---
## 6. config_models.py 与 algorithms.py 的一致性
### 6.1 AlgorithmType 枚举缺失
`config_models.py` 中的 `AlgorithmType` 枚举:
```python
LEAVE_POST = "leave_post"
INTRUSION = "intrusion"
CROWD_DETECTION = "crowd_detection" # 已在 algorithms.py 中注释掉
FACE_RECOGNITION = "face_recognition" # algorithms.py 中不存在
```
缺失:
- `ILLEGAL_PARKING = "illegal_parking"` -- algorithms.py 已实现但枚举未添加
- `VEHICLE_CONGESTION = "vehicle_congestion"` -- algorithms.py 已实现但枚举未添加
### 6.2 ROIInfo 默认值
`config_models.py``ROIInfo.confirm_leave_sec` 默认值为 **10**,而 `AlgorithmManager.default_params["leave_post"]["confirm_leave_sec"]`**30**`LeavePostAlgorithm.confirm_off_duty_sec` 默认为 **30**
---
## 7. main.py 集成要点
### 7.1 main.py 对 algorithms.py 的依赖
1. **直接访问算法内部属性** (第905-911行):
```python
for attr in ('_leave_start_time', '_parking_start_time', '_congestion_start_time', '_intrusion_start_time'):
val = getattr(algo, attr, None)
```
这是紧耦合,如果内部属性名变更会导致 first_frame_time 丢失。
2. **alarm_id 回填** (第943-945行):
```python
algo.set_last_alarm_id(alarm_info.alarm_id)
```
3. **两层去重机制**:
- ROI级别: `_active_alarms[f"{roi_id}_{alert_type}"]`
- 摄像头级别: `_camera_alert_cooldown[f"{camera_id}_{alert_type}"]` (30秒冷却)
4. **duration_ms 在 ext_data 中的计算** (第925行):
```python
"duration_ms": int(alert.get("duration_minutes", 0) * 60 * 1000) if alert.get("duration_minutes") else None,
```
仅 IllegalParkingAlgorithm 的告警包含 `duration_minutes`,其他算法的 ext_data.duration_ms 为 None。
---
## 附录: 状态机可视化
### LeavePostAlgorithm
```
+--> NON_WORK_TIME --+
| (any state) | (in_working_hours)
| v
INIT --+--> CONFIRMING_ON_DUTY ---> ON_DUTY ---> CONFIRMING_OFF_DUTY
^ | ^ |
+----------+ | v
(ratio==0) | OFF_DUTY_COUNTDOWN
| | |
| (roi_has_person) | v
| +---------------+ ALARMED
| | |
+-------+ (roi_has_person) |
ON_DUTY <-- (if _last_alarm_id, send resolve)
```
### IntrusionAlgorithm
```
IDLE ---> CONFIRMING_INTRUSION ---> ALARMED ---> CONFIRMING_CLEAR ---> IDLE
^ | | |
+------------+ +--------+
(person gone) (person back >= confirm_intrusion_sec)
-> ALARMED
```
### IllegalParkingAlgorithm
```
IDLE --> CONFIRMING_VEHICLE --> PARKED_COUNTDOWN --> ALARMED --> CONFIRMING_CLEAR --> IDLE
^ | | |
+----------+ | v
(ratio<0.3) (ratio<0.2) | ALARMED
^ | | (ratio>=0.5)
+--------------------------+ v
ALARMED
```
### VehicleCongestionAlgorithm
```
NORMAL --> CONFIRMING_CONGESTION --> CONGESTED --> CONFIRMING_CLEAR --> NORMAL
^ | |
+--------------+ v
(avg<threshold) CONGESTED
(avg>=threshold)
```

View File

@@ -0,0 +1,194 @@
# 垃圾检测算法 - WVP 后端 / 前端改动方案(未实施,预留参考)
## 背景
Edge 端的 `GarbageDetectionAlgorithm` 已实现commit xxx。本文档列出 WVP 后端和前端需要做的配套改动,等后续需要在 ROI 编辑器创建垃圾检测绑定时再实施。
---
## 一、WVP 后端改动
### 1.1 算法注册(数据库)
**文件:** `wvp-platform/数据库/版本号/SQL 脚本`
**新增算法记录:**
```sql
INSERT INTO wvp_ai_algorithm (
algo_code, algo_name, algo_description,
is_active, param_schema, global_params
) VALUES (
'garbage',
'垃圾检测',
'检测监控区域内散落垃圾的持续存在,清洁后自动解除告警',
1,
'{"confirm_garbage_sec": {"type": "int", "default": 60, "min": 10, "max": 600, "label": "垃圾确认时间(秒)"},
"confirm_clear_sec": {"type": "int", "default": 60, "min": 10, "max": 600, "label": "清理确认时间(秒)"},
"cooldown_sec": {"type": "int", "default": 1800, "min": 300, "max": 7200, "label": "告警冷却时间(秒)"},
"alarm_level": {"type": "int", "default": 2, "min": 0, "max": 3, "label": "告警等级"}
}',
'{}'
);
```
### 1.2 Java 算法服务
**文件:** `wvp-platform/src/main/java/com/genersoft/iot/vmp/aiot/service/impl/AiAlgorithmServiceImpl.java`
**改动:** 算法代码白名单(约 line 42-54添加 `"garbage"`
```java
private static final Set<String> SUPPORTED_ALGO_CODES = Set.of(
"leave_post", "intrusion", "illegal_parking",
"vehicle_congestion", "non_motor_vehicle_parking",
"garbage" // 新增
);
```
### 1.3 配置下发
不需要改动。现有 `AiRedisConfigServiceImpl``global_params` 机制已通用。
---
## 二、iot-device-management-service 改动
**文件:** `app/constants.py`
检查 `AlarmType` 枚举是否已有 `GARBAGE`
- 已有 → 无需改动
- 未有 → 添加:
```python
class AlarmType(str, Enum):
LEAVE_POST = "leave_post"
INTRUSION = "intrusion"
ILLEGAL_PARKING = "illegal_parking"
VEHICLE_CONGESTION = "vehicle_congestion"
NON_MOTOR_VEHICLE_PARKING = "non_motor_vehicle_parking"
GARBAGE = "garbage" # 新增
ALARM_TYPE_NAMES: Dict[str, str] = {
...
AlarmType.GARBAGE: "垃圾检测",
}
```
**文件:** `app/services/vlm_service.py`VLM 复核提示词)
添加 garbage 的提示词模板:
```python
"garbage": """你是安防监控AI复核员。算法类型垃圾检测监控区域{roi_name}
截图显示时间:{timestamp}
任务:判断图中是否真的存在散落的垃圾、包装袋、废弃物等需要清理的物品。
分析要点:
1. 是否存在明显的垃圾(垃圾袋、纸屑、瓶罐、食品包装等)
2. 区分垃圾与正常物品(整齐放置的物品、装饰品不算垃圾)
3. 垃圾是否在通道/地面等不该出现的位置
4. 排除阴影、污渍、地砖花纹等误检
仅输出JSON{{"confirmed":true,"description":"..."}}""",
```
---
## 三、前端改动iot-device-management-frontend
### 3.1 告警列表类型筛选
**文件:** `apps/web-antd/src/views/aiot/alarm/list/data.ts`
```typescript
export const ALERT_TYPE_OPTIONS = [
{ label: '人员离岗', value: 'leave_post' },
{ label: '周界入侵', value: 'intrusion' },
{ label: '车辆违停', value: 'illegal_parking' },
{ label: '车辆拥堵', value: 'vehicle_congestion' },
{ label: '非机动车违停', value: 'non_motor_vehicle_parking' },
{ label: '垃圾检测', value: 'garbage' }, // 新增
];
```
### 3.2 告警标签颜色
**文件:** `apps/web-antd/src/views/aiot/alarm/list/index.vue`
```typescript
const colorMap: Record<string, string> = {
leave_post: 'orange',
intrusion: 'red',
illegal_parking: 'blue',
vehicle_congestion: 'geekblue',
non_motor_vehicle_parking: 'green',
garbage: 'gold', // 新增 — 建议金色表达清洁主题
};
```
### 3.3 ROI 编辑器参数配置
**文件:** `apps/web-antd/src/views/aiot/device/roi/components/AlgorithmParamEditor.vue`
参数字段已通用(读自 `algo.paramSchema`),无需改动。**但需要添加参数名中文映射:**
**文件:** `AlgorithmParamEditor.vue``paramNameMap``paramDescMap`
```typescript
const paramNameMap: Record<string, string> = {
// ... 已有
confirm_garbage_sec: '垃圾确认时间(秒)',
};
const paramDescMap: Record<string, string> = {
// ... 已有
confirm_garbage_sec: '持续检测到垃圾的时间,超过该时间触发告警(建议 60-120 秒)',
};
```
### 3.4 全局参数配置页
**文件:** `apps/web-antd/src/views/aiot/device/algorithm/index.vue`
`paramNameMap``paramDescMap` 同样需要添加 `confirm_garbage_sec` 条目(参见 3.3)。
---
## 四、验证顺序(将来实施时)
1. **后端数据库注册算法记录**
2. **WVP 后端重启** — 白名单生效
3. **Service 端** constants.py 添加(如需要)
4. **前端重启** — 下拉选项和颜色生效
5. **ROI 编辑器创建一个 garbage 绑定,参数用默认值**
6. **前端触发配置推送** — 验证 Edge 端收到并注册算法
7. **Edge 日志验证:** 应看到 `已从Redis加载垃圾检测算法: roi_xxx_bind_xxx`
8. **模拟测试:** 放个垃圾在摄像头前60 秒后应触发告警
9. **清理测试:** 移除垃圾 30 秒后应收到 resolve 事件
10. **企微卡片收到告警 + 创建工单全流程**
---
## 五、TensorRT 引擎部署(最后一步)
当确定用微调模型替换 COCO 预训练模型时:
1. **导出 engine**
```bash
yolo export model=yolo11s_v1_20260417.pt format=engine imgsz=480 half=True device=0
```
2. **替换 Edge 端模型:**
```bash
cp yolo11s_v1_20260417.engine /opt/edge/models/yolo11n.engine # 注意文件名
```
3. **修改 `config/settings.py` 的 COCO_CLASS_NAMES**
```python
COCO_CLASS_NAMES = ['garbage', 'person', 'car', 'bicycle', 'motorcycle']
```
4. **修改 `core/postprocessor.py` 的输出解析:**
- YOLO 输出从 `[84, 8400]`4+80类变为 `[9, 8400]`4+5类
- 类别分数范围从 `output[4:84]` 改为 `output[4:9]`
5. **重启 Edge 服务**
这一步涉及模型 + 推理管线,需要单独在生产环境测试。

372
docs/p0p1_review_report.md Normal file
View File

@@ -0,0 +1,372 @@
# P0+P1 修复涉及文件全面审查报告
> 审查日期: 2026-04-02
> 审查范围: main.py, config_sync.py, screenshot_handler.py, tensorrt_engine.py, result_reporter.py, alarm_upload_worker.py, postprocessor.py 及相关依赖
---
## 1. 功能基线清单
### 1.1 main.py - EdgeInferenceService
| 方法 | 行号 | 行为描述 |
|------|------|----------|
| `__init__` | 42-96 | 初始化两个去重字典: `_camera_alert_cooldown` (摄像头级, Dict[str, datetime]) 和 `_active_alarms` (ROI级, Dict[str, str])。冷却期默认30秒。 |
| `_handle_detections` | 790-954 | 核心告警处理入口。接收检测结果后: (1) 调用算法管理器获取alerts; (2) 对 `alarm_resolve` 类型从 `_active_alarms` 中清除对应记录; (3) ROI级去重: 检查 `_active_alarms[f"{roi_id}_{alert_type}"]` 是否存在; (4) 摄像头级去重: 检查 `_camera_alert_cooldown[f"{camera_id}_{alert_type}"]` 时间间隔; (5) 构建 AlarmInfo 并调用 `report_alarm()`; (6) 写入 `_active_alarms` 并回填 alarm_id 到算法实例。 |
| `_batch_process_rois` | 676-751 | 从队列取出 ROI 任务, 按 max_batch_size=8 分块, 调用 TensorRT 推理, 后处理后逐个调用 `_handle_detections`。 |
| `_process_frame` | 613-651 | 获取 ROI 配置(含绑定), 预处理裁剪, 组装 roi_items 推入队列。 |
| `_scheduler_worker` | 653-674 | 中心调度线程, 轮询所有视频流取最新帧, 丢弃超龄帧(>0.5s), 调用 `_process_frame`。 |
| `_inference_worker` | 956-976 | 推理线程, 攒批窗口50ms, 调用 `_batch_process_rois`。 |
**`_camera_alert_cooldown` 读写位置:**
- 写入: 第900行 (`self._camera_alert_cooldown[dedup_key] = now`)
- 读取: 第890行 (`self._camera_alert_cooldown.get(dedup_key)`)
- 无其他模块引用此字典
**`_active_alarms` 读写位置:**
- 写入: 第940行 (`self._active_alarms[active_key] = alarm_info.alarm_id`)
- 删除: 第865-868行 (resolve 事件清除)
- 读取: 第880行 (`active_key in self._active_alarms`)
- 无其他模块引用此字典
### 1.2 config/config_sync.py - ConfigSyncManager
| 方法 | 行为描述 |
|------|----------|
| `_init_cloud_redis` (219-241) | 创建云端Redis连接, 参数: socket_connect_timeout=10, socket_timeout=10, retry_on_timeout=True, socket_keepalive=True + TCP keepalive选项, health_check_interval=15。连接失败时将 `_cloud_redis` 设为 None 但不抛异常。 |
| `_listen_config_stream` (326-390) | Stream 监听主循环。外层while循环: 若 `_cloud_redis` 为None则调用 `_init_cloud_redis()` 重连。内层while循环: XREAD BLOCK 5000ms, 无消息时 PING 保活。`redis.ConnectionError` except 块: 将 `_cloud_redis = None` (第381行)。通用 `Exception` except 块: 不置 None, 仅等待重试。 |
| `get_roi_configs_with_bindings` (760-806) | 当传入 camera_id 时走优化路径 `get_bindings_by_camera(camera_id)` (单SQL JOIN查询)。当无 camera_id 时存在 N+1 问题: 先 `get_all_roi_configs()` 再逐个 `get_bindings_by_roi(roi_id)`。 |
**`self._cloud_redis = None` 出现位置:**
- 第381行: `_listen_config_stream``redis.ConnectionError` except 块
- 第241行: `_init_cloud_redis` 中初始化失败
### 1.3 core/screenshot_handler.py - ScreenshotHandler
| 方法 | 行为描述 |
|------|----------|
| `_listen_loop` (159-220) | XREADGROUP 消费截图请求。finally 块中(第191-196行) ACK 消息, ACK 失败时静默 pass -- 意味着消息可能被重复消费。无消息时 PING 保活。ConnectionError 时重连(带指数退避)。当 `_cloud_redis is None` 时也会主动重连。 |
| `_handle_request` (224-280) | 流程: 校验必填字段 -> 设备隔离检查 -> 抓帧 -> 上传COS(失败重试1次) -> HTTP回调(失败降级写Redis)。 |
| `_capture_frame` (284-300) | 优先从 MultiStreamManager 获取已有流帧, 无流时降级临时 RTSP 连接抓帧。 |
| `_send_result` (375-402) | 优先 HTTP 回调, 失败降级写 Redis key `snap:result:{request_id}` (TTL 60s)。 |
**补偿机制:** 除 COS 上传有1次重试、HTTP 回调有 Redis 降级外, 无其他补偿。xack 失败意味着 Redis 会再次投递该消息(at-least-once 语义)。
### 1.4 core/tensorrt_engine.py - TensorRTEngine
| 方法 | 行为描述 |
|------|----------|
| `load_engine` (109-142) | 在 `_lock` 保护下: (1) 若已有 context 则先释放; (2) **创建新的 CUDA context** `cuda.Device(device_id).make_context()`; (3) 创建 CUDA Stream; (4) 反序列化 engine; (5) 创建 execution context; (6) 分配 buffers。 |
| `infer` (184-253) | 在 `_lock` 保护下: (1) `_cuda_context.push()`; (2) 设置动态 input shape; (3) H2D async memcpy; (4) `execute_async_v2`; (5) D2H async memcpy; (6) `stream.synchronize()`; (7) finally 块中 `_cuda_context.pop()`。 |
| `release` (317-325) | 在 `_lock` 保护下, 幂等释放: `_cuda_context.pop()` + `_cuda_context.detach()`。 |
| `_release_resources` (294-315) | 内部释放: pop/detach CUDA context, synchronize stream, 清空 bindings。 |
**CUDA context 模式:** 每个 TensorRTEngine 实例创建独立的 CUDA context。`pycuda.autoinit` 在 import 时创建一个默认 context, 而 `load_engine` 再创建一个新的。`infer` 使用 push/pop 模式切换到自己的 context。
**`_lock` 使用范围:** 覆盖 `load_engine`, `infer`, `release` 三个公开方法, 保证单引擎实例的线程安全。
**EngineManager:** 持有 `Dict[str, TensorRTEngine]`, 有自己的 `_lock`。当前代码只创建一个 "default" 引擎, 不存在多引擎共享 CUDA context 的问题。
### 1.5 core/result_reporter.py - ResultReporter
| 方法 | 行为描述 |
|------|----------|
| `report_alarm` (119-165) | 接收 AlarmInfo + numpy screenshot。将截图 cv2.imencode JPEG (quality=85) 后 base64 编码写入 `alarm_info.snapshot_b64`。然后 JSON 序列化后 LPUSH 到 `local:alarm:pending`。 |
| `report_alarm_resolve` (167-180) | 将 resolve 数据附加 `_type: "resolve"` 标记后 LPUSH 到同一队列 `local:alarm:pending`。 |
**AlarmInfo 字段:** alarm_id, alarm_type, device_id, scene_id, event_time(ISO8601), alarm_level(0-3), snapshot_b64(Optional[str]), algorithm_code, confidence_score, ext_data(Dict)。
### 1.6 core/alarm_upload_worker.py - AlarmUploadWorker
| 方法 | 行为描述 |
|------|----------|
| `_worker_loop` (141-166) | 主循环: 先处理重试队列, 再 BRPOP `local:alarm:pending` (timeout=2s)。 |
| `_process_alarm` (169-230) | 流程: JSON解析 -> 检查 `_type=="resolve"` 分流 -> base64.b64decode 截图 -> 上传 COS -> HTTP POST 告警元数据。COS 上传失败或 HTTP 失败都进入 `_handle_retry`。 |
| `_handle_retry` (386-425) | 递增 `_retry_count`, 超过 `retry_max` 则 LPUSH 到 `local:alarm:dead`。未超限则计算指数退避延迟, 附加 `_retry_at` 时间戳后 LPUSH 到 `local:alarm:retry`。 |
| `_process_retry_queue` (427-465) | RPOP 逐条检查 `local:alarm:retry`, 到期的放回 pending, 未到期的放回 retry 头部。 |
| `_upload_snapshot_to_cos` (263-316) | base64 解码后直接 `put_object` 上传, object_key 格式: `alarms/{device_id}/{yyyy-MM-dd}/{alarm_id}.jpg`。 |
### 1.7 core/postprocessor.py - PostProcessor
| 方法 | 行为描述 |
|------|----------|
| `_process_gpu` (73-96) | `torch.from_numpy(boxes).cuda()` 转为 GPU 张量, 调用 `torchvision.ops.nms`, 结果 `.cpu().numpy()` 回到 CPU。**未使用 `torch.no_grad()`**。 |
| `batch_process_detections` (705-812) | 解析 TensorRT 输出, 按 batch 拆分, 逐个做 YOLO 输出解析 + NMS。每次调用创建新的 NMSProcessor 实例。 |
**GPU 张量生命周期:** `_process_gpu``boxes_t``scores_t` 是临时变量, 函数返回后即可被 GC 回收。`keep` 张量在 `.cpu().numpy()` 后也成为临时变量。无显式释放, 依赖 Python GC + PyTorch 缓存分配器。
---
## 2. 接口契约
### 2.1 方法签名与返回值
```python
# main.py
def _handle_detections(
self, camera_id: str, roi, bind, frame: VideoFrame,
boxes: Any, scores: Any, class_ids: Any, scale_info: tuple
) -> None
# config_sync.py
def get_roi_configs_with_bindings(
self, camera_id: Optional[str] = None, force_refresh: bool = False
) -> List[ROIInfoNew]
def _init_cloud_redis(self) -> None # 失败时 self._cloud_redis = None
# screenshot_handler.py
def _handle_request(self, fields: dict) -> None
# fields 预期字段: request_id, camera_code, cos_path, callback_url, device_id(可选), rtsp_url(可选)
# tensorrt_engine.py
def infer(self, input_batch: np.ndarray) -> Tuple[List[np.ndarray], float]
# input_batch: shape=[batch, 3, 480, 480], dtype=float16
# returns: (outputs_list, inference_time_ms)
def load_engine(self, engine_path: Optional[str] = None) -> bool
# result_reporter.py
def report_alarm(self, alarm_info: AlarmInfo, screenshot: Optional[np.ndarray] = None) -> bool
def report_alarm_resolve(self, resolve_data: dict) -> bool
# alarm_upload_worker.py
def _process_alarm(self, alarm_json: str) -> None
def _handle_retry(self, alarm_json: str, error: str) -> None
def _upload_snapshot_to_cos(self, image_bytes: bytes, alarm_id: str, device_id: str) -> Optional[str]
# postprocessor.py
def batch_process_detections(
self, batch_outputs: List[np.ndarray], batch_size: int,
conf_threshold: Optional[float] = None, nms_threshold: Optional[float] = None,
per_item_conf_thresholds: Optional[List[float]] = None
) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]]
```
### 2.2 Redis Key 名与格式
| Key | Redis实例 | 类型 | 格式 | 模块 |
|-----|-----------|------|------|------|
| `local:alarm:pending` | 本地 | List | JSON(AlarmInfo.to_dict() 或 resolve_data) | result_reporter / alarm_upload_worker |
| `local:alarm:retry` | 本地 | List | JSON(带 _retry_count, _retry_at 字段) | alarm_upload_worker |
| `local:alarm:dead` | 本地 | List | JSON(带 _dead_reason, _dead_at 字段) | alarm_upload_worker |
| `device:{device_id}:config` | 云端 | String | JSON(完整配置) | config_sync |
| `device:{device_id}:version` | 云端 | String | int 版本号 | config_sync |
| `device_config_stream` | 云端 | Stream | {device_id, version, action} | config_sync |
| `local:device:config:current` | 本地 | String | JSON(完整配置) | config_sync |
| `local:device:config:backup` | 本地 | String | JSON(上一版本配置) | config_sync |
| `local:device:config:version` | 本地 | String | int 版本号 | config_sync |
| `local:device:config:stream_last_id` | 本地 | String | Stream message ID | config_sync |
| `edge_snap_request` | 云端 | Stream | {request_id, camera_code, cos_path, callback_url, ...} | screenshot_handler |
| `snap:result:{request_id}` | 云端 | String(TTL=60s) | JSON(降级结果) | screenshot_handler |
### 2.3 AlarmInfo 数据结构 (完整字段)
```python
@dataclass
class AlarmInfo:
alarm_id: str # 格式: edge_{device_id}_{YYYYMMDDHHmmss}_{6hex}
alarm_type: str # 算法返回的 alert_type
device_id: str # 实际传入的是 camera_id (非 edge device_id)
scene_id: str # ROI ID
event_time: str # ISO8601 (frame.timestamp)
alarm_level: int # 0=紧急 1=重要 2=普通 3=轻微
snapshot_b64: Optional[str] # JPEG base64, 由 report_alarm 填充
algorithm_code: Optional[str]
confidence_score: Optional[float]
ext_data: Optional[Dict] # 包含: duration_ms, roi_id, bbox, target_class,
# bind_id, message, edge_node_id, first_frame_time,
# vehicle_count, area_id
```
---
## 3. 依赖关系图
```
+-----------------+
| main.py |
| EdgeInference |
| Service |
+--------+--------+
|
+------------------+------------------+
| | | | |
v v v v v
config_sync stream engine postprocess algorithm
(ConfigSync manager manager (PostProc) manager
Manager) |
| | |
v v v
database.py tensorrt_engine.py algorithms/
(SQLiteManager) (TensorRTEngine)
+------------------+
| result_reporter |
| (ResultReporter) |
+--------+---------+
|
| LPUSH local:alarm:pending
v
+------------------+
| alarm_upload |
| _worker |
+--------+---------+
|
+--------+---------+
| COS Upload |
| HTTP POST cloud |
+------------------+
screenshot_handler (独立Redis连接) --> 云端 Redis Stream
```
**数据流转:**
1. `_scheduler_worker` 轮询视频流 -> `_process_frame` 获取ROI+预处理 -> 推入 `_batch_roi_queue`
2. `_inference_worker` 消费队列 -> `_batch_process_rois` 做 TensorRT 推理 + 后处理 -> `_handle_detections`
3. `_handle_detections` -> 算法管理器判定 -> 去重过滤 -> `ResultReporter.report_alarm()` LPUSH Redis
4. `AlarmUploadWorker` BRPOP Redis -> base64 decode -> COS upload -> HTTP POST 云端
---
## 4. 安全边界
### 4.1 绝对不能动的代码 (Critical Path)
| 文件 | 代码区域 | 原因 |
|------|----------|------|
| `tensorrt_engine.py` | `infer()` 的 push/pop/synchronize 顺序 | CUDA context 操作顺序错误会导致段错误或GPU挂死 |
| `tensorrt_engine.py` | `_allocate_buffers()` 的 buffer 分配逻辑 | 改变 buffer 大小会导致推理崩溃 |
| `result_reporter.py` | `AlarmInfo.to_dict()` 的字段名 | alarm_upload_worker 和云端 API 依赖这些字段名 |
| `alarm_upload_worker.py` | Redis key 常量引用 | 必须与 result_reporter 一致 |
| `postprocessor.py` | `_parse_yolo_output` 的输出格式解析 (84行 = 4+80) | 与模型输出格式强耦合 |
### 4.2 可以安全修改的代码
| 文件 | 代码区域 | 注意事项 |
|------|----------|----------|
| `main.py` | `_handle_detections` 的去重逻辑 | 只影响告警频率, 不影响推理管线。但要确保线程安全(当前单线程调用, 无锁)。 |
| `main.py` | `_camera_cooldown_seconds` 默认值 | 可调整, 纯业务参数 |
| `config_sync.py` | `_listen_config_stream` 的重连逻辑 | 注意 `_cloud_redis = None` 的时机 |
| `config_sync.py` | `get_roi_configs_with_bindings` 的 N+1 查询 | 可优化为 JOIN 查询, 但要保持返回值格式不变 |
| `screenshot_handler.py` | `_listen_loop` 的 xack 逻辑 | 可增加重试, 但要注意不能阻塞主循环 |
| `postprocessor.py` | `_process_gpu` 添加 `torch.no_grad()` | 纯优化, 不影响功能 |
### 4.3 修改时需要同步更新的代码对
| 修改点 | 需要同步的位置 |
|--------|----------------|
| `AlarmInfo` 字段变更 | `to_dict()`, `alarm_upload_worker._process_alarm`, 云端API |
| Redis key 名变更 | `result_reporter.py` 常量 + `alarm_upload_worker.py` import |
| `_active_alarms` key 格式变更 | 第865行 resolve 清除逻辑 + 第880行查重逻辑 + 第940行写入 |
| `_camera_alert_cooldown` key 格式变更 | 第888行构建 + 第890行读取 + 第900行写入 |
---
## 5. 已有测试
| 测试文件 | 覆盖模块 | 状态 |
|----------|----------|------|
| `tests/test_config_sync.py` | config_sync.py | 存在 |
| `tests/test_postprocessor.py` | postprocessor.py | 存在 |
| `tests/test_result_reporter.py` | result_reporter.py | 存在 |
| `tests/test_tensorrt.py` | tensorrt_engine.py | 存在 |
| `tests/test_preprocessor.py` | preprocessor.py | 存在 |
| `tests/test_video_stream.py` | video_stream.py | 存在 |
| `tests/test_utils.py` | utils | 存在 |
| `test_leave_post_full_workflow.py` | 离岗检测集成 | 存在(项目根目录) |
| `test_vehicle_algorithms.py` | 车辆算法 | 存在(项目根目录) |
**缺失测试:**
- `main.py` (`_handle_detections`, 去重逻辑) -- **无单元测试**
- `alarm_upload_worker.py` -- **无单元测试**
- `screenshot_handler.py` -- **无单元测试**
---
## 6. 潜在风险
### 6.1 Critical (必须关注)
**[C1] `_handle_detections` 无线程安全保护**
- `_camera_alert_cooldown``_active_alarms` 两个字典在 `_handle_detections` 中读写, 该方法被 `_inference_worker` 线程调用。当前架构下只有一个推理线程, 所以实际上是单线程安全的。但如果未来增加多推理线程, 将产生竞态条件。
- 风险等级: 当前低, 架构变更时高
**[C2] `_active_alarms` resolve 清除使用遍历+break**
- 第865-868行: `for k, v in list(self._active_alarms.items())` 遍历查找匹配的 alarm_id 后 break。如果同一个 alarm_id 被错误地写入多个 key, 只会清除第一个。
- 风险等级: 低(alarm_id 是 UUID 级唯一)
**[C3] CUDA context 与 pycuda.autoinit 共存**
- `import pycuda.autoinit` 在模块加载时创建一个默认 CUDA context。`load_engine` 又创建新的 context。两个 context 共存, 依赖 push/pop 正确切换。如果任何代码在 push/pop 之外使用了 CUDA 操作(如 PostProcessor 的 GPU NMS), 将使用 autoinit 的 context, 与 TensorRT 的 context 不同。
- 风险等级: 中(当前 NMS 使用 PyTorch CUDA, 与 PyCUDA context 独立)
### 6.2 Important (应当修复)
**[I1] `config_sync._listen_config_stream` 通用 Exception 不置 None**
- 第385-390行: 通用 Exception 分支不将 `_cloud_redis` 设为 None, 但 ConnectionError 分支会。如果出现非 ConnectionError 的 Redis 异常(如 ResponseError), 会一直使用同一个可能已损坏的连接重试, 而不是重建连接。
- 建议: 在通用 Exception 中也加入 `self._cloud_redis = None` 触发重连, 或至少尝试 ping 验证连接健康。
**[I2] `get_roi_configs_with_bindings` 的 N+1 查询**
-`camera_id` 为 None 时, 先查所有 ROI, 再逐个查 bindings。ROI 数量多时性能差。
- 注意: 当 `camera_id` 非空时已经使用了 `get_bindings_by_camera` 优化查询, 这是正确的。
- 建议: 添加一个 `get_all_bindings()` 方法或使用 JOIN 查询, 一次取出所有 bindings。
**[I3] `screenshot_handler._listen_loop` 中 xack 失败静默 pass**
- 第195行: xack 失败时 pass, 不记录日志。导致无法发现 ACK 累积问题, PEL (Pending Entries List) 会持续增长。
- 建议: 至少记录 warning 日志。
**[I4] `_process_gpu` 未使用 `torch.no_grad()`**
- NMS 是纯推理操作, 不需要梯度计算。未包裹 `torch.no_grad()` 会导致不必要的计算图记录和额外内存占用。
- 建议: 用 `with torch.no_grad():` 包裹 GPU NMS 调用。
**[I5] `report_alarm` 中截图 base64 编码在调用线程中执行**
- 第133-143行: JPEG 编码 + base64 编码在推理线程中同步执行。一张 1080p 截图约 100-300KB JPEG, base64 后约 130-400KB。对高频告警场景可能阻塞推理线程。
- 当前影响: 有 `_camera_cooldown_seconds=30` 限频, 实际影响有限。
### 6.3 Suggestions (改进建议)
**[S1] `_handle_detections` 中 import 在函数内**
- 第914行: `from core.result_reporter import AlarmInfo, generate_alarm_id` 在热路径函数内 import。虽然 Python 会缓存模块, 但每次调用仍有字典查找开销。
- 建议: 移到文件顶部 import。
**[S2] `batch_process_detections` 每次创建新 NMSProcessor**
- 第753行: 每次调用都 `NMSProcessor(nms_threshold, use_gpu=True)`, 而 PostProcessor.__init__ 已经创建了 `self._nms`
- 建议: 复用 `self._nms` 或在需要不同阈值时参数化调用。
**[S3] `alarm_upload_worker._process_retry_queue` 的 RPOP+LPUSH 非原子**
- 重试队列的检查逻辑: RPOP -> 检查时间 -> LPUSH 回去。在高并发下可能有短暂数据丢失窗口(RPOP 后进程崩溃)。
- 当前影响: 单线程消费, 实际风险极低。
**[S4] `_camera_alert_cooldown` 字典无清理机制**
- 随着时间推移, 已下线的摄像头 + 告警类型的 key 会一直驻留内存。
- 当前影响: 每个 key 约 100 字节, 不太可能成为问题。长期运行建议定期清理。
**[S5] AlarmInfo.device_id 语义歧义**
- `_handle_detections` 第918行传入 `device_id=camera_id`, 但 AlarmInfo 字段名为 `device_id`。云端可能误认为这是边缘设备ID而非摄像头ID。实际的边缘设备ID在 `ext_data.edge_node_id` 中。
- 建议: 确认云端 API 对此字段的预期, 考虑重命名或补充文档。
---
## 7. 修复点风险矩阵
| 修复目标 | 涉及文件 | 变更范围 | 回归风险 | 测试覆盖 |
|----------|----------|----------|----------|----------|
| 告警去重逻辑优化 | main.py | _handle_detections 方法内部 | 低(不影响推理管线) | **无**(需新增) |
| Redis 重连机制增强 | config_sync.py | _listen_config_stream except块 | 中(影响配置同步) | 有(test_config_sync) |
| N+1 查询优化 | config_sync.py + database.py | get_roi_configs_with_bindings | 中(影响配置读取) | 有(test_config_sync) |
| xack 失败处理 | screenshot_handler.py | _listen_loop finally块 | 低(仅日志增强) | **无**(需新增) |
| GPU NMS 优化 | postprocessor.py | _process_gpu | 低(纯优化) | 有(test_postprocessor) |
| CUDA context 安全 | tensorrt_engine.py | load_engine/infer | 高(GPU操作) | 有(test_tensorrt) |
| 截图编码优化 | result_reporter.py | report_alarm | 中(涉及序列化格式) | 有(test_result_reporter) |
---
## 8. 审查总结
**代码整体质量:** 项目架构清晰, 模块职责分明, 关键路径有合理的错误处理和日志输出。Redis 双层架构(云端分发+本地自治)设计合理, 支持离线运行。
**主要关注点:**
1. 告警去重字典 `_camera_alert_cooldown``_active_alarms` 是核心业务逻辑, 修改时务必保持 key 格式和读写位置的一致性。当前无测试覆盖, 是最大风险点。
2. `config_sync._listen_config_stream` 的通用 Exception 处理策略需要与 ConnectionError 保持一致。
3. TensorRT 的 CUDA context push/pop 模式是正确的, 但与 `pycuda.autoinit` 共存需要注意不要在 push/pop 之外的代码中使用 PyCUDA 操作。
4. 截图通过 base64 经 Redis 传递的设计合理(避免文件IO), 但要注意大图场景下的内存和队列压力。

101
main.py
View File

@@ -18,7 +18,6 @@ for _key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY", "all_prox
from config.settings import get_settings, Settings
from core.config_sync import get_config_sync_manager, ConfigSyncManager
from core.debug_http_server import start_debug_http_server
from core.video_stream import MultiStreamManager, VideoFrame
from core.preprocessor import ImagePreprocessor
from core.tensorrt_engine import TensorRTEngine, EngineManager
@@ -56,8 +55,6 @@ class EdgeInferenceService:
self._screenshot_handler: Optional[ScreenshotHandler] = None
self._algorithm_manager: Optional[AlgorithmManager] = None
self._debug_reload_thread: Optional[threading.Thread] = None
self._debug_http_server = None
self._debug_http_thread: Optional[threading.Thread] = None
self._heartbeat_thread: Optional[threading.Thread] = None
self._scheduler_thread: Optional[threading.Thread] = None
@@ -92,6 +89,10 @@ class EdgeInferenceService:
# ROI级别告警去重同ROI+同类型未resolve的告警不重复发送
# key: f"{roi_id}_{alert_type}", value: alarm_id
self._active_alarms: Dict[str, str] = {}
self._active_alarms_time: Dict[str, datetime] = {} # 活跃告警创建时间
self._cleanup_counter = 0
self._cleanup_interval = 100 # 每 100 次 _handle_detections 清理一次
self._active_alarm_max_age_sec = 3600 # 活跃告警最大存活时间1小时
self._logger.info("Edge_Inference_Service 初始化开始")
@@ -128,6 +129,18 @@ class EdgeInferenceService:
daemon=True
).start()
self._config_manager.register_callback("config_update", _on_config_update)
def _on_global_params_update(topic, data):
if self._algorithm_manager:
global_params = data.get("global_params", {})
self._algorithm_manager.update_global_params(global_params)
# 只清除受影响算法的注册缓存,避免无关算法状态丢失
affected_algos = set(global_params.keys())
keys_to_remove = [k for k in self._algorithm_manager._registered_keys if k[2] in affected_algos]
for key in keys_to_remove:
self._algorithm_manager._registered_keys.discard(key)
self._logger.info(f"全局参数回调已触发,清除 {len(keys_to_remove)} 个受影响算法的注册缓存")
self._config_manager.register_callback("global_params_update", _on_global_params_update)
self._logger.info("配置管理器初始化成功")
except Exception as e:
self._logger.error(f"配置管理器初始化失败: {e}")
@@ -194,6 +207,18 @@ class EdgeInferenceService:
try:
self._algorithm_manager = AlgorithmManager()
self._algorithm_manager.start_config_subscription()
# 启动时从 SQLite 加载已有全局参数
try:
from config.database import get_sqlite_manager
db = get_sqlite_manager()
saved_global_params = db.get_all_global_params()
if saved_global_params:
self._algorithm_manager.update_global_params(saved_global_params)
self._logger.info(f"从 SQLite 加载全局参数: {list(saved_global_params.keys())}")
except Exception as e:
self._logger.warning(f"从 SQLite 加载全局参数失败: {e}")
self._logger.info("算法管理器初始化成功")
except Exception as e:
self._logger.error(f"算法管理器初始化失败: {e}")
@@ -277,32 +302,6 @@ class EdgeInferenceService:
)
self._debug_reload_thread.start()
def _start_debug_http_server(self):
"""本地调试:启动 HTTP 同步接口"""
if self._settings.config_sync_mode != "LOCAL":
return
if not getattr(self._settings, "debug", None) or not self._settings.debug.enabled:
return
if self._debug_http_server is not None:
return
host = self._settings.debug.host
port = self._settings.debug.port
self._debug_http_server = start_debug_http_server(host, port)
def worker():
try:
self._debug_http_server.serve_forever()
except Exception as e:
self._logger.warning(f"[DEBUG] HTTP 服务器异常: {e}")
self._debug_http_thread = threading.Thread(
target=worker,
name="DebugHttpServer",
daemon=True,
)
self._debug_http_thread.start()
def _start_heartbeat(self):
"""启动心跳守护线程,每 30 秒向云端上报设备状态"""
def worker():
@@ -375,7 +374,6 @@ class EdgeInferenceService:
self._init_algorithm_manager()
self._init_screenshot_handler()
self._start_debug_reload_watcher()
self._start_debug_http_server()
self._start_heartbeat()
self._performance_stats["start_time"] = datetime.now()
@@ -701,9 +699,11 @@ class EdgeInferenceService:
# 一次性推理整个 batch
outputs, inference_time_ms = engine.infer(batch_data)
self._performance_stats["inference_batches"] += 1
self._logger.log_inference_latency(
self._logger.performance(
"inference_latency_ms",
inference_time_ms,
batch_size=len(chunk),
throughput_fps=1000.0 / inference_time_ms if inference_time_ms > 0 else 0
)
# 诊断:输出原始推理结果形状(非告警诊断日志,使用 DEBUG 级别)
@@ -798,6 +798,12 @@ class EdgeInferenceService:
):
"""处理检测结果 - 算法接管判断权"""
try:
# 惰性清理过期去重记录
self._cleanup_counter += 1
if self._cleanup_counter >= self._cleanup_interval:
self._cleanup_counter = 0
self._cleanup_dedup_dicts(frame.timestamp)
if self._algorithm_manager is None:
self._logger.warning("算法管理器不可用,跳过算法处理")
return
@@ -863,6 +869,7 @@ class EdgeInferenceService:
for k, v in list(self._active_alarms.items()):
if v == resolve_alarm_id:
del self._active_alarms[k]
self._active_alarms_time.pop(k, None)
self._logger.debug(f"[去重] 活跃告警已清除: {k} -> {resolve_alarm_id}")
break
@@ -936,6 +943,7 @@ class EdgeInferenceService:
# 记录活跃告警(用于 ROI 级去重)
self._active_alarms[active_key] = alarm_info.alarm_id
self._active_alarms_time[active_key] = frame.timestamp
# 回填 alarm_id 到算法实例(用于后续 resolve 追踪,泛化支持所有算法类型)
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type)
@@ -951,6 +959,31 @@ class EdgeInferenceService:
except Exception as e:
self._logger.error(f"处理检测结果失败: {e}")
def _cleanup_dedup_dicts(self, now: datetime):
"""惰性清理过期的去重记录"""
# 清理 _camera_alert_cooldown 中已过冷却期的记录
expired_cooldown = [
k for k, v in self._camera_alert_cooldown.items()
if (now - v).total_seconds() > self._camera_cooldown_seconds * 2
]
for k in expired_cooldown:
del self._camera_alert_cooldown[k]
# 清理 _active_alarms 中可能因 resolve 丢失而残留的记录
expired_active = [
k for k, t in self._active_alarms_time.items()
if (now - t).total_seconds() > self._active_alarm_max_age_sec
]
for k in expired_active:
self._active_alarms.pop(k, None)
self._active_alarms_time.pop(k, None)
self._logger.warning(f"[去重] 活跃告警超时清除: {k}")
if expired_cooldown or expired_active:
self._logger.debug(
f"[去重] 清理完成: cooldown={len(expired_cooldown)}, active={len(expired_active)}"
)
def _inference_worker(self):
"""推理线程:攒批窗口内收集 ROI 请求,批量推理"""
while not self._stop_event.is_set():
@@ -1059,12 +1092,6 @@ class EdgeInferenceService:
if self._reporter:
self._reporter.close()
if self._debug_http_server:
try:
self._debug_http_server.shutdown()
except Exception:
pass
self._performance_stats["uptime_seconds"] = (
(datetime.now() - self._performance_stats["start_time"]).total_seconds()
)

View File

@@ -1,64 +1,78 @@
# Edge_Inference_Service 依赖清单
# 安装命令: pip install -r requirements.txt
# 备注:所有版本均选择最稳定版本,经过大量验证
# 环境要求: Python 3.10 | CUDA 12.1 | cuDNN 8.9 | TensorRT 8.6.1
# Docker 基础镜像: nvcr.io/nvidia/tensorrt:23.08-py3
# ============================================================
# 核心依赖(必需
# GPU 推理依赖TensorRT 8.6 + CUDA 12.1
# ============================================================
# 视频处理 - OpenCV 4.8.0最稳定的4.x版本
opencv-python==4.8.0.74
# PyTorch - CUDA 12.1 下最稳定版本
# 安装命令: pip install torch==2.1.2 torchvision==0.16.2 --index-url https://download.pytorch.org/whl/cu121
--extra-index-url https://download.pytorch.org/whl/cu121
torch==2.1.2
torchvision==0.16.2
# 数值计算 - NumPy 1.24.0Python 3.8-3.11完美兼容
numpy==1.24.0
# TensorRT Python 绑定NGC 镜像已预装,裸机需手动安装)
tensorrt==8.6.1.6
pycuda==2023.1.1
# YOLO11 目标检测框架
ultralytics==8.3.5
# ONNX 模型转换与优化
onnx==1.16.0
onnxsim==0.4.36
onnxruntime-gpu==1.17.1
# ============================================================
# 核心依赖
# ============================================================
# 视频处理
opencv-python==4.8.0.76
# 数值计算(锁定 1.x避开 NumPy 2.0 破坏性变更)
numpy==1.26.4
# 图像处理
Pillow==10.2.0
# ============================================================
# 数据库依赖
# ============================================================
# ORM框架 - SQLAlchemy 2.0.23,长期支持稳定版
# ORM 框架
sqlalchemy==2.0.23
# MySQL驱动 - PyMySQL 1.1.0,成熟稳定版本
# MySQL 驱动
pymysql==1.1.0
# ============================================================
# 消息队列与缓存
# ============================================================
# MQTT客户端 - Paho-MQTT 1.6.11.x最终稳定版
# MQTT 客户端1.x 最终稳定版
paho-mqtt==1.6.1
# Redis客户端 - Redis 4.6.04.x最终稳定版
# Redis 客户端
redis==4.6.0
# 腾讯云COS SDK - 用于截图上传
# 腾讯云 COS SDK截图上传
cos-python-sdk-v5>=1.9.30
# ============================================================
# 工具库
# ============================================================
# YAML解析 - PyYAML 6.0.1,安全稳定版
pyyaml==6.0.1
requests==2.31.0
psutil==5.9.8
python-dotenv==1.0.1
# ============================================================
# 测试框架
# 测试依赖
# ============================================================
# 单元测试 - PyTest 7.4.47.x最终稳定版
pytest==7.4.4
# 覆盖率报告 - PyTest-Cov 4.1.0,成熟稳定版
pytest-cov==4.1.0
# ============================================================
# 可选依赖(按需安装)
# ============================================================
# GPU推理框架需要CUDA 12.1环境)
# tensorrt==8.6.1.6
# pycuda==2023.1.1
# YOLOv8目标检测按需安装
# ultralytics==8.0.228

314
test_garbage_algorithm.py Normal file
View File

@@ -0,0 +1,314 @@
"""
GarbageDetectionAlgorithm 单元测试
覆盖场景:
1. 无垃圾时保持 IDLE
2. 持续检测到垃圾 → 确认 → 告警
3. 冷却期内不重复触发
4. 清理后发 resolve → 回到 IDLE
5. 清理确认期内垃圾再次出现 → 回到 ALARMED
6. reset() 正确清理状态
"""
import sys
import os
import logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from datetime import datetime, timedelta
from algorithms import GarbageDetectionAlgorithm
# ===== 工具函数 =====
def make_tracks(roi_id: str, classes: list, confidences: list = None):
"""生成模拟检测结果"""
if confidences is None:
confidences = [0.85] * len(classes)
tracks = []
for i, cls in enumerate(classes):
tracks.append({
"track_id": f"{roi_id}_{i}",
"class": cls,
"confidence": confidences[i],
"bbox": [100 + i * 50, 100, 200 + i * 50, 300],
"matched_rois": [{"roi_id": roi_id}],
})
return tracks
def simulate(algo, roi_id, camera_id, get_tracks_fn, count, interval=1.0, start_time=None):
"""连续模拟帧,返回所有 alerts 和最后时间戳"""
t = start_time or datetime(2026, 4, 17, 10, 0, 0)
all_alerts = []
for i in range(count):
tracks = get_tracks_fn(i)
alerts = algo.process(roi_id, camera_id, tracks, t)
if alerts:
all_alerts.extend(alerts)
t += timedelta(seconds=interval)
return all_alerts, t
# ===== 测试 1无垃圾时保持 IDLE =====
def test_idle_when_no_garbage():
print("\n" + "=" * 60)
print("TEST 1: 无垃圾帧始终保持 IDLE")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=60)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["person"]), # 只有人,没有垃圾
count=100,
)
assert algo.state == "IDLE", f"Expected IDLE, got {algo.state}"
assert len(alerts) == 0, f"Expected no alerts, got {len(alerts)}"
print(f" 状态: {algo.state}alerts: {len(alerts)} [OK]")
# ===== 测试 2持续检测到垃圾 → 告警 =====
def test_garbage_triggers_alarm():
print("\n" + "=" * 60)
print("TEST 2: 持续 65 秒检测到垃圾 → 告警")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=60,
cooldown_sec=1800,
)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["garbage"]),
count=65, # 65 秒,超过 60 秒确认期
)
# 应该在第 60-61 秒触发 1 个告警
assert algo.state == "ALARMED", f"Expected ALARMED, got {algo.state}"
assert len(alerts) == 1, f"Expected 1 alert, got {len(alerts)}"
alert = alerts[0]
assert alert["alert_type"] == "garbage"
assert alert["alarm_level"] == 2
assert alert["garbage_count"] == 1
assert "检测到垃圾" in alert["message"]
print(f" 状态: {algo.state},告警: {alert['message']} [OK]")
# ===== 测试 3冷却期内不重复触发 =====
def test_cooldown_prevents_duplicate():
print("\n" + "=" * 60)
print("TEST 3: 告警后冷却期内持续有垃圾,不重复触发")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=10, # 缩短便于测试
confirm_clear_sec=10,
cooldown_sec=300, # 5 分钟冷却
)
# 持续 200 秒有垃圾(远超冷却时间但没超过 300 秒)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["garbage"]),
count=200,
)
assert len(alerts) == 1, f"Expected 1 alert (cooldown), got {len(alerts)}"
assert algo.state == "ALARMED"
print(f" 告警次数: {len(alerts)}(冷却期内不重复)[OK]")
# ===== 测试 4清理后发 resolve → IDLE =====
def test_resolve_after_cleaning():
print("\n" + "=" * 60)
print("TEST 4: 告警后清理 → 发 resolve → IDLE")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=10,
confirm_clear_sec=10,
cooldown_sec=300,
)
algo._last_alarm_id = "test_alarm_123" # 模拟 main.py 回填
t = datetime(2026, 4, 17, 10, 0, 0)
all_alerts = []
# Phase 1: 15 秒有垃圾 → 触发告警
for i in range(15):
alerts = algo.process(
"roi_1", "cam_1",
make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i)
)
all_alerts.extend(alerts)
assert algo.state == "ALARMED"
# Phase 2: 然后 30 秒无垃圾 → 发 resolve
# 需要等滑动窗口(10s)清空 + confirm_clear_sec(10s) = 20+ 秒
for i in range(15, 45):
alerts = algo.process(
"roi_1", "cam_1",
make_tracks("roi_1", []), # 空
t + timedelta(seconds=i)
)
all_alerts.extend(alerts)
assert algo.state == "IDLE", f"Expected IDLE, got {algo.state}"
resolves = [a for a in all_alerts if a.get("alert_type") == "alarm_resolve"]
assert len(resolves) == 1, f"Expected 1 resolve, got {len(resolves)}"
resolve = resolves[0]
assert resolve["resolve_alarm_id"] == "test_alarm_123"
assert resolve["resolve_type"] == "garbage_removed"
assert resolve["duration_ms"] > 0
print(f" resolve: {resolve['resolve_type']}, 持续 {resolve['duration_ms']}ms [OK]")
# ===== 测试 5清理期内垃圾再出现 → 回到 ALARMED =====
def test_garbage_reappears_during_clearing():
print("\n" + "=" * 60)
print("TEST 5: 清理确认期内垃圾再出现 → 回到 ALARMED")
print("=" * 60)
algo = GarbageDetectionAlgorithm(
confirm_garbage_sec=10,
confirm_clear_sec=20, # 较长的清理确认期
cooldown_sec=300,
)
algo._last_alarm_id = "test_alarm_456"
t = datetime(2026, 4, 17, 10, 0, 0)
# Phase 1: 15 秒有垃圾 → 告警 → ALARMED
for i in range(15):
algo.process("roi_1", "cam_1", make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i))
assert algo.state == "ALARMED"
# Phase 2: 5 秒无垃圾 → CONFIRMING_CLEAR
for i in range(15, 25):
algo.process("roi_1", "cam_1", make_tracks("roi_1", []),
t + timedelta(seconds=i))
assert algo.state == "CONFIRMING_CLEAR", f"got {algo.state}"
# Phase 3: 垃圾又出现 5 秒 → 回到 ALARMED
for i in range(25, 40):
algo.process("roi_1", "cam_1", make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i))
assert algo.state == "ALARMED", f"Expected ALARMED, got {algo.state}"
print(f" 状态恢复: CONFIRMING_CLEAR → ALARMED [OK]")
# ===== 测试 6reset() 清理状态 =====
def test_reset_clears_state():
print("\n" + "=" * 60)
print("TEST 6: reset() 正确清理所有状态")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=5)
algo._last_alarm_id = "test"
# 先让它进入某个状态
t = datetime(2026, 4, 17, 10, 0, 0)
for i in range(10):
algo.process("roi_1", "cam_1", make_tracks("roi_1", ["garbage"]),
t + timedelta(seconds=i))
assert algo.state == "ALARMED"
assert len(algo._detection_window) > 0
assert len(algo.alert_cooldowns) > 0
# Reset
algo.reset()
assert algo.state == "IDLE"
assert algo.state_start_time is None
assert algo._last_alarm_id is None
assert algo._garbage_start_time is None
assert len(algo._detection_window) == 0
assert len(algo.alert_cooldowns) == 0
print(" 所有状态已清空 [OK]")
# ===== 测试 7多个垃圾目标计数 =====
def test_multiple_garbage_count():
print("\n" + "=" * 60)
print("TEST 7: ROI 内多个垃圾目标 → garbage_count 正确")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=5)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["garbage", "garbage", "garbage"]),
count=10,
)
assert len(alerts) == 1
assert alerts[0]["garbage_count"] == 3
print(f" garbage_count: {alerts[0]['garbage_count']} [OK]")
# ===== 测试 8非 target_class 不计入 =====
def test_non_target_class_ignored():
print("\n" + "=" * 60)
print("TEST 8: person/car 类不计入(只看 garbage")
print("=" * 60)
algo = GarbageDetectionAlgorithm(confirm_garbage_sec=10)
alerts, _ = simulate(
algo, "roi_1", "cam_1",
lambda i: make_tracks("roi_1", ["person", "car"]), # 都不是 garbage
count=30,
)
assert algo.state == "IDLE", f"Expected IDLE, got {algo.state}"
assert len(alerts) == 0
print(f" 状态: {algo.state},无告警 [OK]")
# ===== 运行所有测试 =====
if __name__ == "__main__":
tests = [
test_idle_when_no_garbage,
test_garbage_triggers_alarm,
test_cooldown_prevents_duplicate,
test_resolve_after_cleaning,
test_garbage_reappears_during_clearing,
test_reset_clears_state,
test_multiple_garbage_count,
test_non_target_class_ignored,
]
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except AssertionError as e:
print(f" FAIL: {e}")
failed += 1
except Exception as e:
print(f" ERROR: {e}")
import traceback
traceback.print_exc()
failed += 1
print("\n" + "=" * 60)
print(f"结果: {passed} 通过, {failed} 失败")
print("=" * 60)
sys.exit(0 if failed == 0 else 1)

View File

@@ -223,15 +223,6 @@ class StructuredLogger:
"""记录性能指标"""
self._performance_logger.record(metric_name, value, tags)
perf_data = {
"metric": metric_name,
"value": value,
"duration_ms": duration_ms,
"tags": tags
}
self.info(f"性能指标: {metric_name} = {value}", **perf_data)
def log_inference_latency(self, latency_ms: float, batch_size: int = 1):
"""记录推理延迟"""
self.performance(