From 1d4eaf01748f90b877b0ea28d19f5d96e2b9aa1b Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 5 Mar 2026 16:31:35 +0800 Subject: [PATCH 01/28] =?UTF-8?q?=E6=96=87=E6=A1=A3=EF=BC=9A=E5=88=9B?= =?UTF-8?q?=E5=BB=BA=20CLAUDE.md=20=E5=BC=80=E5=8F=91=E6=8C=87=E5=8D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增内容: - 项目概述(边缘 AI 推理服务功能说明) - 常用命令(本地开发、测试、工具脚本、Docker 部署) - 架构概览(核心模块详细说明) - 配置同步、视频流、推理引擎、告警上报 - 算法模块(离岗、入侵检测) - 数据流说明(配置下发、视频推理、告警上报、截图请求) - Redis Key 设计(云端 + 本地) - 配置文件说明(.env 环境变量、YAML 配置) - 告警上报数据格式(触发和结束) - 开发工作流(添加算法、修改推理流程、优化性能) - 常见问题排查(引擎加载、告警失败、配置更新、GPU 内存) - Git 提交规范 Co-Authored-By: Claude Sonnet 4.5 --- CLAUDE.md | 366 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 366 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..c28a7e0 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,366 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## 项目概述 + +边缘 AI 推理服务,部署在客户现场边缘端。通过 TensorRT GPU 加速实时分析视频流,检测离岗、入侵等异常行为,上报告警到云端。支持配置热更新、截图上传、告警去重等功能。 + +**部署位置:** 边缘端(客户现场) +**运行方式:** 裸机部署或 Docker(需要 GPU 支持) +**主要功能:** +- RTSP 视频流接入(从 WVP 平台拉流) +- TensorRT GPU 推理(批量处理,8 帧/批次) +- 多算法支持(leave_post 离岗、intrusion 入侵) +- 告警去重(ROI 级 + 摄像头级冷却) +- 配置热更新(Redis Stream 订阅) +- 截图上传(腾讯云 COS) +- 告警上报(HTTP POST 到云端) + +## 常用命令 + +### 本地开发 + +```bash +# 安装依赖(Python 3.8+) +pip install -r requirements.txt + +# 配置环境 +cp .env.example .env +# 编辑 .env: +# DEVICE_ID=edge_device_001 +# CLOUD_REDIS_HOST=腾讯云Redis地址 +# CLOUD_REDIS_PORT=6379 +# CLOUD_REDIS_PASSWORD=密码 +# LOCAL_REDIS_HOST=localhost +# LOCAL_REDIS_PORT=6379 +# COS_SECRET_ID=腾讯云COS密钥ID +# COS_SECRET_KEY=腾讯云COS密钥KEY +# COS_BUCKET=your-bucket +# COS_REGION=ap-beijing +# CLOUD_API_URL=http://云端IP:8000 + +# 运行推理服务 +python main.py +``` + +### 测试 + +```bash +# 运行完整工作流测试 +python test_leave_post_full_workflow.py + +# 运行无持续时长测试 +python test_leave_post_no_duration.py + +# 运行单元测试 +pytest tests/ +pytest -v tests/test_config_sync.py +``` + +### 工具脚本 + +```bash +# 诊断缺失摄像头配置 +python diagnose_missing_cameras.py + +# 清理旧的 ROI 配置 +python cleanup_old_rois.py + +# 恢复摄像头配置 +python restore_cameras.py +``` + +### Docker 部署(生产环境) + +```bash +# 构建镜像(需要 CUDA 12.1 + TensorRT 8.6 基础镜像) +docker build -t edge-inference:latest . + +# 运行容器(需要 GPU 支持) +docker run -d \ + --name edge-inference \ + --gpus all \ + --restart=always \ + -v /path/to/models:/app/models \ + -v /path/to/.env:/app/.env \ + -v /path/to/data:/app/data \ + edge-inference:latest + +# 查看日志 +docker logs -f edge-inference + +# 重启容器 +docker restart edge-inference + +# 进入容器调试 +docker exec -it edge-inference /bin/bash + +# 检查 GPU 使用情况 +nvidia-smi +``` + +## 架构概览 + +### 核心模块(core/) + +- **config_sync.py** — 配置同步管理器 + - 订阅云端 Redis Stream: `device_config_stream` + - 拉取配置:`GET device:{device_id}:config` + - 版本控制、自动回滚、离线可用 + - 热更新视频流(启停摄像头) + +- **video_stream.py** — 多流管理器 + - RTSP 拉流、解码、帧缓存 + - 多路并发处理 + - 流状态监控、自动重连 + +- **preprocessor.py** — 图像预处理 + - Resize、Normalize、NCHW 转换 + - 批量预处理(8 帧/批次) + +- **tensorrt_engine.py** — TensorRT 推理引擎 + - Engine 缓存管理 + - 批量推理(提升吞吐量) + - GPU 内存优化 + +- **postprocessor.py** — 后处理器 + - NMS(非极大值抑制) + - ROI 区域过滤(point-in-polygon) + - 算法分发(根据 algorithm_code) + +- **result_reporter.py** — 结果上报器 + - 生成 alarm_id:`edge_{device_id}_{timestamp}_{uuid6}` + - LPUSH 到本地 Redis: `local:alarm:pending` + - 零阻塞(立即返回) + +- **alarm_upload_worker.py** — 告警上传 Worker + - 独立线程,BRPOP 消费队列 + - 上传截图到腾讯云 COS + - HTTP POST 到云端:`/api/ai/alert/edge/report` + - 失败重试(3 次)→ 死信队列 + +- **screenshot_handler.py** — 截图处理器 + - XREADGROUP 订阅云端 Redis Stream: `edge_snap_request` + - 从视频流获取最新帧 + - 上传 COS,HTTP 回调 WVP + +### 算法模块(algorithms.py) + +**已实现算法:** +- **LeavePostAlgorithm** — 离岗检测 + - 检测 ROI 内是否有人 + - 持续无人触发告警 + - 人员回归发送 resolve 通知 + +- **IntrusionAlgorithm** — 入侵检测 + - 检测 ROI 内是否有人入侵 + - 立即触发告警 + +**算法接口:** +```python +class AlgorithmBase: + def process(self, detections, roi_info, camera_id, bind_info): + """ + 处理检测结果 + Args: + detections: 检测框列表 [{class_id, confidence, bbox}] + roi_info: ROI 配置 {roi_id, polygon, ...} + camera_id: 摄像头 ID + bind_info: 算法绑定配置 {threshold, cooldown, ...} + Returns: + 告警信息或 None + """ + pass +``` + +### 数据流 + +``` +配置下发: + WVP → XADD device_config_stream → Edge XREADGROUP + → 拉取 Redis config → 版本校验 → 热更新视频流 + +视频推理: + RTSP 拉流 → 解码 → 预处理 → TensorRT 推理 + → NMS → ROI 过滤 → 算法处理 → 告警去重 + → LPUSH local:alarm:pending + +告警上报: + BRPOP 队列 → 上传 COS 截图 → HTTP POST 云端 + → 失败重试 → 死信队列 + +截图请求: + WVP → XADD edge_snap_request → Edge XREADGROUP + → 获取帧 → 上传 COS → HTTP 回调 WVP +``` + +## Redis Key 设计 + +### 云端 Redis +- `device:{device_id}:config` — 设备最新配置 JSON +- `device:{device_id}:version` — 配置版本号 +- `device_config_stream` — 配置变更 Stream +- `edge_snap_request` — 截图请求 Stream + +### 本地 Redis +- `local:device:config:current` — 当前生效配置 +- `local:device:config:backup` — 上次成功配置(回滚用) +- `local:device:config:version` — 当前版本号 +- `local:alarm:pending` — 待上报告警队列 +- `local:alarm:retry` — 重试队列 +- `local:alarm:dead` — 死信队列 + +## 配置文件 + +### .env 环境变量(关键配置) + +```bash +# 设备标识 +DEVICE_ID=edge_device_001 + +# 云端 Redis(配置同步) +CLOUD_REDIS_HOST=腾讯云Redis地址 +CLOUD_REDIS_PORT=6379 +CLOUD_REDIS_PASSWORD=密码 + +# 本地 Redis(告警队列、配置缓存) +LOCAL_REDIS_HOST=localhost +LOCAL_REDIS_PORT=6379 + +# 腾讯云 COS(截图上传) +COS_SECRET_ID=your_secret_id +COS_SECRET_KEY=your_secret_key +COS_BUCKET=your-bucket-1234567890 +COS_REGION=ap-beijing + +# 云端 API(告警上报) +CLOUD_API_URL=http://云端IP:8000 +``` + +### config/ 目录(YAML 配置) + +- `settings.py` — 配置加载器(读取 .env) +- `database.py` — SQLite 管理器(本地配置持久化) +- `config_models.py` — 配置数据模型 + +## 告警上报数据格式 + +### 告警触发(POST /api/ai/alert/edge/report) + +```json +{ + "alarm_id": "edge_device001_20260305120000_a1b2c3", + "alarm_type": "leave_post", + "device_id": "camera_001", + "scene_id": "roi_001", + "event_time": "2026-03-05T12:00:00Z", + "alarm_level": 2, + "snapshot_url": "https://cos.ap-beijing.myqcloud.com/...", + "confidence_score": 0.92, + "algorithm_code": "YOLO", + "ext_data": { + "bind_id": "bind_123", + "bbox": [100, 100, 300, 400], + "first_frame_time": "2026-03-05T12:00:00Z" + } +} +``` + +### 告警结束(POST /api/ai/alert/edge/resolve) + +```json +{ + "alarm_id": "edge_device001_20260305120000_a1b2c3", + "duration_ms": 120000, + "last_frame_time": "2026-03-05T12:02:00Z", + "resolve_type": "PERSON_RETURN" +} +``` + +## 开发工作流 + +### 添加新算法 +1. 在 `algorithms.py` 创建新的算法类,继承 `AlgorithmBase` +2. 实现 `process()` 方法 +3. 在 `AlgorithmManager` 注册算法 +4. 更新 WVP 后端的算法配置表 + +### 修改推理流程 +1. 修改 `main.py` 中的 `EdgeInferenceService` +2. 调整批量大小:`self._max_batch_size` +3. 调整冷却时间:`self._camera_cooldown_seconds` + +### 调整告警去重策略 +- ROI 级冷却:每个 ROI 独立冷却 +- 摄像头级冷却:同摄像头同类型告警冷却 +- 修改:`main.py` 中的 `_camera_alert_cooldown` 逻辑 + +### 优化性能 +1. **批量推理**:调整 `_max_batch_size`(默认 8) +2. **GPU 内存**:减少视频流并发数 +3. **告警队列**:监控 Redis 队列长度 +4. **TensorRT 引擎**:确保引擎缓存命中 + +## 常见问题 + +### TensorRT 引擎加载慢 +首次运行会构建引擎(5-10 分钟),之后会缓存。 +检查 `models/` 目录下是否有 `.engine` 文件。 + +### 告警上报失败 +检查云端 API 是否可达: +```bash +curl http://云端IP:8000/health +``` + +检查 COS 配置: +```bash +# 查看 .env 中的 COS 配置 +cat .env | grep COS +``` + +### 配置不更新 +检查云端 Redis 连接: +```bash +redis-cli -h 云端Redis地址 -p 6379 -a 密码 ping +``` + +检查配置版本: +```bash +redis-cli GET device:edge_device_001:version +redis-cli GET local:device:config:version +``` + +### 视频流断开 +检查 RTSP 地址是否可访问: +```bash +ffprobe rtsp://云端IP:10002/... +``` + +检查 WVP 流媒体服务: +```bash +docker logs vsp-zlmedia +``` + +### GPU 内存不足 +降低批量大小或减少并发流数量。 +检查 GPU 使用情况: +```bash +nvidia-smi +``` + +## Git 提交规范 + +在修改代码后,使用中文提交信息: + +```bash +git add . +git commit -m "功能:添加XXX功能 + +详细说明... + +Co-Authored-By: Claude Sonnet 4.5 " +``` + +**不要立即 push**,等待用户指示再推送到远程。 From 3d88dfc1c6dd946d903b785de88e3157a34bd514 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 5 Mar 2026 17:12:15 +0800 Subject: [PATCH 02/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E7=AE=97=E6=B3=95=E7=BB=91=E5=AE=9A=E5=90=8E=E8=BE=B9?= =?UTF-8?q?=E7=BC=98=E7=AB=AF=E6=9C=AA=E5=81=9C=E6=AD=A2=E5=AF=B9=E5=BA=94?= =?UTF-8?q?=E7=AE=97=E6=B3=95=E6=8E=A8=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根本原因:config_sync.py 的 _cleanup_stale_records() 方法接收了 incoming_bind_ids 参数但从未使用,导致当 ROI 仍存在但其中某个 算法绑定被删除时,孤立的绑定记录继续留在 SQLite 中,推理循环 仍然从 SQLite 读取到已删除的绑定并继续生成告警。 修复内容: 1. config/database.py: 添加 get_all_bind_ids() 方法 2. core/config_sync.py: 在 _cleanup_stale_records() 中补全 使用 incoming_bind_ids 清理孤立绑定的逻辑 3. algorithms.py: 在 reload_all_algorithms() 中添加清理内存中 孤立算法实例的逻辑,防止内存泄漏 --- algorithms.py | 30 ++++++++++++++++++++++++++++++ config/database.py | 12 +++++++++++- core/config_sync.py | 9 +++++++++ 3 files changed, 50 insertions(+), 1 deletion(-) diff --git a/algorithms.py b/algorithms.py index 9846fe3..ce0e904 100644 --- a/algorithms.py +++ b/algorithms.py @@ -998,9 +998,13 @@ class AlgorithmManager: config_manager = get_config_sync_manager() bindings = config_manager.get_bindings_from_redis("") + # 收集当前配置中有效的 (roi_id, bind_id) 组合 + valid_keys = set() + for bind in bindings: bind_id = bind.get("bind_id") roi_id = bind.get("roi_id") + valid_keys.add((roi_id, bind_id)) if preserve_state: # 仅更新参数,不重置状态 @@ -1012,6 +1016,32 @@ class AlgorithmManager: if self.load_bind_from_redis(bind_id): count += 1 + # 清理内存中已被删除的算法实例 + removed_count = 0 + with self._update_lock: + for roi_id in list(self.algorithms.keys()): + for key in list(self.algorithms[roi_id].keys()): + # key 格式: "roi_id_bind_id" + if key.startswith(roi_id + "_"): + bind_id = key[len(roi_id) + 1:] + if (roi_id, bind_id) not in valid_keys: + for algo in self.algorithms[roi_id][key].values(): + algo.reset() + del self.algorithms[roi_id][key] + # 清除注册缓存 + self._registered_keys = { + k for k in self._registered_keys + if not (k[0] == roi_id and k[1] == bind_id) + } + removed_count += 1 + logger.info(f"清理已删除的算法实例: {key}") + # 如果 roi 下已无算法实例,清理空字典 + if not self.algorithms[roi_id]: + del self.algorithms[roi_id] + + if removed_count > 0: + logger.info(f"已清理 {removed_count} 个孤立算法实例") + logger.info(f"已重新加载 {count} 个算法配置 (preserve_state={preserve_state})") return count except Exception as e: diff --git a/config/database.py b/config/database.py index b1eda4e..d23be56 100644 --- a/config/database.py +++ b/config/database.py @@ -899,7 +899,17 @@ class SQLiteManager: except Exception as e: logger.error(f"删除ROI算法绑定失败: {e}") return 0 - + + def get_all_bind_ids(self) -> List[str]: + """获取所有算法绑定的 bind_id 列表(用于清理孤立绑定)""" + try: + cursor = self._conn.cursor() + cursor.execute("SELECT bind_id FROM roi_algo_bind") + return [row[0] for row in cursor.fetchall()] + except Exception as e: + logger.error(f"获取所有绑定ID失败: {e}") + return [] + def log_config_update( self, config_type: str, diff --git a/core/config_sync.py b/core/config_sync.py index 7bad3e8..75672ae 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -640,6 +640,15 @@ class ConfigSyncManager: removed += 1 + bind_count logger.info(f"清理旧 ROI: {old_roi_id} (含 {bind_count} 条算法绑定)") + # 清理孤立的算法绑定(ROI 仍存在但绑定已被删除的情况) + if incoming_bind_ids: + existing_bind_ids = self._db_manager.get_all_bind_ids() + for old_bind_id in existing_bind_ids: + if old_bind_id not in incoming_bind_ids: + self._db_manager.delete_roi_algo_bind(old_bind_id) + removed += 1 + logger.info(f"清理孤立算法绑定: {old_bind_id}") + if removed > 0: logger.info(f"旧数据清理完成: 共删除 {removed} 条过期记录") From 7a5ddef2f6be6d1672df01d0514f415d18a69d18 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sun, 8 Mar 2026 17:31:12 +0800 Subject: [PATCH 03/28] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E6=8C=89?= =?UTF-8?q?=E7=AE=97=E6=B3=95=E7=8B=AC=E7=AB=8B=E9=85=8D=E7=BD=AE=E7=BD=AE?= =?UTF-8?q?=E4=BF=A1=E5=BA=A6=E9=98=88=E5=80=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 通过 ALGO_CONF_{ALGO_CODE} 环境变量为每个算法设置独立的 conf_threshold, 未配置的算法回退到全局 CONF_THRESHOLD。推理过程零改动,仅后处理过滤阶段 按 bind.algo_code 使用对应阈值。 当前配置:离岗=0.4(降低漏检),入侵=0.5(减少误报)。 Co-Authored-By: Claude Opus 4.6 --- config/settings.py | 37 ++++++++++++++++++++++++++++++++----- core/postprocessor.py | 27 +++++++++++++++++---------- main.py | 14 ++++++++++++-- 3 files changed, 61 insertions(+), 17 deletions(-) diff --git a/config/settings.py b/config/settings.py index 7e6746d..7fc58af 100644 --- a/config/settings.py +++ b/config/settings.py @@ -5,7 +5,7 @@ import os from dataclasses import dataclass, field -from typing import List, Optional +from typing import Dict, List, Optional @dataclass @@ -117,10 +117,17 @@ class InferenceConfig: input_width: int = 480 input_height: int = 480 batch_size: int = 1 - conf_threshold: float = 0.4 - nms_threshold: float = 0.45 + conf_threshold: float = 0.45 + nms_threshold: float = 0.5 device_id: int = 0 fp16_mode: bool = True + # 按算法类型覆盖置信度阈值,key=algo_code, value=threshold + # 未命中时回退到 conf_threshold + algo_conf_thresholds: Dict[str, float] = field(default_factory=dict) + + def get_conf_threshold(self, algo_code: str) -> float: + """获取指定算法的置信度阈值,未配置则回退全局值""" + return self.algo_conf_thresholds.get(algo_code, self.conf_threshold) # ===================== Debug / Local Sync ===================== @@ -273,8 +280,9 @@ class Settings: input_width=int(os.getenv("INPUT_WIDTH", "480")), input_height=int(os.getenv("INPUT_HEIGHT", "480")), batch_size=int(os.getenv("BATCH_SIZE", "4")), - conf_threshold=float(os.getenv("CONF_THRESHOLD", "0.4")), - nms_threshold=float(os.getenv("NMS_THRESHOLD", "0.45")), + conf_threshold=float(os.getenv("CONF_THRESHOLD", "0.45")), + nms_threshold=float(os.getenv("NMS_THRESHOLD", "0.5")), + algo_conf_thresholds=self._parse_algo_conf_thresholds(), ) self.config_sync_mode = os.getenv("CONFIG_SYNC_MODE", "LOCAL").upper() @@ -297,6 +305,25 @@ class Settings: # 使用 COCO 类别名称 self.class_names = COCO_CLASS_NAMES + @staticmethod + def _parse_algo_conf_thresholds() -> Dict[str, float]: + """解析 ALGO_CONF_* 环境变量,返回 {algo_code: threshold} 字典 + + 环境变量命名规则: ALGO_CONF_{ALGO_CODE},如: + ALGO_CONF_LEAVE_POST=0.35 + ALGO_CONF_INTRUSION=0.55 + """ + prefix = "ALGO_CONF_" + result = {} + for key, value in os.environ.items(): + if key.startswith(prefix) and value: + algo_code = key[len(prefix):].lower() + try: + result[algo_code] = float(value) + except ValueError: + pass + return result + def _parse_working_hours(self) -> List[dict]: """解析工作时间配置""" working_hours_str = os.getenv("WORKING_HOURS", "") diff --git a/core/postprocessor.py b/core/postprocessor.py index 87a87f2..1c67d01 100644 --- a/core/postprocessor.py +++ b/core/postprocessor.py @@ -35,7 +35,7 @@ class NMSProcessor: 可选 GPU 加速 (torchvision.ops.nms) """ - def __init__(self, nms_threshold: float = 0.45, use_gpu: bool = False): + def __init__(self, nms_threshold: float = 0.5, use_gpu: bool = False): self.nms_threshold = nms_threshold self.use_gpu = use_gpu and _HAS_TORCH self._logger = get_logger("postprocessor") @@ -162,7 +162,7 @@ class NMSProcessor: boxes: np.ndarray, scores: np.ndarray, class_ids: np.ndarray, - conf_threshold: float = 0.5, + conf_threshold: float = 0.45, max_output_size: int = 300 ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ @@ -579,8 +579,8 @@ class PostProcessor: "alert_cooldown": 300, } - self.nms_threshold = config.get("nms_threshold", 0.45) - self.conf_threshold = config.get("conf_threshold", 0.5) + self.nms_threshold = config.get("nms_threshold", 0.5) + self.conf_threshold = config.get("conf_threshold", 0.45) MAX_DETECTIONS = 8400 self._buffer_xyxy = np.zeros((MAX_DETECTIONS, 4), dtype=np.float32) @@ -707,22 +707,24 @@ class PostProcessor: batch_outputs: List[np.ndarray], batch_size: int, conf_threshold: Optional[float] = None, - nms_threshold: Optional[float] = None + nms_threshold: Optional[float] = None, + per_item_conf_thresholds: Optional[List[float]] = None, ) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]]: """ 批量处理多 ROI 检测结果 - + Args: batch_outputs: 模型输出 (可能是 [output] 或单个 batch 数组) batch_size: batch 大小 conf_threshold: 置信度阈值 nms_threshold: NMS阈值 - + per_item_conf_thresholds: 每个 batch item 独立的置信度阈值列表, + 长度必须等于 batch_size。传入时忽略 conf_threshold 参数。 + Returns: List of (检测框, 置信度, 类别ID) per ROI """ - if conf_threshold is None: - conf_threshold = self.conf_threshold + default_conf = conf_threshold if conf_threshold is not None else self.conf_threshold if nms_threshold is None: nms_threshold = self.nms_threshold @@ -793,11 +795,16 @@ class PostProcessor: self._buffer_xyxy[:valid_count, 2] = boxes[:, 0] + boxes[:, 2] / 2 self._buffer_xyxy[:valid_count, 3] = boxes[:, 1] + boxes[:, 3] / 2 + item_conf = ( + per_item_conf_thresholds[batch_idx] + if per_item_conf_thresholds is not None + else default_conf + ) keep_boxes, keep_scores, keep_classes = nms_processor.process_with_confidence_filter( self._buffer_xyxy[:valid_count], scores_coarse.astype(np.float32), class_ids, - conf_threshold + item_conf ) results.append((keep_boxes, keep_scores, keep_classes)) diff --git a/main.py b/main.py index 149f1e3..f7db103 100644 --- a/main.py +++ b/main.py @@ -598,14 +598,24 @@ class EdgeInferenceService: self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms") batch_size = len(chunk) + + # 按算法类型获取每个 item 的独立置信度阈值 + per_item_conf = [ + self._settings.inference.get_conf_threshold(item[2].algo_code) + for item in chunk + ] + batch_results = self._postprocessor.batch_process_detections( outputs, batch_size, - conf_threshold=self._settings.inference.conf_threshold + per_item_conf_thresholds=per_item_conf, ) total_detections = sum(len(r[0]) for r in batch_results) - self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}") + self._logger.debug( + f"[推理] batch_size={batch_size}, 总检测数={total_detections}, " + f"conf_thresholds={per_item_conf}" + ) for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(chunk): boxes, scores, class_ids = batch_results[idx] From 1211fc72076c8818f3f654465b02cf26f28846d1 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Sun, 8 Mar 2026 23:20:31 +0800 Subject: [PATCH 04/28] =?UTF-8?q?feat:=20=E7=A6=BB=E5=B2=97=E7=AE=97?= =?UTF-8?q?=E6=B3=95=E6=BB=91=E5=8A=A8=E7=AA=97=E5=8F=A3=E6=8A=97=E6=8A=96?= =?UTF-8?q?=E5=8A=A8=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 上岗确认阈值从 0.7 降为 0.6,降低漏确认 - 离岗触发从 ratio==0 改为 ratio<0.2,允许 20% 抖动 - 离岗中断从单帧判断改为 ratio>=0.5,避免偶尔一帧误检打断确认 - 入侵算法不变,通过调高 conf 到 0.6 解决误报 Co-Authored-By: Claude Opus 4.6 --- algorithms.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/algorithms.py b/algorithms.py index ce0e904..fc25168 100644 --- a/algorithms.py +++ b/algorithms.py @@ -241,7 +241,7 @@ class LeavePostAlgorithm: self.state_start_time = current_time self.detection_window.clear() logger.debug(f"ROI {roi_id}: CONFIRMING_ON_DUTY → INIT (人消失)") - elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.7: + elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.6: # 上岗确认成功(命中率>=70%) self.state = self.STATE_ON_DUTY self.state_start_time = current_time @@ -250,8 +250,8 @@ class LeavePostAlgorithm: elif self.state == self.STATE_ON_DUTY: # 在岗状态:监控是否离岗 - if detection_ratio == 0: - # 滑动窗口内完全没有人,进入离岗确认 + if detection_ratio < 0.2: + # 滑动窗口内 80% 以上帧无人,进入离岗确认 self.state = self.STATE_CONFIRMING_OFF_DUTY self.state_start_time = current_time logger.debug(f"ROI {roi_id}: ON_DUTY → CONFIRMING_OFF_DUTY") @@ -260,12 +260,12 @@ class LeavePostAlgorithm: # 离岗确认中:需要持续未检测到人 elapsed = (current_time - self.state_start_time).total_seconds() - if roi_has_person: - # 人回来了,回到ON_DUTY + if detection_ratio >= 0.5: + # 窗口内检测率恢复到 50% 以上,人确实回来了 self.state = self.STATE_ON_DUTY self.state_start_time = current_time - logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了)") - elif elapsed >= self.confirm_off_duty_sec and detection_ratio == 0: + logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了, ratio={detection_ratio:.2f})") + elif elapsed >= self.confirm_off_duty_sec and detection_ratio < 0.2: # 离岗确认成功,进入倒计时 self.state = self.STATE_OFF_DUTY_COUNTDOWN self.state_start_time = current_time From 82d17a5266bdfae7809857ba6809bf4ece35c3e1 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 9 Mar 2026 17:20:50 +0800 Subject: [PATCH 05/28] =?UTF-8?q?fix:=20alarm=5Fid=20=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E6=88=B3=E6=94=B9=E4=B8=BA=E5=8C=97=E4=BA=AC=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit generate_alarm_id() 中 datetime.now(timezone.utc) 改为 datetime.now(timezone(timedelta(hours=8))),与服务端时间一致。 Co-Authored-By: Claude Opus 4.6 --- core/result_reporter.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/result_reporter.py b/core/result_reporter.py index c2af8e3..9cca97e 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -9,7 +9,7 @@ import json import logging import uuid -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta from typing import Any, Dict, List, Optional from dataclasses import dataclass, field @@ -56,7 +56,7 @@ def generate_alarm_id(device_id: str) -> str: 生成告警ID 格式: edge_{device_id}_{YYYYMMDDHHmmss}_{6位uuid} """ - timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + timestamp = datetime.now(timezone(timedelta(hours=8))).strftime("%Y%m%d%H%M%S") uid = uuid.uuid4().hex[:6] return f"edge_{device_id}_{timestamp}_{uid}" From d132a50ae021c8e0cf8510d54677f402d5c3380b Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 9 Mar 2026 17:22:50 +0800 Subject: [PATCH 06/28] =?UTF-8?q?fix:=20=E5=91=8A=E8=AD=A6=E4=B8=8A?= =?UTF-8?q?=E4=BC=A0COS=E8=B7=AF=E5=BE=84=E6=97=A5=E6=9C=9F=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E5=8C=97=E4=BA=AC=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit alarm_upload_worker.py 中 COS object key 的日期和死信时间戳 从 UTC 改为北京时间(UTC+8),与服务端和日志时间一致。 Co-Authored-By: Claude Opus 4.6 --- core/alarm_upload_worker.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index 2637d16..c473a96 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -17,7 +17,9 @@ import json import logging import threading import time -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta + +_BEIJING_TZ = timezone(timedelta(hours=8)) from typing import Any, Dict, Optional import redis @@ -294,7 +296,7 @@ class AlarmUploadWorker: return None # 生成 Object Key: alarms/{device_id}/{yyyy-MM-dd}/{alarm_id}.jpg - date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d") + date_str = datetime.now(_BEIJING_TZ).strftime("%Y-%m-%d") object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg" try: @@ -392,7 +394,7 @@ class AlarmUploadWorker: if retry_count > max_retry: # 超过最大重试次数,写入死信队列 alarm_data["_dead_reason"] = error - alarm_data["_dead_at"] = datetime.now(timezone.utc).isoformat() + alarm_data["_dead_at"] = datetime.now(_BEIJING_TZ).isoformat() dead_json = json.dumps(alarm_data, ensure_ascii=False) self._redis.lpush(REDIS_KEY_ALARM_DEAD, dead_json) self._stats["dead_lettered"] += 1 From 9a1ac16f19832454053f859dd83e8672ccbb4c63 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Wed, 11 Mar 2026 09:40:51 +0800 Subject: [PATCH 07/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E5=9B=9E=E8=B0=83=E7=A6=81=E7=94=A8=E7=B3=BB=E7=BB=9F?= =?UTF-8?q?=E4=BB=A3=E7=90=86=EF=BC=8C=E8=A7=A3=E5=86=B3502=E5=9B=9E?= =?UTF-8?q?=E8=B0=83=E5=A4=B1=E8=B4=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit requests.post回调WVP时被本地代理(127.0.0.1:7897)拦截导致502, 添加proxies=None绕过系统代理直连WVP。 Co-Authored-By: Claude Opus 4.6 --- core/screenshot_handler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index 92421ea..ce503bc 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -326,7 +326,8 @@ class ScreenshotHandler: if callback_url: try: url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH - resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT) + resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT, + proxies={"http": None, "https": None}) if resp.status_code < 300: logger.info("[截图] HTTP 回调成功: request_id=%s", request_id) return From ea992c6daaee670d27f885250574712d224ab41e Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 16 Mar 2026 16:54:47 +0800 Subject: [PATCH 08/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E6=96=B0?= =?UTF-8?q?=E5=A2=9E=E8=BD=A6=E8=BE=86=E8=BF=9D=E5=81=9C=E5=92=8C=E8=BD=A6?= =?UTF-8?q?=E8=BE=86=E6=8B=A5=E5=A0=B5=E6=A3=80=E6=B5=8B=E7=AE=97=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - IllegalParkingAlgorithm: 5状态机(IDLE→CONFIRMING_VEHICLE→PARKED_COUNTDOWN→ALARMED→CONFIRMING_CLEAR) 禁停区域检测,15秒确认+5分钟倒计时,滑动窗口抗抖动,支持car/truck/bus/motorcycle - VehicleCongestionAlgorithm: 4状态机(NORMAL→CONFIRMING_CONGESTION→CONGESTED→CONFIRMING_CLEAR) 车辆计数≥阈值+持续60秒触发,滑动窗口平均值判断 - AlgorithmManager: 新增default_params、register_algorithm、get_status支持两种新算法 - main.py: 泛化alarm_id回填和first_frame_time提取,ext_data新增vehicle_count字段 Co-Authored-By: Claude Opus 4.6 --- algorithms.py | 532 ++++++++++++++++++++++++++++++++++++++++++++++++-- main.py | 32 +-- 2 files changed, 536 insertions(+), 28 deletions(-) diff --git a/algorithms.py b/algorithms.py index fc25168..5bb10e7 100644 --- a/algorithms.py +++ b/algorithms.py @@ -705,6 +705,494 @@ class IntrusionAlgorithm: # self.alert_triggered.clear() +class IllegalParkingAlgorithm: + """ + 车辆违停检测算法(状态机版本 v1.0) + + 状态机: + IDLE → CONFIRMING_VEHICLE → PARKED_COUNTDOWN → ALARMED → CONFIRMING_CLEAR → IDLE + + 业务流程: + 1. 检测到车辆进入禁停区 → 车辆确认期(confirm_vehicle_sec,默认15秒,ratio≥0.6) + 2. 确认有车 → 违停倒计时(parking_countdown_sec,默认300秒/5分钟) + 3. 倒计时结束仍有车 → 触发告警(ALARMED状态) + 4. 车辆离开 → 消失确认期(confirm_clear_sec,默认30秒,ratio<0.2) + 5. 确认车辆离开 → 发送resolve事件 → 回到空闲状态 + + 使用滑动窗口(10秒)抗抖动,支持多类车辆检测。 + """ + + # 状态定义 + STATE_IDLE = "IDLE" + STATE_CONFIRMING_VEHICLE = "CONFIRMING_VEHICLE" + STATE_PARKED_COUNTDOWN = "PARKED_COUNTDOWN" + STATE_ALARMED = "ALARMED" + STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" + + # 告警级别常量 + ALARM_LEVEL_ILLEGAL_PARKING = 2 # 一般级别 + + # 滑动窗口参数 + WINDOW_SIZE_SEC = 10 + + def __init__( + self, + confirm_vehicle_sec: int = 15, + parking_countdown_sec: int = 300, + confirm_clear_sec: int = 30, + cooldown_sec: int = 600, + target_classes: Optional[List[str]] = None, + ): + self.confirm_vehicle_sec = confirm_vehicle_sec + self.parking_countdown_sec = parking_countdown_sec + self.confirm_clear_sec = confirm_clear_sec + self.cooldown_sec = cooldown_sec + self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] + + # 状态变量 + self.state: str = self.STATE_IDLE + self.state_start_time: Optional[datetime] = None + + # 滑动窗口:存储 (timestamp, has_vehicle: bool) + self._detection_window: deque = deque() + + # 告警追踪 + self._last_alarm_id: Optional[str] = None + self._parking_start_time: Optional[datetime] = None + + # 冷却期管理 + self.alert_cooldowns: Dict[str, datetime] = {} + + def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + matched_rois = detection.get("matched_rois", []) + for roi in matched_rois: + if roi.get("roi_id") == roi_id: + return True + return False + + def _check_target_classes(self, detection: Dict) -> bool: + """检查检测目标是否属于车辆类别""" + det_class = detection.get("class", "") + return det_class in self.target_classes + + def _update_window(self, current_time: datetime, has_vehicle: bool): + """更新滑动窗口""" + self._detection_window.append((current_time, has_vehicle)) + cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC) + while self._detection_window and self._detection_window[0][0] < cutoff: + self._detection_window.popleft() + + def _get_window_ratio(self) -> float: + """获取滑动窗口内的检测命中率""" + if not self._detection_window: + return 0.0 + hits = sum(1 for _, has in self._detection_window if has) + return hits / len(self._detection_window) + + def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + return det.get("bbox", []) + return [] + + def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float: + """获取ROI内车辆的最高置信度""" + max_conf = 0.0 + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + max_conf = max(max_conf, det.get("confidence", 0.0)) + return max_conf + + def process( + self, + roi_id: str, + camera_id: str, + tracks: List[Dict], + current_time: Optional[datetime] = None, + ) -> List[Dict]: + """处理单帧检测结果""" + current_time = current_time or datetime.now() + alerts = [] + + # 检查ROI内是否有车辆 + roi_has_vehicle = any( + self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det) + for det in tracks + ) + + # 更新滑动窗口 + self._update_window(current_time, roi_has_vehicle) + ratio = self._get_window_ratio() + + # === 状态机处理 === + + if self.state == self.STATE_IDLE: + if roi_has_vehicle: + self.state = self.STATE_CONFIRMING_VEHICLE + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_VEHICLE") + + elif self.state == self.STATE_CONFIRMING_VEHICLE: + if self.state_start_time is None: + self.state = self.STATE_IDLE + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if ratio < 0.3: + # 命中率过低,车辆可能只是路过 + self.state = self.STATE_IDLE + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<0.3)") + elif elapsed >= self.confirm_vehicle_sec and ratio >= 0.6: + # 确认有车辆停留,进入倒计时 + self._parking_start_time = self.state_start_time + self.state = self.STATE_PARKED_COUNTDOWN + self.state_start_time = current_time + logger.info(f"ROI {roi_id}: CONFIRMING_VEHICLE → PARKED_COUNTDOWN (ratio={ratio:.2f})") + + elif self.state == self.STATE_PARKED_COUNTDOWN: + if self.state_start_time is None: + self.state = self.STATE_IDLE + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if ratio < 0.2: + # 车辆已离开 + self.state = self.STATE_IDLE + self.state_start_time = None + self._parking_start_time = None + logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (车辆离开, ratio={ratio:.2f})") + elif elapsed >= self.parking_countdown_sec: + # 倒计时结束,检查冷却期 + cooldown_key = f"{camera_id}_{roi_id}" + if cooldown_key not in self.alert_cooldowns or \ + (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: + + bbox = self._get_latest_bbox(tracks, roi_id) + confidence = self._get_max_confidence(tracks, roi_id) + + alerts.append({ + "roi_id": roi_id, + "camera_id": camera_id, + "bbox": bbox, + "alert_type": "illegal_parking", + "alarm_level": self.ALARM_LEVEL_ILLEGAL_PARKING, + "confidence": confidence, + "message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)", + "first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None, + "duration_minutes": elapsed / 60, + }) + + self.alert_cooldowns[cooldown_key] = current_time + self.state = self.STATE_ALARMED + logger.warning(f"ROI {roi_id}: PARKED_COUNTDOWN → ALARMED (违停告警触发)") + else: + self.state = self.STATE_IDLE + self.state_start_time = None + self._parking_start_time = None + logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)") + + elif self.state == self.STATE_ALARMED: + if ratio < 0.3: + self.state = self.STATE_CONFIRMING_CLEAR + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR") + + elif self.state == self.STATE_CONFIRMING_CLEAR: + if self.state_start_time is None: + self.state = self.STATE_IDLE + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if ratio >= 0.5: + # 车辆又出现,回到ALARMED + self.state = self.STATE_ALARMED + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (车辆仍在)") + elif elapsed >= self.confirm_clear_sec and ratio < 0.2: + # 确认车辆已离开 + if self._last_alarm_id and self._parking_start_time: + duration_ms = int((current_time - self._parking_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "vehicle_left", + }) + logger.info(f"ROI {roi_id}: 违停告警已解决(车辆离开)") + + self.state = self.STATE_IDLE + self.state_start_time = None + self._last_alarm_id = None + self._parking_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE") + + return alerts + + def set_last_alarm_id(self, alarm_id: str): + """由 main.py 在告警生成后回填 alarm_id""" + self._last_alarm_id = alarm_id + + def reset(self): + """重置算法状态""" + self.state = self.STATE_IDLE + self.state_start_time = None + self._last_alarm_id = None + self._parking_start_time = None + self._detection_window.clear() + self.alert_cooldowns.clear() + + def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: + """获取当前状态""" + current_time = current_time or datetime.now() + state_info = { + "state": self.state, + "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, + "window_ratio": self._get_window_ratio(), + } + if self.state in (self.STATE_ALARMED, self.STATE_PARKED_COUNTDOWN) and self._parking_start_time: + state_info["parking_duration_sec"] = (current_time - self._parking_start_time).total_seconds() + state_info["alarm_id"] = self._last_alarm_id + return state_info + + +class VehicleCongestionAlgorithm: + """ + 车辆拥堵检测算法(状态机版本 v1.0) + + 状态机: + NORMAL → CONFIRMING_CONGESTION → CONGESTED → CONFIRMING_CLEAR → NORMAL + + 业务流程: + 1. 检测到车辆数量 ≥ count_threshold → 拥堵确认期(confirm_congestion_sec,默认60秒) + 2. 确认拥堵(窗口内平均车辆数 ≥ threshold)→ 触发告警 + 3. 车辆减少 → 消散确认期(confirm_clear_sec,默认120秒) + 4. 确认消散(平均数 < threshold)→ 发送resolve事件 → 回到正常 + + 使用滑动窗口(10秒)存储车辆计数,取平均值判断。 + """ + + # 状态定义 + STATE_NORMAL = "NORMAL" + STATE_CONFIRMING_CONGESTION = "CONFIRMING_CONGESTION" + STATE_CONGESTED = "CONGESTED" + STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" + + # 告警级别 + ALARM_LEVEL_CONGESTION = 2 # 一般级别 + + # 滑动窗口参数 + WINDOW_SIZE_SEC = 10 + + def __init__( + self, + count_threshold: int = 3, + confirm_congestion_sec: int = 60, + confirm_clear_sec: int = 120, + cooldown_sec: int = 600, + target_classes: Optional[List[str]] = None, + ): + self.count_threshold = count_threshold + self.confirm_congestion_sec = confirm_congestion_sec + self.confirm_clear_sec = confirm_clear_sec + self.cooldown_sec = cooldown_sec + self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] + + # 状态变量 + self.state: str = self.STATE_NORMAL + self.state_start_time: Optional[datetime] = None + + # 滑动窗口:存储 (timestamp, vehicle_count: int) + self._count_window: deque = deque() + + # 告警追踪 + self._last_alarm_id: Optional[str] = None + self._congestion_start_time: Optional[datetime] = None + + # 冷却期管理 + self.alert_cooldowns: Dict[str, datetime] = {} + + def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool: + matched_rois = detection.get("matched_rois", []) + for roi in matched_rois: + if roi.get("roi_id") == roi_id: + return True + return False + + def _check_target_classes(self, detection: Dict) -> bool: + det_class = detection.get("class", "") + return det_class in self.target_classes + + def _count_vehicles_in_roi(self, tracks: List[Dict], roi_id: str) -> int: + """统计ROI内的车辆数量""" + return sum( + 1 for det in tracks + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det) + ) + + def _update_count_window(self, current_time: datetime, count: int): + """更新车辆计数滑动窗口""" + self._count_window.append((current_time, count)) + cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC) + while self._count_window and self._count_window[0][0] < cutoff: + self._count_window.popleft() + + def _get_avg_count(self) -> float: + """获取滑动窗口内的平均车辆数""" + if not self._count_window: + return 0.0 + total = sum(c for _, c in self._count_window) + return total / len(self._count_window) + + def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float: + max_conf = 0.0 + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + max_conf = max(max_conf, det.get("confidence", 0.0)) + return max_conf + + def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]: + for det in tracks: + if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det): + return det.get("bbox", []) + return [] + + def process( + self, + roi_id: str, + camera_id: str, + tracks: List[Dict], + current_time: Optional[datetime] = None, + ) -> List[Dict]: + """处理单帧检测结果""" + current_time = current_time or datetime.now() + alerts = [] + + # 统计ROI内车辆数 + vehicle_count = self._count_vehicles_in_roi(tracks, roi_id) + self._update_count_window(current_time, vehicle_count) + avg_count = self._get_avg_count() + + # === 状态机处理 === + + if self.state == self.STATE_NORMAL: + if avg_count >= self.count_threshold: + self.state = self.STATE_CONFIRMING_CONGESTION + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: NORMAL → CONFIRMING_CONGESTION (avg={avg_count:.1f}≥{self.count_threshold})") + + elif self.state == self.STATE_CONFIRMING_CONGESTION: + if self.state_start_time is None: + self.state = self.STATE_NORMAL + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if avg_count < self.count_threshold: + # 车辆减少,回到正常 + self.state = self.STATE_NORMAL + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (avg={avg_count:.1f}<{self.count_threshold})") + elif elapsed >= self.confirm_congestion_sec: + # 确认拥堵,检查冷却期 + cooldown_key = f"{camera_id}_{roi_id}" + if cooldown_key not in self.alert_cooldowns or \ + (current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec: + + self._congestion_start_time = self.state_start_time + bbox = self._get_latest_bbox(tracks, roi_id) + confidence = self._get_max_confidence(tracks, roi_id) + + alerts.append({ + "roi_id": roi_id, + "camera_id": camera_id, + "bbox": bbox, + "alert_type": "vehicle_congestion", + "alarm_level": self.ALARM_LEVEL_CONGESTION, + "confidence": confidence, + "message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)", + "first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None, + "vehicle_count": int(avg_count), + }) + + self.alert_cooldowns[cooldown_key] = current_time + self.state = self.STATE_CONGESTED + logger.warning(f"ROI {roi_id}: CONFIRMING_CONGESTION → CONGESTED (拥堵告警触发, avg={avg_count:.1f})") + else: + self.state = self.STATE_NORMAL + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (冷却期内)") + + elif self.state == self.STATE_CONGESTED: + if avg_count < self.count_threshold: + self.state = self.STATE_CONFIRMING_CLEAR + self.state_start_time = current_time + logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold})") + + elif self.state == self.STATE_CONFIRMING_CLEAR: + if self.state_start_time is None: + self.state = self.STATE_NORMAL + return alerts + + elapsed = (current_time - self.state_start_time).total_seconds() + + if avg_count >= self.count_threshold: + # 又拥堵了,回到CONGESTED + self.state = self.STATE_CONGESTED + self.state_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → CONGESTED (avg={avg_count:.1f}≥{self.count_threshold})") + elif elapsed >= self.confirm_clear_sec: + # 确认消散 + if self._last_alarm_id and self._congestion_start_time: + duration_ms = int((current_time - self._congestion_start_time).total_seconds() * 1000) + alerts.append({ + "alert_type": "alarm_resolve", + "resolve_alarm_id": self._last_alarm_id, + "duration_ms": duration_ms, + "last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "resolve_type": "congestion_cleared", + }) + logger.info(f"ROI {roi_id}: 拥堵告警已解决(车辆减少)") + + self.state = self.STATE_NORMAL + self.state_start_time = None + self._last_alarm_id = None + self._congestion_start_time = None + logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → NORMAL") + + return alerts + + def set_last_alarm_id(self, alarm_id: str): + """由 main.py 在告警生成后回填 alarm_id""" + self._last_alarm_id = alarm_id + + def reset(self): + """重置算法状态""" + self.state = self.STATE_NORMAL + self.state_start_time = None + self._last_alarm_id = None + self._congestion_start_time = None + self._count_window.clear() + self.alert_cooldowns.clear() + + def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]: + """获取当前状态""" + current_time = current_time or datetime.now() + state_info = { + "state": self.state, + "state_start_time": self.state_start_time.isoformat() if self.state_start_time else None, + "avg_vehicle_count": self._get_avg_count(), + } + if self.state in (self.STATE_CONGESTED, self.STATE_CONFIRMING_CLEAR) and self._congestion_start_time: + state_info["congestion_duration_sec"] = (current_time - self._congestion_start_time).total_seconds() + state_info["alarm_id"] = self._last_alarm_id + return state_info + + class AlgorithmManager: def __init__(self, working_hours: Optional[List[Dict]] = None): self.algorithms: Dict[str, Dict[str, Any]] = {} @@ -724,11 +1212,20 @@ class AlgorithmManager: "confirm_seconds": 5, "target_class": None, }, - # "crowd_detection": { - # "max_count": 10, - # "cooldown_seconds": 300, - # "target_class": "person", - # }, + "illegal_parking": { + "confirm_vehicle_sec": 15, + "parking_countdown_sec": 300, + "confirm_clear_sec": 30, + "cooldown_sec": 600, + "target_classes": ["car", "truck", "bus", "motorcycle"], + }, + "vehicle_congestion": { + "count_threshold": 3, + "confirm_congestion_sec": 60, + "confirm_clear_sec": 120, + "cooldown_sec": 600, + "target_classes": ["car", "truck", "bus", "motorcycle"], + }, } self._pubsub = None @@ -1090,13 +1587,22 @@ class AlgorithmManager: confirm_seconds=algo_params.get("confirm_seconds", 5), target_class=algo_params.get("target_class"), ) - # elif algorithm_type == "crowd_detection": - # from algorithms import CrowdDetectionAlgorithm - # self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm( - # max_count=algo_params.get("max_count", 10), - # cooldown_seconds=algo_params.get("cooldown_seconds", 300), - # target_class=algo_params.get("target_class", "person"), - # ) + elif algorithm_type == "illegal_parking": + self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm( + confirm_vehicle_sec=algo_params.get("confirm_vehicle_sec", 15), + parking_countdown_sec=algo_params.get("parking_countdown_sec", 300), + confirm_clear_sec=algo_params.get("confirm_clear_sec", 30), + cooldown_sec=algo_params.get("cooldown_sec", 600), + target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + ) + elif algorithm_type == "vehicle_congestion": + self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm( + count_threshold=algo_params.get("count_threshold", 3), + confirm_congestion_sec=algo_params.get("confirm_congestion_sec", 60), + confirm_clear_sec=algo_params.get("confirm_clear_sec", 120), + cooldown_sec=algo_params.get("cooldown_sec", 600), + target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + ) self._registered_keys.add(cache_key) @@ -1189,6 +1695,8 @@ class AlgorithmManager: "state": getattr(algo, "state", "WAITING"), "alarm_sent": getattr(algo, "alarm_sent", False), } + elif algo_type in ("illegal_parking", "vehicle_congestion"): + status[f"{algo_type}_{bind_id}"] = algo.get_state() else: status[f"{algo_type}_{bind_id}"] = { "detection_count": len(getattr(algo, "detection_start", {})), diff --git a/main.py b/main.py index f7db103..b06c11e 100644 --- a/main.py +++ b/main.py @@ -763,12 +763,16 @@ class EdgeInferenceService: self._camera_alert_cooldown[dedup_key] = now self._performance_stats["total_alerts_generated"] += 1 - # 获取算法的离岗开始时间 - leave_start_time = None - if alert_type == "leave_post": - algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post") - if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time: - leave_start_time = algo._leave_start_time.isoformat() + # 获取算法的事件开始时间(泛化:支持所有算法类型) + first_frame_time = None + algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type) + if algo: + # 各算法使用不同的内部变量名存储开始时间 + for attr in ('_leave_start_time', '_parking_start_time', '_congestion_start_time', '_intrusion_start_time'): + val = getattr(algo, attr, None) + if val: + first_frame_time = val.isoformat() + break from core.result_reporter import AlarmInfo, generate_alarm_id alarm_info = AlarmInfo( @@ -788,20 +792,16 @@ class EdgeInferenceService: "bind_id": bind.bind_id, "message": alert.get("message", ""), "edge_node_id": self._settings.mqtt.device_id, - "first_frame_time": leave_start_time, + "first_frame_time": first_frame_time, + "vehicle_count": alert.get("vehicle_count"), }, ) self._reporter.report_alarm(alarm_info, screenshot=frame.image) - # 回填 alarm_id 到算法实例(用于后续 resolve 追踪) - if alert_type == "leave_post": - algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post") - if algo and hasattr(algo, 'set_last_alarm_id'): - algo.set_last_alarm_id(alarm_info.alarm_id) - elif alert_type == "intrusion": - algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("intrusion") - if algo and hasattr(algo, 'set_last_alarm_id'): - algo.set_last_alarm_id(alarm_info.alarm_id) + # 回填 alarm_id 到算法实例(用于后续 resolve 追踪,泛化支持所有算法类型) + algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type) + if algo and hasattr(algo, 'set_last_alarm_id'): + algo.set_last_alarm_id(alarm_info.alarm_id) self._logger.info( f"告警已生成: type={alert_type}, " From 0d88ed7fbbcb3e87a159b9c948b5e0af9b3bc323 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 17 Mar 2026 17:47:12 +0800 Subject: [PATCH 09/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=E5=BF=83=E8=B7=B3=E5=AE=88=E6=8A=A4=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=EF=BC=8C=E6=AF=8F30=E7=A7=92=E5=90=91=20WVP=20=E4=B8=8A?= =?UTF-8?q?=E6=8A=A5=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 _start_heartbeat() 守护线程 - 每30秒 POST 到 WVP /api/ai/device/heartbeat - 上报 uptime、帧数、告警数、活跃流数、配置版本 - 使用 stop_event.wait(30) 优雅退出 Co-Authored-By: Claude Sonnet 4.6 --- main.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/main.py b/main.py index b06c11e..0f06357 100644 --- a/main.py +++ b/main.py @@ -54,6 +54,7 @@ class EdgeInferenceService: self._debug_reload_thread: Optional[threading.Thread] = None self._debug_http_server = None self._debug_http_thread: Optional[threading.Thread] = None + self._heartbeat_thread: Optional[threading.Thread] = None self._processing_threads: Dict[str, threading.Thread] = {} self._stop_event = threading.Event() @@ -284,6 +285,54 @@ class EdgeInferenceService: ) self._debug_http_thread.start() + def _start_heartbeat(self): + """启动心跳守护线程,每 30 秒向 WVP 上报设备状态""" + def worker(): + import requests + base_url = self._settings.alarm_upload.cloud_api_url.rstrip("/") + url = f"{base_url}/api/ai/device/heartbeat" + device_id = self._settings.mqtt.device_id + + self._logger.info(f"[心跳] 守护线程已启动, 目标: {url}, device_id={device_id}") + + while not self._stop_event.is_set(): + try: + start_time = self._performance_stats.get("start_time") + uptime = (datetime.now() - start_time).total_seconds() if start_time else 0 + stream_count = len(self._stream_manager._streams) if self._stream_manager else 0 + config_version = self._config_manager.config_version if self._config_manager else None + + payload = { + "device_id": device_id, + "status": { + "uptime_seconds": int(uptime), + "frames_processed": self._performance_stats.get("total_frames_processed", 0), + "alerts_generated": self._performance_stats.get("total_alerts_generated", 0), + "stream_count": stream_count, + "config_version": config_version, + "stream_stats": { + "active_streams": stream_count, + }, + }, + } + + resp = requests.post(url, json=payload, timeout=10) + if resp.status_code == 200: + self._logger.debug(f"[心跳] 上报成功: uptime={int(uptime)}s, streams={stream_count}") + else: + self._logger.warning(f"[心跳] 上报失败: HTTP {resp.status_code}") + except Exception as e: + self._logger.warning(f"[心跳] 上报异常: {e}") + + self._stop_event.wait(30) + + self._heartbeat_thread = threading.Thread( + target=worker, + name="HeartbeatWorker", + daemon=True, + ) + self._heartbeat_thread.start() + def initialize(self): """初始化所有组件""" self._logger.info("=" * 50) @@ -301,6 +350,7 @@ class EdgeInferenceService: self._init_screenshot_handler() self._start_debug_reload_watcher() self._start_debug_http_server() + self._start_heartbeat() self._performance_stats["start_time"] = datetime.now() From 3d91aa1a67633d4bfd97f5314bf7bdf5fbc099a1 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Wed, 18 Mar 2026 16:05:04 +0800 Subject: [PATCH 10/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9Aarea=5Fid=20?= =?UTF-8?q?=E5=85=A8=E9=93=BE=E8=B7=AF=E6=94=AF=E6=8C=81=20+=20=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E5=A4=84=E7=90=86=E5=99=A8=E7=8B=AC=E7=AB=8B=20Redis?= =?UTF-8?q?=20=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CameraInfo 模型添加 area_id 字段 - SQLite 表增加 area_id 列及迁移 - config_sync 同步 area_id 到本地 - 告警 ext_data 携带 area_id - 截图处理器使用独立 Redis 连接,避免与配置同步阻塞冲突 - get_all_camera_configs 使用 cursor.description 动态获取列名 Co-Authored-By: Claude Opus 4.6 (1M context) --- config/config_models.py | 7 ++++-- config/database.py | 21 +++++++++++------ core/config_sync.py | 1 + main.py | 52 ++++++++++++++++++++++------------------- 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/config/config_models.py b/config/config_models.py index 3cec251..90b7f3f 100644 --- a/config/config_models.py +++ b/config/config_models.py @@ -49,7 +49,8 @@ class CameraInfo: enabled: bool = True location: Optional[str] = None extra_params: Optional[Dict[str, Any]] = None - + area_id: Optional[int] = None + def to_dict(self) -> Dict[str, Any]: """转换为字典""" return { @@ -60,8 +61,9 @@ class CameraInfo: "enabled": self.enabled, "location": self.location, "extra_params": self.extra_params, + "area_id": self.area_id, } - + @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'CameraInfo': """从字典创建实例""" @@ -73,6 +75,7 @@ class CameraInfo: enabled=data.get("enabled", True), location=data.get("location"), extra_params=data.get("extra_params"), + area_id=data.get("area_id"), ) diff --git a/config/database.py b/config/database.py index d23be56..912c6bc 100644 --- a/config/database.py +++ b/config/database.py @@ -233,6 +233,7 @@ class SQLiteManager: location TEXT, roi_group_id TEXT, extra_params TEXT, + area_id INTEGER, updated_at TEXT ) """) @@ -250,7 +251,14 @@ class SQLiteManager: """) self._conn.commit() - + + # 迁移:为已有数据库添加 area_id 列 + try: + cursor.execute("ALTER TABLE camera_configs ADD COLUMN area_id INTEGER") + self._conn.commit() + except Exception: + pass # 列已存在,忽略 + self._init_default_algorithms() def _init_default_algorithms(self): @@ -555,8 +563,8 @@ class SQLiteManager: cursor.execute(""" INSERT OR REPLACE INTO camera_configs ( camera_id, rtsp_url, camera_name, status, enabled, - location, roi_group_id, extra_params, updated_at - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + location, roi_group_id, extra_params, area_id, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( camera_id, rtsp_url, kwargs.get('camera_name'), @@ -565,6 +573,7 @@ class SQLiteManager: kwargs.get('location'), kwargs.get('roi_group_id'), str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None, + kwargs.get('area_id'), now )) self._conn.commit() @@ -580,8 +589,7 @@ class SQLiteManager: cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,)) row = cursor.fetchone() if row: - columns = ['camera_id', 'rtsp_url', 'camera_name', 'status', - 'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at'] + columns = [desc[0] for desc in cursor.description] return dict(zip(columns, row)) return None except Exception as e: @@ -593,8 +601,7 @@ class SQLiteManager: try: cursor = self._conn.cursor() cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id") - columns = ['camera_id', 'rtsp_url', 'camera_name', 'status', - 'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at'] + columns = [desc[0] for desc in cursor.description] return [dict(zip(columns, row)) for row in cursor.fetchall()] except Exception as e: logger.error(f"获取所有摄像头配置失败: {e}") diff --git a/core/config_sync.py b/core/config_sync.py index 75672ae..1ae53e6 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -540,6 +540,7 @@ class ConfigSyncManager: camera_name=cam.get("camera_name", ""), enabled=cam.get("enabled", True), location=cam.get("location", ""), + area_id=cam.get("area_id"), ) count += 1 except Exception as e: diff --git a/main.py b/main.py index 0f06357..2763391 100644 --- a/main.py +++ b/main.py @@ -185,29 +185,27 @@ class EdgeInferenceService: def _init_screenshot_handler(self): """初始化截图处理器""" try: - # 优先从 config_manager 获取已有的云端 Redis 连接 - cloud_redis = getattr(self._config_manager, '_cloud_redis', None) - - # LOCAL 模式下 config_manager 不初始化云端 Redis,需要独立创建 - if cloud_redis is None: - try: - import redis - cfg = self._settings.cloud_redis - cloud_redis = redis.Redis( - host=cfg.host, - port=cfg.port, - db=cfg.db, - password=cfg.password, - decode_responses=cfg.decode_responses, - socket_connect_timeout=5, - socket_timeout=5, - retry_on_timeout=True, - ) - cloud_redis.ping() - self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}") - except Exception as e: - self._logger.warning(f"截图处理器无法连接云端 Redis: {e}") - cloud_redis = None + # 截图处理器必须使用独立的 Redis 连接(不能与 config_sync 共用, + # 因为两者都做阻塞 XREAD/XREADGROUP,共用连接会互相干扰) + cloud_redis = None + try: + import redis + cfg = self._settings.cloud_redis + cloud_redis = redis.Redis( + host=cfg.host, + port=cfg.port, + db=cfg.db, + password=cfg.password, + decode_responses=cfg.decode_responses, + socket_connect_timeout=5, + socket_timeout=10, + retry_on_timeout=True, + ) + cloud_redis.ping() + self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}") + except Exception as e: + self._logger.warning(f"截图处理器无法连接云端 Redis: {e}") + cloud_redis = None if cloud_redis and self._stream_manager: self._screenshot_handler = ScreenshotHandler( @@ -363,7 +361,12 @@ class EdgeInferenceService: ) self._logger.info("所有组件初始化完成") - + + def _get_camera_area_id(self, camera_id: str) -> Optional[int]: + """获取摄像头的 area_id""" + cam = self._get_camera_config_by_id(camera_id) + return cam.area_id if cam else None + def _get_camera_ids_with_roi(self) -> set: """获取有ROI配置的摄像头ID集合 @@ -844,6 +847,7 @@ class EdgeInferenceService: "edge_node_id": self._settings.mqtt.device_id, "first_frame_time": first_frame_time, "vehicle_count": alert.get("vehicle_count"), + "area_id": self._get_camera_area_id(camera_id), }, ) self._reporter.report_alarm(alarm_info, screenshot=frame.image) From 9c39913a55cbf88dff9913a7e4fbd338a274a63f Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Wed, 18 Mar 2026 16:39:58 +0800 Subject: [PATCH 11/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E5=91=8A?= =?UTF-8?q?=E8=AD=A6=E7=BA=A7=E5=88=AB=E6=94=AF=E6=8C=81=E5=89=8D=E7=AB=AF?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=8B=E5=8F=91=20+=20=E7=BA=A7=E5=88=AB?= =?UTF-8?q?=E4=BD=93=E7=B3=BB=E7=BB=9F=E4=B8=80=E4=B8=BA=200-3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 四种算法均支持通过 params.alarm_level 覆盖默认告警级别 - 级别体系统一:0紧急/1重要/2普通/3轻微 - 车辆拥堵默认阈值调整为 5 辆 Co-Authored-By: Claude Opus 4.6 (1M context) --- algorithms.py | 39 +++++++++++++++++++++++++++++---------- core/result_reporter.py | 2 +- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/algorithms.py b/algorithms.py index 5bb10e7..c7f7ea2 100644 --- a/algorithms.py +++ b/algorithms.py @@ -43,6 +43,9 @@ class LeavePostAlgorithm: STATE_ALARMED = "ALARMED" # 已告警(等待回岗) STATE_NON_WORK_TIME = "NON_WORK_TIME" # 非工作时间 + # 告警级别常量(默认值,可通过 params 覆盖) + DEFAULT_ALARM_LEVEL = 2 # 普通 + def __init__( self, confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长) @@ -52,6 +55,7 @@ class LeavePostAlgorithm: cooldown_sec: int = 600, # 告警冷却期(两次告警的最小间隔) working_hours: Optional[List[Dict]] = None, target_class: Optional[str] = "person", + alarm_level: Optional[int] = None, # 兼容旧参数名(向后兼容) confirm_leave_sec: Optional[int] = None, ): @@ -65,6 +69,7 @@ class LeavePostAlgorithm: # 工作时间和目标类别 self.working_hours = working_hours or [] self.target_class = target_class + self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_INIT @@ -295,6 +300,7 @@ class LeavePostAlgorithm: "camera_id": camera_id, "bbox": bbox, "alert_type": "leave_post", + "alarm_level": self._alarm_level, "message": "人员离岗告警", "first_frame_time": self._leave_start_time.strftime('%Y-%m-%d %H:%M:%S'), }) @@ -392,8 +398,8 @@ class IntrusionAlgorithm: STATE_ALARMED = "ALARMED" # 已告警(等待入侵消失) STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 入侵消失确认中 - # 告警级别常量 - ALARM_LEVEL_INTRUSION = 3 + # 告警级别常量(默认值,可通过 params 覆盖) + DEFAULT_ALARM_LEVEL = 1 # 重要 def __init__( self, @@ -402,6 +408,7 @@ class IntrusionAlgorithm: confirm_intrusion_seconds: Optional[int] = None, # 入侵确认时间(默认5秒) confirm_clear_seconds: Optional[int] = None, # 消失确认时间(默认180秒) target_class: Optional[str] = None, + alarm_level: Optional[int] = None, ): self.cooldown_seconds = cooldown_seconds @@ -413,6 +420,7 @@ class IntrusionAlgorithm: self.confirm_seconds = confirm_seconds self.target_class = target_class + self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_IDLE @@ -516,7 +524,7 @@ class IntrusionAlgorithm: "camera_id": camera_id, "bbox": bbox, "alert_type": "intrusion", - "alarm_level": self.ALARM_LEVEL_INTRUSION, + "alarm_level": self._alarm_level, "message": "检测到周界入侵", "first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'), }) @@ -729,8 +737,8 @@ class IllegalParkingAlgorithm: STATE_ALARMED = "ALARMED" STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" - # 告警级别常量 - ALARM_LEVEL_ILLEGAL_PARKING = 2 # 一般级别 + # 告警级别常量(默认值,可通过 params 覆盖) + DEFAULT_ALARM_LEVEL = 1 # 重要 # 滑动窗口参数 WINDOW_SIZE_SEC = 10 @@ -742,12 +750,14 @@ class IllegalParkingAlgorithm: confirm_clear_sec: int = 30, cooldown_sec: int = 600, target_classes: Optional[List[str]] = None, + alarm_level: Optional[int] = None, ): self.confirm_vehicle_sec = confirm_vehicle_sec self.parking_countdown_sec = parking_countdown_sec self.confirm_clear_sec = confirm_clear_sec self.cooldown_sec = cooldown_sec self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] + self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_IDLE @@ -878,7 +888,7 @@ class IllegalParkingAlgorithm: "camera_id": camera_id, "bbox": bbox, "alert_type": "illegal_parking", - "alarm_level": self.ALARM_LEVEL_ILLEGAL_PARKING, + "alarm_level": self._alarm_level, "confidence": confidence, "message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)", "first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None, @@ -982,25 +992,27 @@ class VehicleCongestionAlgorithm: STATE_CONGESTED = "CONGESTED" STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" - # 告警级别 - ALARM_LEVEL_CONGESTION = 2 # 一般级别 + # 告警级别(默认值,可通过 params 覆盖) + DEFAULT_ALARM_LEVEL = 2 # 普通 # 滑动窗口参数 WINDOW_SIZE_SEC = 10 def __init__( self, - count_threshold: int = 3, + count_threshold: int = 5, confirm_congestion_sec: int = 60, confirm_clear_sec: int = 120, cooldown_sec: int = 600, target_classes: Optional[List[str]] = None, + alarm_level: Optional[int] = None, ): self.count_threshold = count_threshold self.confirm_congestion_sec = confirm_congestion_sec self.confirm_clear_sec = confirm_clear_sec self.cooldown_sec = cooldown_sec self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"] + self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL # 状态变量 self.state: str = self.STATE_NORMAL @@ -1112,7 +1124,7 @@ class VehicleCongestionAlgorithm: "camera_id": camera_id, "bbox": bbox, "alert_type": "vehicle_congestion", - "alarm_level": self.ALARM_LEVEL_CONGESTION, + "alarm_level": self._alarm_level, "confidence": confidence, "message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)", "first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None, @@ -1571,6 +1583,9 @@ class AlgorithmManager: if params: algo_params.update(params) + # 从 params 中提取告警等级(前端配置下发) + configured_alarm_level = algo_params.get("alarm_level") + if algorithm_type == "leave_post": roi_working_hours = algo_params.get("working_hours") or self.working_hours self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm( @@ -1580,12 +1595,14 @@ class AlgorithmManager: cooldown_sec=algo_params.get("cooldown_sec", 600), working_hours=roi_working_hours, target_class=algo_params.get("target_class", "person"), + alarm_level=configured_alarm_level, ) elif algorithm_type == "intrusion": self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm( cooldown_seconds=algo_params.get("cooldown_seconds", 300), confirm_seconds=algo_params.get("confirm_seconds", 5), target_class=algo_params.get("target_class"), + alarm_level=configured_alarm_level, ) elif algorithm_type == "illegal_parking": self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm( @@ -1594,6 +1611,7 @@ class AlgorithmManager: confirm_clear_sec=algo_params.get("confirm_clear_sec", 30), cooldown_sec=algo_params.get("cooldown_sec", 600), target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + alarm_level=configured_alarm_level, ) elif algorithm_type == "vehicle_congestion": self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm( @@ -1602,6 +1620,7 @@ class AlgorithmManager: confirm_clear_sec=algo_params.get("confirm_clear_sec", 120), cooldown_sec=algo_params.get("cooldown_sec", 600), target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]), + alarm_level=configured_alarm_level, ) self._registered_keys.add(cache_key) diff --git a/core/result_reporter.py b/core/result_reporter.py index 9cca97e..816dd07 100644 --- a/core/result_reporter.py +++ b/core/result_reporter.py @@ -29,7 +29,7 @@ class AlarmInfo: device_id: str scene_id: str event_time: str # ISO8601 - alarm_level: int # 1-4 + alarm_level: int # 0紧急 1重要 2普通 3轻微 snapshot_b64: Optional[str] = None # Base64 编码的 JPEG 截图 algorithm_code: Optional[str] = None confidence_score: Optional[float] = None From 2ea35ad5d34ea2d3d69b0b42f9e8a6b02dabb1fd Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 19 Mar 2026 09:58:24 +0800 Subject: [PATCH 12/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E5=90=8C=E6=97=B6=E5=8F=91=E9=80=81=E5=88=B0=20vsp-se?= =?UTF-8?q?rvice=20=E5=92=8C=20WVP=20=E5=B9=B3=E5=8F=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 WVP_API_URL 环境变量,心跳同时上报到两个地址, 解决前端从 WVP 读设备状态显示离线的问题。 --- config/settings.py | 2 ++ main.py | 23 +++++++++++++++-------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/config/settings.py b/config/settings.py index 7fc58af..d31ae3e 100644 --- a/config/settings.py +++ b/config/settings.py @@ -93,6 +93,7 @@ class COSConfig: class AlarmUploadConfig: """告警上报配置""" cloud_api_url: str = "http://localhost:8000" + wvp_api_url: str = "" # WVP 平台地址(心跳同步用) edge_token: str = "" retry_max: int = 3 retry_interval: int = 5 @@ -263,6 +264,7 @@ class Settings: self.alarm_upload = AlarmUploadConfig( cloud_api_url=os.getenv("CLOUD_API_URL", "http://localhost:8000"), + wvp_api_url=os.getenv("WVP_API_URL", ""), edge_token=os.getenv("EDGE_TOKEN", ""), retry_max=int(os.getenv("ALARM_RETRY_MAX", "3")), retry_interval=int(os.getenv("ALARM_RETRY_INTERVAL", "5")), diff --git a/main.py b/main.py index 2763391..975813d 100644 --- a/main.py +++ b/main.py @@ -284,14 +284,17 @@ class EdgeInferenceService: self._debug_http_thread.start() def _start_heartbeat(self): - """启动心跳守护线程,每 30 秒向 WVP 上报设备状态""" + """启动心跳守护线程,每 30 秒向云端上报设备状态""" def worker(): import requests base_url = self._settings.alarm_upload.cloud_api_url.rstrip("/") - url = f"{base_url}/api/ai/device/heartbeat" + wvp_url = self._settings.alarm_upload.wvp_api_url.rstrip("/") if self._settings.alarm_upload.wvp_api_url else "" + urls = [f"{base_url}/api/ai/device/heartbeat"] + if wvp_url: + urls.append(f"{wvp_url}/api/ai/device/heartbeat") device_id = self._settings.mqtt.device_id - self._logger.info(f"[心跳] 守护线程已启动, 目标: {url}, device_id={device_id}") + self._logger.info(f"[心跳] 守护线程已启动, 目标: {urls}, device_id={device_id}") while not self._stop_event.is_set(): try: @@ -314,11 +317,15 @@ class EdgeInferenceService: }, } - resp = requests.post(url, json=payload, timeout=10) - if resp.status_code == 200: - self._logger.debug(f"[心跳] 上报成功: uptime={int(uptime)}s, streams={stream_count}") - else: - self._logger.warning(f"[心跳] 上报失败: HTTP {resp.status_code}") + for url in urls: + try: + resp = requests.post(url, json=payload, timeout=10) + if resp.status_code == 200: + self._logger.debug(f"[心跳] {url} 上报成功") + else: + self._logger.warning(f"[心跳] {url} 上报失败: HTTP {resp.status_code}") + except Exception as e: + self._logger.warning(f"[心跳] {url} 上报异常: {e}") except Exception as e: self._logger.warning(f"[心跳] 上报异常: {e}") From 13706bc55cc1c932d2844d8bf320f047921759a6 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 19 Mar 2026 10:30:51 +0800 Subject: [PATCH 13/28] =?UTF-8?q?=E5=AE=89=E5=85=A8=EF=BC=9A=E7=A7=BB?= =?UTF-8?q?=E9=99=A4=20.env=20=E8=B7=9F=E8=B8=AA=EF=BC=8C=E6=B7=BB?= =?UTF-8?q?=E5=8A=A0=20.env.example=20=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit .env 含真实密钥不应入库,已在 .gitignore 中忽略。 新增 .env.example 作为配置模板(占位符)。 --- .env | 23 ----------------------- .env.example | 25 +++++++++++++++++++++++++ 2 files changed, 25 insertions(+), 23 deletions(-) delete mode 100644 .env create mode 100644 .env.example diff --git a/.env b/.env deleted file mode 100644 index 5717f51..0000000 --- a/.env +++ /dev/null @@ -1,23 +0,0 @@ -# Local debug overrides -DEBUG_SERVER_HOST=0.0.0.0 - -# Alarm upload - cloud API URL (WVP backend) -CLOUD_API_URL=http://124.221.55.225:18080 - -# ===== 边缘设备 ID(必须与 WVP 数据库 ROI 表的 device_id 一致) ===== -EDGE_DEVICE_ID=edge - -# ===== 配置同步模式(REDIS=监听云端Stream, LOCAL=仅本地SQLite) ===== -CONFIG_SYNC_MODE=REDIS - -# ===== 云端 Redis(截图处理器 + 配置同步,db=1) ===== -CLOUD_REDIS_HOST=sh-crs-6upea3zn.sql.tencentcdb.com -CLOUD_REDIS_PORT=24637 -CLOUD_REDIS_DB=1 -CLOUD_REDIS_PASSWORD=HkVZkVnn1 - -# ===== 腾讯云 COS(截图上传) ===== -COS_REGION=ap-shanghai -COS_BUCKET=xhwkzx-1-1389966313 -COS_SECRET_ID=AKIDVxPiqmVhYv7FCwVqytdAVddQ2TJySt9I -COS_SECRET_KEY=1rVyEI8mMVWs21xfBUjy4BE6DA4z7KWb diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6b53aca --- /dev/null +++ b/.env.example @@ -0,0 +1,25 @@ +# ===== 云端 API ===== +CLOUD_API_URL=http://your-server:8000 +WVP_API_URL=http://your-server:18080 + +# ===== 边缘设备 ID(必须与 WVP 数据库 ROI 表的 device_id 一致) ===== +EDGE_DEVICE_ID=edge + +# ===== 配置同步模式(REDIS=监听云端Stream, LOCAL=仅本地SQLite) ===== +CONFIG_SYNC_MODE=REDIS + +# ===== 云端 Redis(截图处理器 + 配置同步) ===== +CLOUD_REDIS_HOST=your-redis-host +CLOUD_REDIS_PORT=6379 +CLOUD_REDIS_DB=1 +CLOUD_REDIS_PASSWORD=your-redis-password + +# ===== 腾讯云 COS(截图上传) ===== +COS_REGION=ap-shanghai +COS_BUCKET=your-bucket-name +COS_SECRET_ID=your-cos-secret-id +COS_SECRET_KEY=your-cos-secret-key + +# ===== 按算法独立置信度阈值 ===== +ALGO_CONF_LEAVE_POST=0.4 +ALGO_CONF_INTRUSION=0.6 From 8da4ef9e932156128615a221d56dd8f2bae1c783 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 19 Mar 2026 10:56:28 +0800 Subject: [PATCH 14/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9ARedis=20?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E5=8A=A0=E8=BD=BD=E6=97=B6=E4=BC=A0=E5=85=A5?= =?UTF-8?q?=20alarm=5Flevel=20=E5=88=B0=E7=AE=97=E6=B3=95=E5=AE=9E?= =?UTF-8?q?=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Redis 订阅路径初始化算法时漏传 alarm_level 参数, 导致前端配置的告警等级不生效,始终使用默认值。 --- algorithms.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/algorithms.py b/algorithms.py index c7f7ea2..1b39fbc 100644 --- a/algorithms.py +++ b/algorithms.py @@ -1347,6 +1347,7 @@ class AlgorithmManager: key = f"{roi_id}_{bind_id}" if algo_code == "leave_post": + configured_alarm_level = params.get("alarm_level") algo_params = { "confirm_on_duty_sec": params.get("confirm_on_duty_sec", 10), "confirm_leave_sec": params.get("confirm_leave_sec", 30), @@ -1360,6 +1361,8 @@ class AlgorithmManager: algo.confirm_leave_sec = algo_params["confirm_leave_sec"] algo.cooldown_sec = algo_params["cooldown_sec"] algo.target_class = algo_params["target_class"] + if configured_alarm_level is not None: + algo._alarm_level = configured_alarm_level if algo_params["working_hours"]: algo.working_hours = algo_params["working_hours"] logger.info(f"已热更新算法参数: {key}") @@ -1368,13 +1371,15 @@ class AlgorithmManager: self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm( confirm_on_duty_sec=algo_params["confirm_on_duty_sec"], confirm_leave_sec=algo_params["confirm_leave_sec"], - leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时,默认5分钟 + leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), cooldown_sec=algo_params["cooldown_sec"], working_hours=algo_params["working_hours"], target_class=algo_params["target_class"], + alarm_level=configured_alarm_level, ) logger.info(f"已从Redis加载算法: {key}") elif algo_code == "intrusion": + configured_alarm_level = params.get("alarm_level") algo_params = { "cooldown_seconds": params.get("cooldown_seconds", 300), "confirm_seconds": params.get("confirm_seconds", 5), @@ -1385,6 +1390,7 @@ class AlgorithmManager: cooldown_seconds=algo_params["cooldown_seconds"], confirm_seconds=algo_params["confirm_seconds"], target_class=algo_params["target_class"], + alarm_level=configured_alarm_level, ) logger.info(f"已从Redis加载算法: {key}") From 648606fd0d7bf11f982d6c9a44c81bb188c46ffe Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 19 Mar 2026 15:19:27 +0800 Subject: [PATCH 15/28] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A=E8=BD=A6?= =?UTF-8?q?=E8=BE=86=E8=BF=9D=E5=81=9C=E5=92=8C=E6=8B=A5=E5=A0=B5=E7=AE=97?= =?UTF-8?q?=E6=B3=95=E9=98=B2=E9=A2=91=E7=B9=81=E5=91=8A=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 违停: - confirm_clear 30→120秒(持续2分钟无车才确认离开) - cooldown 600→1800秒(30分钟冷却) - ALARMED 清除阈值 ratio<0.3→ratio<0.15(更严格) 拥堵: - confirm_clear 120→180秒 - cooldown 600→1800秒 - 消散阈值从 < threshold 改为 < threshold*0.5(降到一半才开始确认) --- algorithms.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/algorithms.py b/algorithms.py index 1b39fbc..69c6e83 100644 --- a/algorithms.py +++ b/algorithms.py @@ -747,8 +747,8 @@ class IllegalParkingAlgorithm: self, confirm_vehicle_sec: int = 15, parking_countdown_sec: int = 300, - confirm_clear_sec: int = 30, - cooldown_sec: int = 600, + confirm_clear_sec: int = 120, + cooldown_sec: int = 1800, target_classes: Optional[List[str]] = None, alarm_level: Optional[int] = None, ): @@ -905,10 +905,10 @@ class IllegalParkingAlgorithm: logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)") elif self.state == self.STATE_ALARMED: - if ratio < 0.3: + if ratio < 0.15: self.state = self.STATE_CONFIRMING_CLEAR self.state_start_time = current_time - logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR") + logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR (ratio={ratio:.2f}<0.15)") elif self.state == self.STATE_CONFIRMING_CLEAR: if self.state_start_time is None: @@ -1002,8 +1002,8 @@ class VehicleCongestionAlgorithm: self, count_threshold: int = 5, confirm_congestion_sec: int = 60, - confirm_clear_sec: int = 120, - cooldown_sec: int = 600, + confirm_clear_sec: int = 180, + cooldown_sec: int = 1800, target_classes: Optional[List[str]] = None, alarm_level: Optional[int] = None, ): @@ -1140,10 +1140,11 @@ class VehicleCongestionAlgorithm: logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (冷却期内)") elif self.state == self.STATE_CONGESTED: - if avg_count < self.count_threshold: + # 车辆数降到阈值的一半以下才开始确认消散(避免抖动) + if avg_count < self.count_threshold * 0.5: self.state = self.STATE_CONFIRMING_CLEAR self.state_start_time = current_time - logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold})") + logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold * 0.5:.1f})") elif self.state == self.STATE_CONFIRMING_CLEAR: if self.state_start_time is None: From 369bb0239116b510e65121cd1c8634bcd35bf025 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Thu, 19 Mar 2026 15:47:01 +0800 Subject: [PATCH 16/28] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9A=E8=BF=9D?= =?UTF-8?q?=E5=81=9C=E5=92=8C=E6=8B=A5=E5=A0=B5=20resolve=20=E5=90=8E?= =?UTF-8?q?=E6=B8=85=E9=99=A4=E5=86=B7=E5=8D=B4=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 车辆离开/拥堵消散确认后清冷却,新的违停/拥堵事件可正常触发。 15秒确认期+命中率阈值已能过滤路过车辆,不会误触发。 --- algorithms.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/algorithms.py b/algorithms.py index 69c6e83..770ebfe 100644 --- a/algorithms.py +++ b/algorithms.py @@ -939,6 +939,7 @@ class IllegalParkingAlgorithm: self.state_start_time = None self._last_alarm_id = None self._parking_start_time = None + self.alert_cooldowns.clear() # 车辆离开后清冷却,新车违停可正常告警 logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE") return alerts @@ -1175,6 +1176,7 @@ class VehicleCongestionAlgorithm: self.state_start_time = None self._last_alarm_id = None self._congestion_start_time = None + self.alert_cooldowns.clear() # 拥堵消散后清冷却,再次拥堵可正常告警 logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → NORMAL") return alerts From d5c443c7c62027c0607f9ffab91b06839482a1cd Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 20 Mar 2026 10:03:41 +0800 Subject: [PATCH 17/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E4=BA=91?= =?UTF-8?q?=E7=AB=AF=20Redis=20=E8=BF=9E=E6=8E=A5=E5=8A=A0=20keepalive=20+?= =?UTF-8?q?=20=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5=EF=BC=8C=E9=98=B2?= =?UTF-8?q?=E6=AD=A2=E9=A2=91=E7=B9=81=E6=96=AD=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 腾讯云 Redis 空闲连接超时会主动断开,加 socket_keepalive=True 和 health_check_interval=30 保活,减少重连频率。 --- core/config_sync.py | 2 ++ main.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/core/config_sync.py b/core/config_sync.py index 1ae53e6..233c037 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -206,6 +206,8 @@ class ConfigSyncManager: socket_connect_timeout=10, socket_timeout=10, retry_on_timeout=True, + socket_keepalive=True, + health_check_interval=30, ) self._cloud_redis.ping() logger.info(f"云端 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}") diff --git a/main.py b/main.py index 975813d..ea63cb5 100644 --- a/main.py +++ b/main.py @@ -200,6 +200,8 @@ class EdgeInferenceService: socket_connect_timeout=5, socket_timeout=10, retry_on_timeout=True, + socket_keepalive=True, + health_check_interval=30, ) cloud_redis.ping() self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}") From b70f8cd680b4fb36dd517c82b5362338c8742464 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 20 Mar 2026 11:19:31 +0800 Subject: [PATCH 18/28] =?UTF-8?q?=E4=BC=98=E5=8C=96=EF=BC=9ARedis=20?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E5=A2=9E=E5=BC=BA=20=E2=80=94=20TCP=20keepal?= =?UTF-8?q?ive=20=E9=80=82=E9=85=8D=20+=20=E6=88=AA=E5=9B=BE=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=99=A8=E7=8B=AC=E7=AB=8B=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 _build_keepalive_options() 适配 Linux/Windows TCP keepalive - health_check_interval 30→15秒,更快发现断连 - 截图处理器新增 _reconnect_cloud_redis() 独立重连能力 - 截图监听捕获 ConnectionError 主动重连,不再退避到60秒 --- core/config_sync.py | 22 +++++++++++++++++++++- core/screenshot_handler.py | 35 +++++++++++++++++++++++++++++++++++ main.py | 4 +++- 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/core/config_sync.py b/core/config_sync.py index 233c037..f5f106f 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -14,6 +14,8 @@ import json import logging import os +import platform +import socket import threading import time from datetime import datetime @@ -30,6 +32,23 @@ from utils.version_control import get_version_control logger = logging.getLogger(__name__) +def _build_keepalive_options(): + """构建 TCP keepalive 选项,适配 Linux/Windows""" + opts = {} + if platform.system() == "Linux": + # TCP_KEEPIDLE: 连接空闲 15s 后开始发送 keepalive 探测 + # TCP_KEEPINTVL: 每次探测间隔 5s + # TCP_KEEPCNT: 连续 3 次探测失败则判定断连 + opts = { + socket.TCP_KEEPIDLE: 15, + socket.TCP_KEEPINTVL: 5, + socket.TCP_KEEPCNT: 3, + } + # Windows 不支持 TCP_KEEPIDLE/KEEPINTVL/KEEPCNT, + # 但 socket_keepalive=True 仍会启用默认 keepalive + return opts + + # ==================== Redis Key 常量 ==================== # 云端 Redis Keys @@ -207,7 +226,8 @@ class ConfigSyncManager: socket_timeout=10, retry_on_timeout=True, socket_keepalive=True, - health_check_interval=30, + socket_keepalive_options=_build_keepalive_options(), + health_check_interval=15, ) self._cloud_redis.ping() logger.info(f"云端 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}") diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index ce503bc..f430dfe 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -16,6 +16,7 @@ from typing import Optional import cv2 import numpy as np +import redis import requests from config.settings import get_settings, COSConfig @@ -121,6 +122,32 @@ class ScreenshotHandler: else: logger.error("[截图] 创建 consumer group 失败: %s", e) + # ==================== 重连 ==================== + + def _reconnect_cloud_redis(self): + """重建云端 Redis 连接""" + try: + from core.config_sync import _build_keepalive_options + cfg = self._settings.cloud_redis + self._cloud_redis = redis.Redis( + host=cfg.host, + port=cfg.port, + db=cfg.db, + password=cfg.password, + decode_responses=cfg.decode_responses, + socket_connect_timeout=5, + socket_timeout=10, + retry_on_timeout=True, + socket_keepalive=True, + socket_keepalive_options=_build_keepalive_options(), + health_check_interval=15, + ) + self._cloud_redis.ping() + logger.info("[截图] 云端 Redis 重连成功") + except Exception as e: + logger.warning("[截图] 云端 Redis 重连失败: %s", e) + self._cloud_redis = None + # ==================== 主循环 ==================== def _listen_loop(self): @@ -157,6 +184,14 @@ class ScreenshotHandler: except Exception: pass + except redis.ConnectionError as e: + if self._stop_event.is_set(): + return + logger.warning("[截图] 云端 Redis 连接断开: %s, %ds 后重连...", e, backoff) + self._reconnect_cloud_redis() + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) + except Exception as e: if self._stop_event.is_set(): return diff --git a/main.py b/main.py index ea63cb5..0671685 100644 --- a/main.py +++ b/main.py @@ -191,6 +191,7 @@ class EdgeInferenceService: try: import redis cfg = self._settings.cloud_redis + from core.config_sync import _build_keepalive_options cloud_redis = redis.Redis( host=cfg.host, port=cfg.port, @@ -201,7 +202,8 @@ class EdgeInferenceService: socket_timeout=10, retry_on_timeout=True, socket_keepalive=True, - health_check_interval=30, + socket_keepalive_options=_build_keepalive_options(), + health_check_interval=15, ) cloud_redis.ping() self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}") From 56bf454135126c432c103186c91455ea923bafa8 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 20 Mar 2026 15:07:05 +0800 Subject: [PATCH 19/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9AXREAD=20?= =?UTF-8?q?=E7=A9=BA=E9=97=B2=E6=97=B6=E4=B8=BB=E5=8A=A8=20PING=20?= =?UTF-8?q?=E4=BF=9D=E6=B4=BB=EF=BC=8C=E8=A7=A3=E5=86=B3=20Windows=20?= =?UTF-8?q?=E4=B8=8B=E4=BA=91=E7=AB=AF=20Redis=20=E9=A2=91=E7=B9=81?= =?UTF-8?q?=E6=96=AD=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Windows 不支持 TCP_KEEPIDLE 自定义参数,TCP keepalive 无法阻止 腾讯云 Redis 的空闲超时断连。改用应用层 PING 保活: 每次 XREAD 超时(5秒)返回空时发一次 PING,确保连接活跃。 --- core/config_sync.py | 5 +++++ core/screenshot_handler.py | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/core/config_sync.py b/core/config_sync.py index f5f106f..e20d02d 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -348,6 +348,11 @@ class ConfigSyncManager: ) if not result: + # 无新消息时主动 PING 保活,防止云端 Redis 断开空闲连接 + try: + self._cloud_redis.ping() + except Exception: + break # ping 失败说明连接已断,跳出内层循环触发重连 continue for stream_name, messages in result: diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index f430dfe..5e0d303 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -165,6 +165,11 @@ class ScreenshotHandler: block=5000, ) if not results: + # 无新消息时主动 PING 保活 + try: + self._cloud_redis.ping() + except Exception: + raise redis.ConnectionError("ping failed") continue backoff = 5 # 重置退避 From 5049f7b26b5e3f8a447b0f9336d5622c818968e0 Mon Sep 17 00:00:00 2001 From: YangCheng <1633794139@qq.com> Date: Fri, 20 Mar 2026 22:20:20 +0800 Subject: [PATCH 20/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E5=A4=84=E7=90=86=E5=99=A8=E6=8C=89device=5Fid?= =?UTF-8?q?=E8=BF=87=E6=BB=A4=E8=AF=B7=E6=B1=82=EF=BC=8Cconsumer=20name?= =?UTF-8?q?=E5=8A=A0=E8=AE=BE=E5=A4=87=E6=A0=87=E8=AF=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env | 32 ++++++++++++++++++++++++++++++++ core/screenshot_handler.py | 16 +++++++++++++--- 2 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 .env diff --git a/.env b/.env new file mode 100644 index 0000000..a4b9946 --- /dev/null +++ b/.env @@ -0,0 +1,32 @@ +# Local debug overrides +DEBUG_SERVER_HOST=0.0.0.0 + +# Alarm upload - cloud API URL (WVP backend) +CLOUD_API_URL=http://124.221.55.225:18080 + +# ===== 边缘设备 ID(第二台 Edge 节点,必须与 WVP 数据库 ROI 表的 device_id 一致) ===== +EDGE_DEVICE_ID=edge_002 + +# ===== 配置同步模式(REDIS=监听云端Stream, LOCAL=仅本地SQLite) ===== +CONFIG_SYNC_MODE=REDIS + +# ===== 云端 Redis(截图处理器 + 配置同步,db=1) ===== +CLOUD_REDIS_HOST=sh-crs-6upea3zn.sql.tencentcdb.com +CLOUD_REDIS_PORT=24637 +CLOUD_REDIS_DB=1 +CLOUD_REDIS_PASSWORD=HkVZkVnn1 + +# ===== 腾讯云 COS(截图上传) ===== +COS_REGION=ap-shanghai +COS_BUCKET=xhwkzx-1-1389966313 +COS_SECRET_ID=AKIDVxPiqmVhYv7FCwVqytdAVddQ2TJySt9I +COS_SECRET_KEY=1rVyEI8mMVWs21xfBUjy4BE6DA4z7KWb + +# ===== 本地 Redis(告警队列、配置缓存) ===== +LOCAL_REDIS_HOST=localhost +LOCAL_REDIS_PORT=6379 +LOCAL_REDIS_DB=0 + +# ===== 推理配置(GTX 1050 Ti 2GB 显存优化) ===== +BATCH_SIZE=1 +VIDEO_DEFAULT_FPS=3 diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index 5e0d303..8eb033a 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) # Redis 常量 SNAP_REQUEST_STREAM = "edge_snap_request" SNAP_CONSUMER_GROUP = "edge_snap_group" -SNAP_CONSUMER_NAME = "edge_snap_worker" +SNAP_CONSUMER_NAME_PREFIX = "edge_snap_worker" SNAP_RESULT_KEY_PREFIX = "snap:result:" SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期 @@ -54,6 +54,8 @@ class ScreenshotHandler: self._settings = get_settings() self._cos_client = None self._cos_config: COSConfig = self._settings.cos + self._device_id = self._settings.mqtt.device_id + self._consumer_name = f"{SNAP_CONSUMER_NAME_PREFIX}_{self._device_id}" self._thread: Optional[threading.Thread] = None self._stop_event = threading.Event() @@ -79,7 +81,8 @@ class ScreenshotHandler: daemon=True, ) self._thread.start() - logger.info("[截图] 截图处理器已启动") + logger.info("[截图] 截图处理器已启动, consumer=%s, device_id=%s", + self._consumer_name, self._device_id) def stop(self): """停止截图监听线程""" @@ -159,7 +162,7 @@ class ScreenshotHandler: try: results = self._cloud_redis.xreadgroup( SNAP_CONSUMER_GROUP, - SNAP_CONSUMER_NAME, + self._consumer_name, {SNAP_REQUEST_STREAM: ">"}, count=1, block=5000, @@ -221,6 +224,13 @@ class ScreenshotHandler: }) return + # 多 Edge 设备隔离:如果消息指定了 device_id 且不是本设备,跳过 + target_device_id = fields.get("device_id", "") + if target_device_id and target_device_id != self._device_id: + logger.debug("[截图] 跳过非本设备请求: target=%s, self=%s, request_id=%s", + target_device_id, self._device_id, request_id) + return + rtsp_url = fields.get("rtsp_url", "") logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s", From d972f6e5fe0cbbda2013b0b9f61a00525d2b9f24 Mon Sep 17 00:00:00 2001 From: YangCheng <1633794139@qq.com> Date: Fri, 20 Mar 2026 22:24:19 +0800 Subject: [PATCH 21/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E6=89=80?= =?UTF-8?q?=E6=9C=89HTTP=E8=AF=B7=E6=B1=82=E7=A6=81=E7=94=A8=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E4=BB=A3=E7=90=86=EF=BC=8C=E8=A7=A3=E5=86=B3=E6=9C=AC?= =?UTF-8?q?=E6=9C=BAClash=E4=BB=A3=E7=90=86=E5=AF=BC=E8=87=B4=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E5=92=8C=E5=91=8A=E8=AD=A6=E4=B8=8A=E6=8A=A5=E8=B6=85?= =?UTF-8?q?=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/alarm_upload_worker.py | 6 ++++-- main.py | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/alarm_upload_worker.py b/core/alarm_upload_worker.py index c473a96..0af1bd6 100644 --- a/core/alarm_upload_worker.py +++ b/core/alarm_upload_worker.py @@ -107,7 +107,7 @@ class AlarmUploadWorker: self._logger.info(f"告警上报端点: {report_url}") try: - resp = requests.get(health_url, timeout=5) + resp = requests.get(health_url, timeout=5, proxies={"http": None, "https": None}) if resp.status_code == 200: self._logger.info(f"云端健康检查通过: {health_url}") else: @@ -247,7 +247,8 @@ class AlarmUploadWorker: } try: - response = requests.post(url, json=payload, headers=headers, timeout=10) + response = requests.post(url, json=payload, headers=headers, timeout=10, + proxies={"http": None, "https": None}) if response.status_code == 200: body = response.json() if body.get("code") == 0: @@ -346,6 +347,7 @@ class AlarmUploadWorker: json=report_data, headers=headers, timeout=10, + proxies={"http": None, "https": None}, ) if response.status_code == 200: diff --git a/main.py b/main.py index 0671685..c4dd923 100644 --- a/main.py +++ b/main.py @@ -323,7 +323,8 @@ class EdgeInferenceService: for url in urls: try: - resp = requests.post(url, json=payload, timeout=10) + resp = requests.post(url, json=payload, timeout=10, + proxies={"http": None, "https": None}) if resp.status_code == 200: self._logger.debug(f"[心跳] {url} 上报成功") else: From e70596a23288c0987caf66e7aa43e401ec8c7d09 Mon Sep 17 00:00:00 2001 From: YangCheng <1633794139@qq.com> Date: Fri, 20 Mar 2026 22:28:32 +0800 Subject: [PATCH 22/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9ACLOUD=5FAPI=5F?= =?UTF-8?q?URL=E6=94=B9=E4=B8=BAFastAPI=208000=E7=AB=AF=E5=8F=A3=EF=BC=8C?= =?UTF-8?q?=E5=A2=9E=E5=8A=A0WVP=5FAPI=5FURL=E7=94=A8=E4=BA=8E=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.env b/.env index a4b9946..9dbdf89 100644 --- a/.env +++ b/.env @@ -1,8 +1,10 @@ # Local debug overrides DEBUG_SERVER_HOST=0.0.0.0 -# Alarm upload - cloud API URL (WVP backend) -CLOUD_API_URL=http://124.221.55.225:18080 +# Alarm upload - cloud API URL (FastAPI 告警后端) +CLOUD_API_URL=http://124.221.55.225:8000 +# WVP 平台地址(心跳同步用) +WVP_API_URL=http://124.221.55.225:18080 # ===== 边缘设备 ID(第二台 Edge 节点,必须与 WVP 数据库 ROI 表的 device_id 一致) ===== EDGE_DEVICE_ID=edge_002 From 06b397044e2d801a4e94a6b72733d2533448710b Mon Sep 17 00:00:00 2001 From: YangCheng <1633794139@qq.com> Date: Fri, 20 Mar 2026 22:39:00 +0800 Subject: [PATCH 23/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E4=BB=8E?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E6=8E=A7=E5=88=B6=E7=A7=BB=E9=99=A4.env?= =?UTF-8?q?=E6=96=87=E4=BB=B6=EF=BC=88=E5=90=AB=E8=AE=BE=E5=A4=87=E4=B8=93?= =?UTF-8?q?=E5=B1=9E=E9=85=8D=E7=BD=AE=E5=92=8C=E5=AF=86=E9=92=A5=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env | 34 ---------------------------------- 1 file changed, 34 deletions(-) delete mode 100644 .env diff --git a/.env b/.env deleted file mode 100644 index 9dbdf89..0000000 --- a/.env +++ /dev/null @@ -1,34 +0,0 @@ -# Local debug overrides -DEBUG_SERVER_HOST=0.0.0.0 - -# Alarm upload - cloud API URL (FastAPI 告警后端) -CLOUD_API_URL=http://124.221.55.225:8000 -# WVP 平台地址(心跳同步用) -WVP_API_URL=http://124.221.55.225:18080 - -# ===== 边缘设备 ID(第二台 Edge 节点,必须与 WVP 数据库 ROI 表的 device_id 一致) ===== -EDGE_DEVICE_ID=edge_002 - -# ===== 配置同步模式(REDIS=监听云端Stream, LOCAL=仅本地SQLite) ===== -CONFIG_SYNC_MODE=REDIS - -# ===== 云端 Redis(截图处理器 + 配置同步,db=1) ===== -CLOUD_REDIS_HOST=sh-crs-6upea3zn.sql.tencentcdb.com -CLOUD_REDIS_PORT=24637 -CLOUD_REDIS_DB=1 -CLOUD_REDIS_PASSWORD=HkVZkVnn1 - -# ===== 腾讯云 COS(截图上传) ===== -COS_REGION=ap-shanghai -COS_BUCKET=xhwkzx-1-1389966313 -COS_SECRET_ID=AKIDVxPiqmVhYv7FCwVqytdAVddQ2TJySt9I -COS_SECRET_KEY=1rVyEI8mMVWs21xfBUjy4BE6DA4z7KWb - -# ===== 本地 Redis(告警队列、配置缓存) ===== -LOCAL_REDIS_HOST=localhost -LOCAL_REDIS_PORT=6379 -LOCAL_REDIS_DB=0 - -# ===== 推理配置(GTX 1050 Ti 2GB 显存优化) ===== -BATCH_SIZE=1 -VIDEO_DEFAULT_FPS=3 From ef4cb0536af061029939eb215d9e71601341dc1c Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 20 Mar 2026 22:48:09 +0800 Subject: [PATCH 24/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E6=97=B6=E6=B8=85=E9=99=A4=E7=B3=BB=E7=BB=9F=E4=BB=A3?= =?UTF-8?q?=E7=90=86=E7=8E=AF=E5=A2=83=E5=8F=98=E9=87=8F=EF=BC=8C=E9=98=B2?= =?UTF-8?q?=E6=AD=A2=20Clash=20=E5=B9=B2=E6=89=B0=20Redis=20=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Clash 代理会拦截 Redis 的 TCP 长连接导致频繁断连。 在 main.py 和 config_sync.py 模块加载时清除所有代理环境变量。 --- core/config_sync.py | 4 ++++ main.py | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/core/config_sync.py b/core/config_sync.py index e20d02d..8db730e 100644 --- a/core/config_sync.py +++ b/core/config_sync.py @@ -16,6 +16,10 @@ import logging import os import platform import socket + +# 禁用系统代理(Clash 等代理工具会干扰 Redis TCP 长连接) +for key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"): + os.environ.pop(key, None) import threading import time from datetime import datetime diff --git a/main.py b/main.py index c4dd923..989b727 100644 --- a/main.py +++ b/main.py @@ -12,6 +12,10 @@ import time from datetime import datetime from typing import Dict, Any, Optional, List, Tuple +# 禁用系统代理(Clash 等代理工具会干扰 Redis TCP 长连接和 HTTP 请求) +for _key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"): + os.environ.pop(_key, None) + from config.settings import get_settings, Settings from core.config_sync import get_config_sync_manager, ConfigSyncManager from core.debug_http_server import start_debug_http_server From 3681f35b69aa5e5c996672a65f34f47accbca98d Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 24 Mar 2026 09:00:19 +0800 Subject: [PATCH 25/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E9=87=8D?= =?UTF-8?q?=E8=BF=9E=E4=B8=8A=E9=99=90=E5=90=8E=E5=81=9C=E6=AD=A2=E5=BE=AA?= =?UTF-8?q?=E7=8E=AF=EF=BC=8C=E4=B8=8D=E5=86=8D=E6=AF=8F=E7=A7=92=E5=88=B7?= =?UTF-8?q?=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重连次数达到上限后 break 退出读帧循环, 而不是 sleep(1) + continue 无限刷错误日志。 --- core/video_stream.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/video_stream.py b/core/video_stream.py index a451598..67e8616 100644 --- a/core/video_stream.py +++ b/core/video_stream.py @@ -181,8 +181,7 @@ class RTSPStreamReader: while not self._stop_event.is_set(): if not self._connected: if not self._reconnect(): - time.sleep(1) - continue + break try: ret, frame = self._cap.read() From 749257cb8a6c579f8de63a386f3e8002b45c528c Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Wed, 25 Mar 2026 10:27:49 +0800 Subject: [PATCH 26/28] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E6=88=AA?= =?UTF-8?q?=E5=9B=BE=E5=A4=84=E7=90=86=E5=99=A8=20Redis=20=E6=96=AD?= =?UTF-8?q?=E7=BA=BF=E5=90=8E=E6=97=A0=E6=B3=95=E8=87=AA=E5=8A=A8=E9=87=8D?= =?UTF-8?q?=E8=BF=9E=E7=9A=84=E6=AD=BB=E5=BE=AA=E7=8E=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 重连失败时不再置 cloud_redis=None,避免后续循环走错分支 - except Exception 分支增加 cloud_redis is None 判断,主动触发重连 - 重连成功后重置退避计时器 --- core/screenshot_handler.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/core/screenshot_handler.py b/core/screenshot_handler.py index 8eb033a..4dc8010 100644 --- a/core/screenshot_handler.py +++ b/core/screenshot_handler.py @@ -127,12 +127,12 @@ class ScreenshotHandler: # ==================== 重连 ==================== - def _reconnect_cloud_redis(self): - """重建云端 Redis 连接""" + def _reconnect_cloud_redis(self) -> bool: + """重建云端 Redis 连接,返回是否成功""" try: from core.config_sync import _build_keepalive_options cfg = self._settings.cloud_redis - self._cloud_redis = redis.Redis( + new_client = redis.Redis( host=cfg.host, port=cfg.port, db=cfg.db, @@ -145,11 +145,14 @@ class ScreenshotHandler: socket_keepalive_options=_build_keepalive_options(), health_check_interval=15, ) - self._cloud_redis.ping() + new_client.ping() + self._cloud_redis = new_client logger.info("[截图] 云端 Redis 重连成功") + return True except Exception as e: logger.warning("[截图] 云端 Redis 重连失败: %s", e) - self._cloud_redis = None + # 不置 None,避免后续循环永远无法触发 ConnectionError 重连 + return False # ==================== 主循环 ==================== @@ -196,16 +199,25 @@ class ScreenshotHandler: if self._stop_event.is_set(): return logger.warning("[截图] 云端 Redis 连接断开: %s, %ds 后重连...", e, backoff) - self._reconnect_cloud_redis() self._stop_event.wait(backoff) + self._reconnect_cloud_redis() backoff = min(backoff * 2, max_backoff) except Exception as e: if self._stop_event.is_set(): return - logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff) - self._stop_event.wait(backoff) - backoff = min(backoff * 2, max_backoff) + # cloud_redis 为 None 时主动重连,避免死循环 + if self._cloud_redis is None: + logger.warning("[截图] 云端 Redis 未连接, %ds 后重连...", backoff) + self._stop_event.wait(backoff) + if self._reconnect_cloud_redis(): + backoff = 5 # 重连成功,重置退避 + else: + backoff = min(backoff * 2, max_backoff) + else: + logger.warning("[截图] 监听异常: %s, %ds 后重试", e, backoff) + self._stop_event.wait(backoff) + backoff = min(backoff * 2, max_backoff) # ==================== 请求处理 ==================== From 42a8cf3ce38ab3dcb6489fc28c3a5d900819cffc Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 27 Mar 2026 15:27:26 +0800 Subject: [PATCH 27/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E8=BE=B9?= =?UTF-8?q?=E7=BC=98=E7=AB=AF=20ROI=20=E7=BA=A7=E5=91=8A=E8=AD=A6=E5=8E=BB?= =?UTF-8?q?=E9=87=8D=20=E2=80=94=20=E6=9C=AA=20resolve=20=E7=9A=84?= =?UTF-8?q?=E5=91=8A=E8=AD=A6=E4=B8=8D=E9=87=8D=E5=A4=8D=E5=8F=91=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 _active_alarms 字典(roi_id+alert_type → alarm_id), 告警创建时记录,resolve 时清除。同 ROI 同类型有活跃告警时跳过新告警。 解决违停/拥堵等持续性告警在冷却期过后重复触发的问题。 --- main.py | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/main.py b/main.py index 989b727..75ac0f3 100644 --- a/main.py +++ b/main.py @@ -81,6 +81,10 @@ class EdgeInferenceService: self._camera_alert_cooldown: Dict[str, datetime] = {} self._camera_cooldown_seconds = 30 # 同摄像头同类型告警最小间隔(秒) + # ROI级别告警去重:同ROI+同类型未resolve的告警不重复发送 + # key: f"{roi_id}_{alert_type}", value: alarm_id + self._active_alarms: Dict[str, str] = {} + self._logger.info("Edge_Inference_Service 初始化开始") def _init_database(self): @@ -801,21 +805,39 @@ class EdgeInferenceService: # resolve 事件:更新已有告警,不创建新告警 if alert_type == "alarm_resolve": + resolve_alarm_id = alert.get("resolve_alarm_id") resolve_data = { - "alarm_id": alert.get("resolve_alarm_id"), + "alarm_id": resolve_alarm_id, "duration_ms": alert.get("duration_ms"), "last_frame_time": alert.get("last_frame_time"), "resolve_type": alert.get("resolve_type"), } if self._reporter: self._reporter.report_alarm_resolve(resolve_data) + + # 清除活跃告警记录,允许后续新告警 + for k, v in list(self._active_alarms.items()): + if v == resolve_alarm_id: + del self._active_alarms[k] + self._logger.debug(f"[去重] 活跃告警已清除: {k} -> {resolve_alarm_id}") + break + self._logger.info( - f"离岗告警结束: alarm_id={resolve_data['alarm_id']}, " + f"告警已结束: alarm_id={resolve_data['alarm_id']}, " f"duration_ms={resolve_data['duration_ms']}, " f"reason={resolve_data['resolve_type']}" ) continue + # ROI级别去重:同ROI+同类型有未resolve的告警时不重复发送 + active_key = f"{roi_id}_{alert_type}" + if active_key in self._active_alarms: + self._logger.debug( + f"[去重] 跳过告警: roi={roi_id}, type={alert_type}, " + f"存在未结束告警={self._active_alarms[active_key]}" + ) + continue + # 摄像头级别去重:同一摄像头+告警类型在冷却期内只上报一次 dedup_key = f"{camera_id}_{alert_type}" now = frame.timestamp @@ -868,6 +890,9 @@ class EdgeInferenceService: ) self._reporter.report_alarm(alarm_info, screenshot=frame.image) + # 记录活跃告警(用于 ROI 级去重) + self._active_alarms[active_key] = alarm_info.alarm_id + # 回填 alarm_id 到算法实例(用于后续 resolve 追踪,泛化支持所有算法类型) algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type) if algo and hasattr(algo, 'set_last_alarm_id'): From f0d01d9a0dc79a6c73e2dc0183b8bd8991744d94 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 30 Mar 2026 11:07:44 +0800 Subject: [PATCH 28/28] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E6=94=AF?= =?UTF-8?q?=E6=8C=81=20fullscreen=20ROI=20=E7=B1=BB=E5=9E=8B=20=E2=80=94?= =?UTF-8?q?=20=E5=85=A8=E5=9B=BE=E4=B8=8D=E8=A3=81=E5=89=AA=E7=9B=B4?= =?UTF-8?q?=E6=8E=A5=E6=8E=A8=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/config_models.py | 1 + core/preprocessor.py | 9 +++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/config/config_models.py b/config/config_models.py index 90b7f3f..1579098 100644 --- a/config/config_models.py +++ b/config/config_models.py @@ -13,6 +13,7 @@ class ROIType(str, Enum): """ROI类型枚举""" POLYGON = "polygon" RECTANGLE = "rectangle" + FULLSCREEN = "fullscreen" class AlgorithmType(str, Enum): diff --git a/core/preprocessor.py b/core/preprocessor.py index 5571b70..6343aac 100644 --- a/core/preprocessor.py +++ b/core/preprocessor.py @@ -43,7 +43,9 @@ class ROICropper: 裁剪后的图像,失败返回None """ try: - if roi.roi_type == ROIType.RECTANGLE: + if roi.roi_type == ROIType.FULLSCREEN: + return image.copy() + elif roi.roi_type == ROIType.RECTANGLE: return self._crop_rectangle(image, roi.coordinates) elif roi.roi_type == ROIType.POLYGON: return self._crop_polygon(image, roi.coordinates) @@ -137,7 +139,10 @@ class ROICropper: height, width = image_shape mask = np.zeros((height, width), dtype=np.uint8) - if roi.roi_type == ROIType.RECTANGLE: + if roi.roi_type == ROIType.FULLSCREEN: + mask[:] = 255 + + elif roi.roi_type == ROIType.RECTANGLE: if len(roi.coordinates) >= 2: x1, y1 = int(roi.coordinates[0][0]), int(roi.coordinates[0][1]) x2, y2 = int(roi.coordinates[1][0]), int(roi.coordinates[1][1])