Compare commits

24 Commits

Author SHA1 Message Date
ef4cb0536a 修复:启动时清除系统代理环境变量,防止 Clash 干扰 Redis 连接
Clash 代理会拦截 Redis 的 TCP 长连接导致频繁断连。
在 main.py 和 config_sync.py 模块加载时清除所有代理环境变量。
2026-03-20 22:48:09 +08:00
06b397044e 修复:从版本控制移除.env文件(含设备专属配置和密钥) 2026-03-20 22:39:00 +08:00
e70596a232 修复:CLOUD_API_URL改为FastAPI 8000端口,增加WVP_API_URL用于心跳同步 2026-03-20 22:28:32 +08:00
d972f6e5fe 修复:所有HTTP请求禁用系统代理,解决本机Clash代理导致心跳和告警上报超时 2026-03-20 22:24:19 +08:00
5049f7b26b 功能:截图处理器按device_id过滤请求,consumer name加设备标识 2026-03-20 22:20:20 +08:00
56bf454135 修复:XREAD 空闲时主动 PING 保活,解决 Windows 下云端 Redis 频繁断连
Windows 不支持 TCP_KEEPIDLE 自定义参数,TCP keepalive 无法阻止
腾讯云 Redis 的空闲超时断连。改用应用层 PING 保活:
每次 XREAD 超时(5秒)返回空时发一次 PING,确保连接活跃。
2026-03-20 15:07:05 +08:00
b70f8cd680 优化:Redis 连接增强 — TCP keepalive 适配 + 截图处理器独立重连
- 新增 _build_keepalive_options() 适配 Linux/Windows TCP keepalive
- health_check_interval 30→15秒,更快发现断连
- 截图处理器新增 _reconnect_cloud_redis() 独立重连能力
- 截图监听捕获 ConnectionError 主动重连,不再退避到60秒
2026-03-20 11:19:31 +08:00
d5c443c7c6 修复:云端 Redis 连接加 keepalive + 健康检查,防止频繁断连
腾讯云 Redis 空闲连接超时会主动断开,加 socket_keepalive=True
和 health_check_interval=30 保活,减少重连频率。
2026-03-20 10:03:41 +08:00
369bb02391 优化:违停和拥堵 resolve 后清除冷却记录
车辆离开/拥堵消散确认后清冷却,新的违停/拥堵事件可正常触发。
15秒确认期+命中率阈值已能过滤路过车辆,不会误触发。
2026-03-19 15:47:01 +08:00
648606fd0d 优化:车辆违停和拥堵算法防频繁告警
违停:
- confirm_clear 30→120秒(持续2分钟无车才确认离开)
- cooldown 600→1800秒(30分钟冷却)
- ALARMED 清除阈值 ratio<0.3→ratio<0.15(更严格)

拥堵:
- confirm_clear 120→180秒
- cooldown 600→1800秒
- 消散阈值从 < threshold 改为 < threshold*0.5(降到一半才开始确认)
2026-03-19 15:19:27 +08:00
8da4ef9e93 修复:Redis 配置加载时传入 alarm_level 到算法实例
Redis 订阅路径初始化算法时漏传 alarm_level 参数,
导致前端配置的告警等级不生效,始终使用默认值。
2026-03-19 10:56:54 +08:00
13706bc55c 安全:移除 .env 跟踪,添加 .env.example 模板
.env 含真实密钥不应入库,已在 .gitignore 中忽略。
新增 .env.example 作为配置模板(占位符)。
2026-03-19 10:30:51 +08:00
2ea35ad5d3 功能:心跳同时发送到 vsp-service 和 WVP 平台
新增 WVP_API_URL 环境变量,心跳同时上报到两个地址,
解决前端从 WVP 读设备状态显示离线的问题。
2026-03-19 09:58:24 +08:00
9c39913a55 功能:告警级别支持前端配置下发 + 级别体系统一为 0-3
- 四种算法均支持通过 params.alarm_level 覆盖默认告警级别
- 级别体系统一:0紧急/1重要/2普通/3轻微
- 车辆拥堵默认阈值调整为 5 辆

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 16:39:58 +08:00
3d91aa1a67 功能:area_id 全链路支持 + 截图处理器独立 Redis 连接
- CameraInfo 模型添加 area_id 字段
- SQLite 表增加 area_id 列及迁移
- config_sync 同步 area_id 到本地
- 告警 ext_data 携带 area_id
- 截图处理器使用独立 Redis 连接,避免与配置同步阻塞冲突
- get_all_camera_configs 使用 cursor.description 动态获取列名

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 16:05:04 +08:00
0d88ed7fbb 功能:添加心跳守护线程,每30秒向 WVP 上报设备状态
- 新增 _start_heartbeat() 守护线程
- 每30秒 POST 到 WVP /api/ai/device/heartbeat
- 上报 uptime、帧数、告警数、活跃流数、配置版本
- 使用 stop_event.wait(30) 优雅退出

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-17 17:47:12 +08:00
ea992c6daa 功能:新增车辆违停和车辆拥堵检测算法
- IllegalParkingAlgorithm: 5状态机(IDLE→CONFIRMING_VEHICLE→PARKED_COUNTDOWN→ALARMED→CONFIRMING_CLEAR)
  禁停区域检测,15秒确认+5分钟倒计时,滑动窗口抗抖动,支持car/truck/bus/motorcycle
- VehicleCongestionAlgorithm: 4状态机(NORMAL→CONFIRMING_CONGESTION→CONGESTED→CONFIRMING_CLEAR)
  车辆计数≥阈值+持续60秒触发,滑动窗口平均值判断
- AlgorithmManager: 新增default_params、register_algorithm、get_status支持两种新算法
- main.py: 泛化alarm_id回填和first_frame_time提取,ext_data新增vehicle_count字段

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-16 16:54:47 +08:00
9a1ac16f19 修复:截图回调禁用系统代理,解决502回调失败
requests.post回调WVP时被本地代理(127.0.0.1:7897)拦截导致502,
添加proxies=None绕过系统代理直连WVP。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-11 09:40:51 +08:00
d132a50ae0 fix: 告警上传COS路径日期改为北京时间
alarm_upload_worker.py 中 COS object key 的日期和死信时间戳
从 UTC 改为北京时间(UTC+8),与服务端和日志时间一致。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:22:50 +08:00
82d17a5266 fix: alarm_id 时间戳改为北京时间
generate_alarm_id() 中 datetime.now(timezone.utc) 改为
datetime.now(timezone(timedelta(hours=8))),与服务端时间一致。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:20:50 +08:00
1211fc7207 feat: 离岗算法滑动窗口抗抖动优化
- 上岗确认阈值从 0.7 降为 0.6,降低漏确认
- 离岗触发从 ratio==0 改为 ratio<0.2,允许 20% 抖动
- 离岗中断从单帧判断改为 ratio>=0.5,避免偶尔一帧误检打断确认
- 入侵算法不变,通过调高 conf 到 0.6 解决误报

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 23:20:31 +08:00
7a5ddef2f6 feat: 支持按算法独立配置置信度阈值
通过 ALGO_CONF_{ALGO_CODE} 环境变量为每个算法设置独立的 conf_threshold,
未配置的算法回退到全局 CONF_THRESHOLD。推理过程零改动,仅后处理过滤阶段
按 bind.algo_code 使用对应阈值。

当前配置:离岗=0.4(降低漏检),入侵=0.5(减少误报)。

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 17:31:12 +08:00
3d88dfc1c6 修复:删除算法绑定后边缘端未停止对应算法推理
根本原因:config_sync.py 的 _cleanup_stale_records() 方法接收了
incoming_bind_ids 参数但从未使用,导致当 ROI 仍存在但其中某个
算法绑定被删除时,孤立的绑定记录继续留在 SQLite 中,推理循环
仍然从 SQLite 读取到已删除的绑定并继续生成告警。

修复内容:
1. config/database.py: 添加 get_all_bind_ids() 方法
2. core/config_sync.py: 在 _cleanup_stale_records() 中补全
   使用 incoming_bind_ids 清理孤立绑定的逻辑
3. algorithms.py: 在 reload_all_algorithms() 中添加清理内存中
   孤立算法实例的逻辑,防止内存泄漏
2026-03-05 17:12:15 +08:00
1d4eaf0174 文档:创建 CLAUDE.md 开发指南
新增内容:
- 项目概述(边缘 AI 推理服务功能说明)
- 常用命令(本地开发、测试、工具脚本、Docker 部署)
- 架构概览(核心模块详细说明)
  - 配置同步、视频流、推理引擎、告警上报
  - 算法模块(离岗、入侵检测)
- 数据流说明(配置下发、视频推理、告警上报、截图请求)
- Redis Key 设计(云端 + 本地)
- 配置文件说明(.env 环境变量、YAML 配置)
- 告警上报数据格式(触发和结束)
- 开发工作流(添加算法、修改推理流程、优化性能)
- 常见问题排查(引擎加载、告警失败、配置更新、GPU 内存)
- Git 提交规范

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-03-05 16:31:35 +08:00
13 changed files with 1291 additions and 125 deletions

23
.env
View File

@@ -1,23 +0,0 @@
# Local debug overrides
DEBUG_SERVER_HOST=0.0.0.0
# Alarm upload - cloud API URL (WVP backend)
CLOUD_API_URL=http://124.221.55.225:18080
# ===== 边缘设备 ID必须与 WVP 数据库 ROI 表的 device_id 一致) =====
EDGE_DEVICE_ID=edge
# ===== 配置同步模式REDIS=监听云端Stream, LOCAL=仅本地SQLite =====
CONFIG_SYNC_MODE=REDIS
# ===== 云端 Redis截图处理器 + 配置同步db=1 =====
CLOUD_REDIS_HOST=sh-crs-6upea3zn.sql.tencentcdb.com
CLOUD_REDIS_PORT=24637
CLOUD_REDIS_DB=1
CLOUD_REDIS_PASSWORD=HkVZkVnn1
# ===== 腾讯云 COS截图上传 =====
COS_REGION=ap-shanghai
COS_BUCKET=xhwkzx-1-1389966313
COS_SECRET_ID=AKIDVxPiqmVhYv7FCwVqytdAVddQ2TJySt9I
COS_SECRET_KEY=1rVyEI8mMVWs21xfBUjy4BE6DA4z7KWb

25
.env.example Normal file
View File

@@ -0,0 +1,25 @@
# ===== 云端 API =====
CLOUD_API_URL=http://your-server:8000
WVP_API_URL=http://your-server:18080
# ===== 边缘设备 ID必须与 WVP 数据库 ROI 表的 device_id 一致) =====
EDGE_DEVICE_ID=edge
# ===== 配置同步模式REDIS=监听云端Stream, LOCAL=仅本地SQLite =====
CONFIG_SYNC_MODE=REDIS
# ===== 云端 Redis截图处理器 + 配置同步) =====
CLOUD_REDIS_HOST=your-redis-host
CLOUD_REDIS_PORT=6379
CLOUD_REDIS_DB=1
CLOUD_REDIS_PASSWORD=your-redis-password
# ===== 腾讯云 COS截图上传 =====
COS_REGION=ap-shanghai
COS_BUCKET=your-bucket-name
COS_SECRET_ID=your-cos-secret-id
COS_SECRET_KEY=your-cos-secret-key
# ===== 按算法独立置信度阈值 =====
ALGO_CONF_LEAVE_POST=0.4
ALGO_CONF_INTRUSION=0.6

366
CLAUDE.md Normal file
View File

@@ -0,0 +1,366 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## 项目概述
边缘 AI 推理服务,部署在客户现场边缘端。通过 TensorRT GPU 加速实时分析视频流,检测离岗、入侵等异常行为,上报告警到云端。支持配置热更新、截图上传、告警去重等功能。
**部署位置:** 边缘端(客户现场)
**运行方式:** 裸机部署或 Docker需要 GPU 支持)
**主要功能:**
- RTSP 视频流接入(从 WVP 平台拉流)
- TensorRT GPU 推理批量处理8 帧/批次)
- 多算法支持leave_post 离岗、intrusion 入侵)
- 告警去重ROI 级 + 摄像头级冷却)
- 配置热更新Redis Stream 订阅)
- 截图上传(腾讯云 COS
- 告警上报HTTP POST 到云端)
## 常用命令
### 本地开发
```bash
# 安装依赖Python 3.8+
pip install -r requirements.txt
# 配置环境
cp .env.example .env
# 编辑 .env
# DEVICE_ID=edge_device_001
# CLOUD_REDIS_HOST=腾讯云Redis地址
# CLOUD_REDIS_PORT=6379
# CLOUD_REDIS_PASSWORD=密码
# LOCAL_REDIS_HOST=localhost
# LOCAL_REDIS_PORT=6379
# COS_SECRET_ID=腾讯云COS密钥ID
# COS_SECRET_KEY=腾讯云COS密钥KEY
# COS_BUCKET=your-bucket
# COS_REGION=ap-beijing
# CLOUD_API_URL=http://云端IP:8000
# 运行推理服务
python main.py
```
### 测试
```bash
# 运行完整工作流测试
python test_leave_post_full_workflow.py
# 运行无持续时长测试
python test_leave_post_no_duration.py
# 运行单元测试
pytest tests/
pytest -v tests/test_config_sync.py
```
### 工具脚本
```bash
# 诊断缺失摄像头配置
python diagnose_missing_cameras.py
# 清理旧的 ROI 配置
python cleanup_old_rois.py
# 恢复摄像头配置
python restore_cameras.py
```
### Docker 部署(生产环境)
```bash
# 构建镜像(需要 CUDA 12.1 + TensorRT 8.6 基础镜像)
docker build -t edge-inference:latest .
# 运行容器(需要 GPU 支持)
docker run -d \
--name edge-inference \
--gpus all \
--restart=always \
-v /path/to/models:/app/models \
-v /path/to/.env:/app/.env \
-v /path/to/data:/app/data \
edge-inference:latest
# 查看日志
docker logs -f edge-inference
# 重启容器
docker restart edge-inference
# 进入容器调试
docker exec -it edge-inference /bin/bash
# 检查 GPU 使用情况
nvidia-smi
```
## 架构概览
### 核心模块core/
- **config_sync.py** — 配置同步管理器
- 订阅云端 Redis Stream: `device_config_stream`
- 拉取配置:`GET device:{device_id}:config`
- 版本控制、自动回滚、离线可用
- 热更新视频流(启停摄像头)
- **video_stream.py** — 多流管理器
- RTSP 拉流、解码、帧缓存
- 多路并发处理
- 流状态监控、自动重连
- **preprocessor.py** — 图像预处理
- Resize、Normalize、NCHW 转换
- 批量预处理8 帧/批次)
- **tensorrt_engine.py** — TensorRT 推理引擎
- Engine 缓存管理
- 批量推理(提升吞吐量)
- GPU 内存优化
- **postprocessor.py** — 后处理器
- NMS非极大值抑制
- ROI 区域过滤point-in-polygon
- 算法分发(根据 algorithm_code
- **result_reporter.py** — 结果上报器
- 生成 alarm_id`edge_{device_id}_{timestamp}_{uuid6}`
- LPUSH 到本地 Redis: `local:alarm:pending`
- 零阻塞(立即返回)
- **alarm_upload_worker.py** — 告警上传 Worker
- 独立线程BRPOP 消费队列
- 上传截图到腾讯云 COS
- HTTP POST 到云端:`/api/ai/alert/edge/report`
- 失败重试3 次)→ 死信队列
- **screenshot_handler.py** — 截图处理器
- XREADGROUP 订阅云端 Redis Stream: `edge_snap_request`
- 从视频流获取最新帧
- 上传 COSHTTP 回调 WVP
### 算法模块algorithms.py
**已实现算法:**
- **LeavePostAlgorithm** — 离岗检测
- 检测 ROI 内是否有人
- 持续无人触发告警
- 人员回归发送 resolve 通知
- **IntrusionAlgorithm** — 入侵检测
- 检测 ROI 内是否有人入侵
- 立即触发告警
**算法接口:**
```python
class AlgorithmBase:
def process(self, detections, roi_info, camera_id, bind_info):
"""
处理检测结果
Args:
detections: 检测框列表 [{class_id, confidence, bbox}]
roi_info: ROI 配置 {roi_id, polygon, ...}
camera_id: 摄像头 ID
bind_info: 算法绑定配置 {threshold, cooldown, ...}
Returns:
告警信息或 None
"""
pass
```
### 数据流
```
配置下发:
WVP → XADD device_config_stream → Edge XREADGROUP
→ 拉取 Redis config → 版本校验 → 热更新视频流
视频推理:
RTSP 拉流 → 解码 → 预处理 → TensorRT 推理
→ NMS → ROI 过滤 → 算法处理 → 告警去重
→ LPUSH local:alarm:pending
告警上报:
BRPOP 队列 → 上传 COS 截图 → HTTP POST 云端
→ 失败重试 → 死信队列
截图请求:
WVP → XADD edge_snap_request → Edge XREADGROUP
→ 获取帧 → 上传 COS → HTTP 回调 WVP
```
## Redis Key 设计
### 云端 Redis
- `device:{device_id}:config` — 设备最新配置 JSON
- `device:{device_id}:version` — 配置版本号
- `device_config_stream` — 配置变更 Stream
- `edge_snap_request` — 截图请求 Stream
### 本地 Redis
- `local:device:config:current` — 当前生效配置
- `local:device:config:backup` — 上次成功配置(回滚用)
- `local:device:config:version` — 当前版本号
- `local:alarm:pending` — 待上报告警队列
- `local:alarm:retry` — 重试队列
- `local:alarm:dead` — 死信队列
## 配置文件
### .env 环境变量(关键配置)
```bash
# 设备标识
DEVICE_ID=edge_device_001
# 云端 Redis配置同步
CLOUD_REDIS_HOST=腾讯云Redis地址
CLOUD_REDIS_PORT=6379
CLOUD_REDIS_PASSWORD=密码
# 本地 Redis告警队列、配置缓存
LOCAL_REDIS_HOST=localhost
LOCAL_REDIS_PORT=6379
# 腾讯云 COS截图上传
COS_SECRET_ID=your_secret_id
COS_SECRET_KEY=your_secret_key
COS_BUCKET=your-bucket-1234567890
COS_REGION=ap-beijing
# 云端 API告警上报
CLOUD_API_URL=http://云端IP:8000
```
### config/ 目录YAML 配置)
- `settings.py` — 配置加载器(读取 .env
- `database.py` — SQLite 管理器(本地配置持久化)
- `config_models.py` — 配置数据模型
## 告警上报数据格式
### 告警触发POST /api/ai/alert/edge/report
```json
{
"alarm_id": "edge_device001_20260305120000_a1b2c3",
"alarm_type": "leave_post",
"device_id": "camera_001",
"scene_id": "roi_001",
"event_time": "2026-03-05T12:00:00Z",
"alarm_level": 2,
"snapshot_url": "https://cos.ap-beijing.myqcloud.com/...",
"confidence_score": 0.92,
"algorithm_code": "YOLO",
"ext_data": {
"bind_id": "bind_123",
"bbox": [100, 100, 300, 400],
"first_frame_time": "2026-03-05T12:00:00Z"
}
}
```
### 告警结束POST /api/ai/alert/edge/resolve
```json
{
"alarm_id": "edge_device001_20260305120000_a1b2c3",
"duration_ms": 120000,
"last_frame_time": "2026-03-05T12:02:00Z",
"resolve_type": "PERSON_RETURN"
}
```
## 开发工作流
### 添加新算法
1.`algorithms.py` 创建新的算法类,继承 `AlgorithmBase`
2. 实现 `process()` 方法
3.`AlgorithmManager` 注册算法
4. 更新 WVP 后端的算法配置表
### 修改推理流程
1. 修改 `main.py` 中的 `EdgeInferenceService`
2. 调整批量大小:`self._max_batch_size`
3. 调整冷却时间:`self._camera_cooldown_seconds`
### 调整告警去重策略
- ROI 级冷却:每个 ROI 独立冷却
- 摄像头级冷却:同摄像头同类型告警冷却
- 修改:`main.py` 中的 `_camera_alert_cooldown` 逻辑
### 优化性能
1. **批量推理**:调整 `_max_batch_size`(默认 8
2. **GPU 内存**:减少视频流并发数
3. **告警队列**:监控 Redis 队列长度
4. **TensorRT 引擎**:确保引擎缓存命中
## 常见问题
### TensorRT 引擎加载慢
首次运行会构建引擎5-10 分钟),之后会缓存。
检查 `models/` 目录下是否有 `.engine` 文件。
### 告警上报失败
检查云端 API 是否可达:
```bash
curl http://云端IP:8000/health
```
检查 COS 配置:
```bash
# 查看 .env 中的 COS 配置
cat .env | grep COS
```
### 配置不更新
检查云端 Redis 连接:
```bash
redis-cli -h 云端Redis地址 -p 6379 -a 密码 ping
```
检查配置版本:
```bash
redis-cli GET device:edge_device_001:version
redis-cli GET local:device:config:version
```
### 视频流断开
检查 RTSP 地址是否可访问:
```bash
ffprobe rtsp://云端IP:10002/...
```
检查 WVP 流媒体服务:
```bash
docker logs vsp-zlmedia
```
### GPU 内存不足
降低批量大小或减少并发流数量。
检查 GPU 使用情况:
```bash
nvidia-smi
```
## Git 提交规范
在修改代码后,使用中文提交信息:
```bash
git add .
git commit -m "功能添加XXX功能
详细说明...
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>"
```
**不要立即 push**,等待用户指示再推送到远程。

View File

@@ -43,6 +43,9 @@ class LeavePostAlgorithm:
STATE_ALARMED = "ALARMED" # 已告警(等待回岗)
STATE_NON_WORK_TIME = "NON_WORK_TIME" # 非工作时间
# 告警级别常量(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 2 # 普通
def __init__(
self,
confirm_on_duty_sec: int = 10, # 上岗确认窗口(持续检测到人的时长)
@@ -52,6 +55,7 @@ class LeavePostAlgorithm:
cooldown_sec: int = 600, # 告警冷却期(两次告警的最小间隔)
working_hours: Optional[List[Dict]] = None,
target_class: Optional[str] = "person",
alarm_level: Optional[int] = None,
# 兼容旧参数名(向后兼容)
confirm_leave_sec: Optional[int] = None,
):
@@ -65,6 +69,7 @@ class LeavePostAlgorithm:
# 工作时间和目标类别
self.working_hours = working_hours or []
self.target_class = target_class
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_INIT
@@ -241,7 +246,7 @@ class LeavePostAlgorithm:
self.state_start_time = current_time
self.detection_window.clear()
logger.debug(f"ROI {roi_id}: CONFIRMING_ON_DUTY → INIT (人消失)")
elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.7:
elif elapsed >= self.confirm_on_duty_sec and detection_ratio >= 0.6:
# 上岗确认成功(命中率>=70%
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
@@ -250,8 +255,8 @@ class LeavePostAlgorithm:
elif self.state == self.STATE_ON_DUTY:
# 在岗状态:监控是否离岗
if detection_ratio == 0:
# 滑动窗口内完全没有人,进入离岗确认
if detection_ratio < 0.2:
# 滑动窗口内 80% 以上帧无人,进入离岗确认
self.state = self.STATE_CONFIRMING_OFF_DUTY
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: ON_DUTY → CONFIRMING_OFF_DUTY")
@@ -260,12 +265,12 @@ class LeavePostAlgorithm:
# 离岗确认中:需要持续未检测到人
elapsed = (current_time - self.state_start_time).total_seconds()
if roi_has_person:
# 人回来了回到ON_DUTY
if detection_ratio >= 0.5:
# 窗口内检测率恢复到 50% 以上,人确实回来了
self.state = self.STATE_ON_DUTY
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了)")
elif elapsed >= self.confirm_off_duty_sec and detection_ratio == 0:
logger.debug(f"ROI {roi_id}: CONFIRMING_OFF_DUTY → ON_DUTY (人回来了, ratio={detection_ratio:.2f})")
elif elapsed >= self.confirm_off_duty_sec and detection_ratio < 0.2:
# 离岗确认成功,进入倒计时
self.state = self.STATE_OFF_DUTY_COUNTDOWN
self.state_start_time = current_time
@@ -295,6 +300,7 @@ class LeavePostAlgorithm:
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "leave_post",
"alarm_level": self._alarm_level,
"message": "人员离岗告警",
"first_frame_time": self._leave_start_time.strftime('%Y-%m-%d %H:%M:%S'),
})
@@ -392,8 +398,8 @@ class IntrusionAlgorithm:
STATE_ALARMED = "ALARMED" # 已告警(等待入侵消失)
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR" # 入侵消失确认中
# 告警级别常量
ALARM_LEVEL_INTRUSION = 3
# 告警级别常量(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 1 # 重要
def __init__(
self,
@@ -402,6 +408,7 @@ class IntrusionAlgorithm:
confirm_intrusion_seconds: Optional[int] = None, # 入侵确认时间默认5秒
confirm_clear_seconds: Optional[int] = None, # 消失确认时间默认180秒
target_class: Optional[str] = None,
alarm_level: Optional[int] = None,
):
self.cooldown_seconds = cooldown_seconds
@@ -413,6 +420,7 @@ class IntrusionAlgorithm:
self.confirm_seconds = confirm_seconds
self.target_class = target_class
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_IDLE
@@ -516,7 +524,7 @@ class IntrusionAlgorithm:
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "intrusion",
"alarm_level": self.ALARM_LEVEL_INTRUSION,
"alarm_level": self._alarm_level,
"message": "检测到周界入侵",
"first_frame_time": self._intrusion_start_time.strftime('%Y-%m-%d %H:%M:%S'),
})
@@ -705,6 +713,501 @@ class IntrusionAlgorithm:
# self.alert_triggered.clear()
class IllegalParkingAlgorithm:
"""
车辆违停检测算法(状态机版本 v1.0
状态机:
IDLE → CONFIRMING_VEHICLE → PARKED_COUNTDOWN → ALARMED → CONFIRMING_CLEAR → IDLE
业务流程:
1. 检测到车辆进入禁停区 → 车辆确认期confirm_vehicle_sec默认15秒ratio≥0.6
2. 确认有车 → 违停倒计时parking_countdown_sec默认300秒/5分钟
3. 倒计时结束仍有车 → 触发告警ALARMED状态
4. 车辆离开 → 消失确认期confirm_clear_sec默认30秒ratio<0.2
5. 确认车辆离开 → 发送resolve事件 → 回到空闲状态
使用滑动窗口10秒抗抖动支持多类车辆检测。
"""
# 状态定义
STATE_IDLE = "IDLE"
STATE_CONFIRMING_VEHICLE = "CONFIRMING_VEHICLE"
STATE_PARKED_COUNTDOWN = "PARKED_COUNTDOWN"
STATE_ALARMED = "ALARMED"
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR"
# 告警级别常量(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 1 # 重要
# 滑动窗口参数
WINDOW_SIZE_SEC = 10
def __init__(
self,
confirm_vehicle_sec: int = 15,
parking_countdown_sec: int = 300,
confirm_clear_sec: int = 120,
cooldown_sec: int = 1800,
target_classes: Optional[List[str]] = None,
alarm_level: Optional[int] = None,
):
self.confirm_vehicle_sec = confirm_vehicle_sec
self.parking_countdown_sec = parking_countdown_sec
self.confirm_clear_sec = confirm_clear_sec
self.cooldown_sec = cooldown_sec
self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"]
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_IDLE
self.state_start_time: Optional[datetime] = None
# 滑动窗口:存储 (timestamp, has_vehicle: bool)
self._detection_window: deque = deque()
# 告警追踪
self._last_alarm_id: Optional[str] = None
self._parking_start_time: Optional[datetime] = None
# 冷却期管理
self.alert_cooldowns: Dict[str, datetime] = {}
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_classes(self, detection: Dict) -> bool:
"""检查检测目标是否属于车辆类别"""
det_class = detection.get("class", "")
return det_class in self.target_classes
def _update_window(self, current_time: datetime, has_vehicle: bool):
"""更新滑动窗口"""
self._detection_window.append((current_time, has_vehicle))
cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC)
while self._detection_window and self._detection_window[0][0] < cutoff:
self._detection_window.popleft()
def _get_window_ratio(self) -> float:
"""获取滑动窗口内的检测命中率"""
if not self._detection_window:
return 0.0
hits = sum(1 for _, has in self._detection_window if has)
return hits / len(self._detection_window)
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
return det.get("bbox", [])
return []
def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float:
"""获取ROI内车辆的最高置信度"""
max_conf = 0.0
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
max_conf = max(max_conf, det.get("confidence", 0.0))
return max_conf
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
"""处理单帧检测结果"""
current_time = current_time or datetime.now()
alerts = []
# 检查ROI内是否有车辆
roi_has_vehicle = any(
self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det)
for det in tracks
)
# 更新滑动窗口
self._update_window(current_time, roi_has_vehicle)
ratio = self._get_window_ratio()
# === 状态机处理 ===
if self.state == self.STATE_IDLE:
if roi_has_vehicle:
self.state = self.STATE_CONFIRMING_VEHICLE
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: IDLE → CONFIRMING_VEHICLE")
elif self.state == self.STATE_CONFIRMING_VEHICLE:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio < 0.3:
# 命中率过低,车辆可能只是路过
self.state = self.STATE_IDLE
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_VEHICLE → IDLE (ratio={ratio:.2f}<0.3)")
elif elapsed >= self.confirm_vehicle_sec and ratio >= 0.6:
# 确认有车辆停留,进入倒计时
self._parking_start_time = self.state_start_time
self.state = self.STATE_PARKED_COUNTDOWN
self.state_start_time = current_time
logger.info(f"ROI {roi_id}: CONFIRMING_VEHICLE → PARKED_COUNTDOWN (ratio={ratio:.2f})")
elif self.state == self.STATE_PARKED_COUNTDOWN:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio < 0.2:
# 车辆已离开
self.state = self.STATE_IDLE
self.state_start_time = None
self._parking_start_time = None
logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (车辆离开, ratio={ratio:.2f})")
elif elapsed >= self.parking_countdown_sec:
# 倒计时结束,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
bbox = self._get_latest_bbox(tracks, roi_id)
confidence = self._get_max_confidence(tracks, roi_id)
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "illegal_parking",
"alarm_level": self._alarm_level,
"confidence": confidence,
"message": f"检测到车辆违停(已停留{int(elapsed / 60)}分钟)",
"first_frame_time": self._parking_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._parking_start_time else None,
"duration_minutes": elapsed / 60,
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_ALARMED
logger.warning(f"ROI {roi_id}: PARKED_COUNTDOWN → ALARMED (违停告警触发)")
else:
self.state = self.STATE_IDLE
self.state_start_time = None
self._parking_start_time = None
logger.debug(f"ROI {roi_id}: PARKED_COUNTDOWN → IDLE (冷却期内)")
elif self.state == self.STATE_ALARMED:
if ratio < 0.15:
self.state = self.STATE_CONFIRMING_CLEAR
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: ALARMED → CONFIRMING_CLEAR (ratio={ratio:.2f}<0.15)")
elif self.state == self.STATE_CONFIRMING_CLEAR:
if self.state_start_time is None:
self.state = self.STATE_IDLE
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if ratio >= 0.5:
# 车辆又出现回到ALARMED
self.state = self.STATE_ALARMED
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → ALARMED (车辆仍在)")
elif elapsed >= self.confirm_clear_sec and ratio < 0.2:
# 确认车辆已离开
if self._last_alarm_id and self._parking_start_time:
duration_ms = int((current_time - self._parking_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "vehicle_left",
})
logger.info(f"ROI {roi_id}: 违停告警已解决(车辆离开)")
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._parking_start_time = None
self.alert_cooldowns.clear() # 车辆离开后清冷却,新车违停可正常告警
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → IDLE")
return alerts
def set_last_alarm_id(self, alarm_id: str):
"""由 main.py 在告警生成后回填 alarm_id"""
self._last_alarm_id = alarm_id
def reset(self):
"""重置算法状态"""
self.state = self.STATE_IDLE
self.state_start_time = None
self._last_alarm_id = None
self._parking_start_time = None
self._detection_window.clear()
self.alert_cooldowns.clear()
def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]:
"""获取当前状态"""
current_time = current_time or datetime.now()
state_info = {
"state": self.state,
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
"window_ratio": self._get_window_ratio(),
}
if self.state in (self.STATE_ALARMED, self.STATE_PARKED_COUNTDOWN) and self._parking_start_time:
state_info["parking_duration_sec"] = (current_time - self._parking_start_time).total_seconds()
state_info["alarm_id"] = self._last_alarm_id
return state_info
class VehicleCongestionAlgorithm:
"""
车辆拥堵检测算法(状态机版本 v1.0
状态机:
NORMAL → CONFIRMING_CONGESTION → CONGESTED → CONFIRMING_CLEAR → NORMAL
业务流程:
1. 检测到车辆数量 ≥ count_threshold → 拥堵确认期confirm_congestion_sec默认60秒
2. 确认拥堵(窗口内平均车辆数 ≥ threshold→ 触发告警
3. 车辆减少 → 消散确认期confirm_clear_sec默认120秒
4. 确认消散(平均数 < threshold→ 发送resolve事件 → 回到正常
使用滑动窗口10秒存储车辆计数取平均值判断。
"""
# 状态定义
STATE_NORMAL = "NORMAL"
STATE_CONFIRMING_CONGESTION = "CONFIRMING_CONGESTION"
STATE_CONGESTED = "CONGESTED"
STATE_CONFIRMING_CLEAR = "CONFIRMING_CLEAR"
# 告警级别(默认值,可通过 params 覆盖)
DEFAULT_ALARM_LEVEL = 2 # 普通
# 滑动窗口参数
WINDOW_SIZE_SEC = 10
def __init__(
self,
count_threshold: int = 5,
confirm_congestion_sec: int = 60,
confirm_clear_sec: int = 180,
cooldown_sec: int = 1800,
target_classes: Optional[List[str]] = None,
alarm_level: Optional[int] = None,
):
self.count_threshold = count_threshold
self.confirm_congestion_sec = confirm_congestion_sec
self.confirm_clear_sec = confirm_clear_sec
self.cooldown_sec = cooldown_sec
self.target_classes = target_classes or ["car", "truck", "bus", "motorcycle"]
self._alarm_level = alarm_level if alarm_level is not None else self.DEFAULT_ALARM_LEVEL
# 状态变量
self.state: str = self.STATE_NORMAL
self.state_start_time: Optional[datetime] = None
# 滑动窗口:存储 (timestamp, vehicle_count: int)
self._count_window: deque = deque()
# 告警追踪
self._last_alarm_id: Optional[str] = None
self._congestion_start_time: Optional[datetime] = None
# 冷却期管理
self.alert_cooldowns: Dict[str, datetime] = {}
def _check_detection_in_roi(self, detection: Dict, roi_id: str) -> bool:
matched_rois = detection.get("matched_rois", [])
for roi in matched_rois:
if roi.get("roi_id") == roi_id:
return True
return False
def _check_target_classes(self, detection: Dict) -> bool:
det_class = detection.get("class", "")
return det_class in self.target_classes
def _count_vehicles_in_roi(self, tracks: List[Dict], roi_id: str) -> int:
"""统计ROI内的车辆数量"""
return sum(
1 for det in tracks
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det)
)
def _update_count_window(self, current_time: datetime, count: int):
"""更新车辆计数滑动窗口"""
self._count_window.append((current_time, count))
cutoff = current_time - timedelta(seconds=self.WINDOW_SIZE_SEC)
while self._count_window and self._count_window[0][0] < cutoff:
self._count_window.popleft()
def _get_avg_count(self) -> float:
"""获取滑动窗口内的平均车辆数"""
if not self._count_window:
return 0.0
total = sum(c for _, c in self._count_window)
return total / len(self._count_window)
def _get_max_confidence(self, tracks: List[Dict], roi_id: str) -> float:
max_conf = 0.0
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
max_conf = max(max_conf, det.get("confidence", 0.0))
return max_conf
def _get_latest_bbox(self, tracks: List[Dict], roi_id: str) -> List[float]:
for det in tracks:
if self._check_detection_in_roi(det, roi_id) and self._check_target_classes(det):
return det.get("bbox", [])
return []
def process(
self,
roi_id: str,
camera_id: str,
tracks: List[Dict],
current_time: Optional[datetime] = None,
) -> List[Dict]:
"""处理单帧检测结果"""
current_time = current_time or datetime.now()
alerts = []
# 统计ROI内车辆数
vehicle_count = self._count_vehicles_in_roi(tracks, roi_id)
self._update_count_window(current_time, vehicle_count)
avg_count = self._get_avg_count()
# === 状态机处理 ===
if self.state == self.STATE_NORMAL:
if avg_count >= self.count_threshold:
self.state = self.STATE_CONFIRMING_CONGESTION
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: NORMAL → CONFIRMING_CONGESTION (avg={avg_count:.1f}{self.count_threshold})")
elif self.state == self.STATE_CONFIRMING_CONGESTION:
if self.state_start_time is None:
self.state = self.STATE_NORMAL
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if avg_count < self.count_threshold:
# 车辆减少,回到正常
self.state = self.STATE_NORMAL
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (avg={avg_count:.1f}<{self.count_threshold})")
elif elapsed >= self.confirm_congestion_sec:
# 确认拥堵,检查冷却期
cooldown_key = f"{camera_id}_{roi_id}"
if cooldown_key not in self.alert_cooldowns or \
(current_time - self.alert_cooldowns[cooldown_key]).total_seconds() > self.cooldown_sec:
self._congestion_start_time = self.state_start_time
bbox = self._get_latest_bbox(tracks, roi_id)
confidence = self._get_max_confidence(tracks, roi_id)
alerts.append({
"roi_id": roi_id,
"camera_id": camera_id,
"bbox": bbox,
"alert_type": "vehicle_congestion",
"alarm_level": self._alarm_level,
"confidence": confidence,
"message": f"检测到车辆拥堵(平均{avg_count:.0f}辆,持续{int(elapsed)}秒)",
"first_frame_time": self._congestion_start_time.strftime('%Y-%m-%d %H:%M:%S') if self._congestion_start_time else None,
"vehicle_count": int(avg_count),
})
self.alert_cooldowns[cooldown_key] = current_time
self.state = self.STATE_CONGESTED
logger.warning(f"ROI {roi_id}: CONFIRMING_CONGESTION → CONGESTED (拥堵告警触发, avg={avg_count:.1f})")
else:
self.state = self.STATE_NORMAL
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CONGESTION → NORMAL (冷却期内)")
elif self.state == self.STATE_CONGESTED:
# 车辆数降到阈值的一半以下才开始确认消散(避免抖动)
if avg_count < self.count_threshold * 0.5:
self.state = self.STATE_CONFIRMING_CLEAR
self.state_start_time = current_time
logger.debug(f"ROI {roi_id}: CONGESTED → CONFIRMING_CLEAR (avg={avg_count:.1f}<{self.count_threshold * 0.5:.1f})")
elif self.state == self.STATE_CONFIRMING_CLEAR:
if self.state_start_time is None:
self.state = self.STATE_NORMAL
return alerts
elapsed = (current_time - self.state_start_time).total_seconds()
if avg_count >= self.count_threshold:
# 又拥堵了回到CONGESTED
self.state = self.STATE_CONGESTED
self.state_start_time = None
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → CONGESTED (avg={avg_count:.1f}{self.count_threshold})")
elif elapsed >= self.confirm_clear_sec:
# 确认消散
if self._last_alarm_id and self._congestion_start_time:
duration_ms = int((current_time - self._congestion_start_time).total_seconds() * 1000)
alerts.append({
"alert_type": "alarm_resolve",
"resolve_alarm_id": self._last_alarm_id,
"duration_ms": duration_ms,
"last_frame_time": current_time.strftime('%Y-%m-%d %H:%M:%S'),
"resolve_type": "congestion_cleared",
})
logger.info(f"ROI {roi_id}: 拥堵告警已解决(车辆减少)")
self.state = self.STATE_NORMAL
self.state_start_time = None
self._last_alarm_id = None
self._congestion_start_time = None
self.alert_cooldowns.clear() # 拥堵消散后清冷却,再次拥堵可正常告警
logger.debug(f"ROI {roi_id}: CONFIRMING_CLEAR → NORMAL")
return alerts
def set_last_alarm_id(self, alarm_id: str):
"""由 main.py 在告警生成后回填 alarm_id"""
self._last_alarm_id = alarm_id
def reset(self):
"""重置算法状态"""
self.state = self.STATE_NORMAL
self.state_start_time = None
self._last_alarm_id = None
self._congestion_start_time = None
self._count_window.clear()
self.alert_cooldowns.clear()
def get_state(self, current_time: Optional[datetime] = None) -> Dict[str, Any]:
"""获取当前状态"""
current_time = current_time or datetime.now()
state_info = {
"state": self.state,
"state_start_time": self.state_start_time.isoformat() if self.state_start_time else None,
"avg_vehicle_count": self._get_avg_count(),
}
if self.state in (self.STATE_CONGESTED, self.STATE_CONFIRMING_CLEAR) and self._congestion_start_time:
state_info["congestion_duration_sec"] = (current_time - self._congestion_start_time).total_seconds()
state_info["alarm_id"] = self._last_alarm_id
return state_info
class AlgorithmManager:
def __init__(self, working_hours: Optional[List[Dict]] = None):
self.algorithms: Dict[str, Dict[str, Any]] = {}
@@ -724,11 +1227,20 @@ class AlgorithmManager:
"confirm_seconds": 5,
"target_class": None,
},
# "crowd_detection": {
# "max_count": 10,
# "cooldown_seconds": 300,
# "target_class": "person",
# },
"illegal_parking": {
"confirm_vehicle_sec": 15,
"parking_countdown_sec": 300,
"confirm_clear_sec": 30,
"cooldown_sec": 600,
"target_classes": ["car", "truck", "bus", "motorcycle"],
},
"vehicle_congestion": {
"count_threshold": 3,
"confirm_congestion_sec": 60,
"confirm_clear_sec": 120,
"cooldown_sec": 600,
"target_classes": ["car", "truck", "bus", "motorcycle"],
},
}
self._pubsub = None
@@ -838,6 +1350,7 @@ class AlgorithmManager:
key = f"{roi_id}_{bind_id}"
if algo_code == "leave_post":
configured_alarm_level = params.get("alarm_level")
algo_params = {
"confirm_on_duty_sec": params.get("confirm_on_duty_sec", 10),
"confirm_leave_sec": params.get("confirm_leave_sec", 30),
@@ -851,6 +1364,8 @@ class AlgorithmManager:
algo.confirm_leave_sec = algo_params["confirm_leave_sec"]
algo.cooldown_sec = algo_params["cooldown_sec"]
algo.target_class = algo_params["target_class"]
if configured_alarm_level is not None:
algo._alarm_level = configured_alarm_level
if algo_params["working_hours"]:
algo.working_hours = algo_params["working_hours"]
logger.info(f"已热更新算法参数: {key}")
@@ -859,13 +1374,15 @@ class AlgorithmManager:
self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm(
confirm_on_duty_sec=algo_params["confirm_on_duty_sec"],
confirm_leave_sec=algo_params["confirm_leave_sec"],
leave_countdown_sec=algo_params.get("leave_countdown_sec", 300), # 离岗倒计时默认5分钟
leave_countdown_sec=algo_params.get("leave_countdown_sec", 300),
cooldown_sec=algo_params["cooldown_sec"],
working_hours=algo_params["working_hours"],
target_class=algo_params["target_class"],
alarm_level=configured_alarm_level,
)
logger.info(f"已从Redis加载算法: {key}")
elif algo_code == "intrusion":
configured_alarm_level = params.get("alarm_level")
algo_params = {
"cooldown_seconds": params.get("cooldown_seconds", 300),
"confirm_seconds": params.get("confirm_seconds", 5),
@@ -876,6 +1393,7 @@ class AlgorithmManager:
cooldown_seconds=algo_params["cooldown_seconds"],
confirm_seconds=algo_params["confirm_seconds"],
target_class=algo_params["target_class"],
alarm_level=configured_alarm_level,
)
logger.info(f"已从Redis加载算法: {key}")
@@ -998,9 +1516,13 @@ class AlgorithmManager:
config_manager = get_config_sync_manager()
bindings = config_manager.get_bindings_from_redis("")
# 收集当前配置中有效的 (roi_id, bind_id) 组合
valid_keys = set()
for bind in bindings:
bind_id = bind.get("bind_id")
roi_id = bind.get("roi_id")
valid_keys.add((roi_id, bind_id))
if preserve_state:
# 仅更新参数,不重置状态
@@ -1012,6 +1534,32 @@ class AlgorithmManager:
if self.load_bind_from_redis(bind_id):
count += 1
# 清理内存中已被删除的算法实例
removed_count = 0
with self._update_lock:
for roi_id in list(self.algorithms.keys()):
for key in list(self.algorithms[roi_id].keys()):
# key 格式: "roi_id_bind_id"
if key.startswith(roi_id + "_"):
bind_id = key[len(roi_id) + 1:]
if (roi_id, bind_id) not in valid_keys:
for algo in self.algorithms[roi_id][key].values():
algo.reset()
del self.algorithms[roi_id][key]
# 清除注册缓存
self._registered_keys = {
k for k in self._registered_keys
if not (k[0] == roi_id and k[1] == bind_id)
}
removed_count += 1
logger.info(f"清理已删除的算法实例: {key}")
# 如果 roi 下已无算法实例,清理空字典
if not self.algorithms[roi_id]:
del self.algorithms[roi_id]
if removed_count > 0:
logger.info(f"已清理 {removed_count} 个孤立算法实例")
logger.info(f"已重新加载 {count} 个算法配置 (preserve_state={preserve_state})")
return count
except Exception as e:
@@ -1044,6 +1592,9 @@ class AlgorithmManager:
if params:
algo_params.update(params)
# 从 params 中提取告警等级(前端配置下发)
configured_alarm_level = algo_params.get("alarm_level")
if algorithm_type == "leave_post":
roi_working_hours = algo_params.get("working_hours") or self.working_hours
self.algorithms[roi_id][key]["leave_post"] = LeavePostAlgorithm(
@@ -1053,20 +1604,33 @@ class AlgorithmManager:
cooldown_sec=algo_params.get("cooldown_sec", 600),
working_hours=roi_working_hours,
target_class=algo_params.get("target_class", "person"),
alarm_level=configured_alarm_level,
)
elif algorithm_type == "intrusion":
self.algorithms[roi_id][key]["intrusion"] = IntrusionAlgorithm(
cooldown_seconds=algo_params.get("cooldown_seconds", 300),
confirm_seconds=algo_params.get("confirm_seconds", 5),
target_class=algo_params.get("target_class"),
alarm_level=configured_alarm_level,
)
elif algorithm_type == "illegal_parking":
self.algorithms[roi_id][key]["illegal_parking"] = IllegalParkingAlgorithm(
confirm_vehicle_sec=algo_params.get("confirm_vehicle_sec", 15),
parking_countdown_sec=algo_params.get("parking_countdown_sec", 300),
confirm_clear_sec=algo_params.get("confirm_clear_sec", 30),
cooldown_sec=algo_params.get("cooldown_sec", 600),
target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]),
alarm_level=configured_alarm_level,
)
elif algorithm_type == "vehicle_congestion":
self.algorithms[roi_id][key]["vehicle_congestion"] = VehicleCongestionAlgorithm(
count_threshold=algo_params.get("count_threshold", 3),
confirm_congestion_sec=algo_params.get("confirm_congestion_sec", 60),
confirm_clear_sec=algo_params.get("confirm_clear_sec", 120),
cooldown_sec=algo_params.get("cooldown_sec", 600),
target_classes=algo_params.get("target_classes", ["car", "truck", "bus", "motorcycle"]),
alarm_level=configured_alarm_level,
)
# elif algorithm_type == "crowd_detection":
# from algorithms import CrowdDetectionAlgorithm
# self.algorithms[roi_id][key]["crowd_detection"] = CrowdDetectionAlgorithm(
# max_count=algo_params.get("max_count", 10),
# cooldown_seconds=algo_params.get("cooldown_seconds", 300),
# target_class=algo_params.get("target_class", "person"),
# )
self._registered_keys.add(cache_key)
@@ -1159,6 +1723,8 @@ class AlgorithmManager:
"state": getattr(algo, "state", "WAITING"),
"alarm_sent": getattr(algo, "alarm_sent", False),
}
elif algo_type in ("illegal_parking", "vehicle_congestion"):
status[f"{algo_type}_{bind_id}"] = algo.get_state()
else:
status[f"{algo_type}_{bind_id}"] = {
"detection_count": len(getattr(algo, "detection_start", {})),

View File

@@ -49,7 +49,8 @@ class CameraInfo:
enabled: bool = True
location: Optional[str] = None
extra_params: Optional[Dict[str, Any]] = None
area_id: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
@@ -60,8 +61,9 @@ class CameraInfo:
"enabled": self.enabled,
"location": self.location,
"extra_params": self.extra_params,
"area_id": self.area_id,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'CameraInfo':
"""从字典创建实例"""
@@ -73,6 +75,7 @@ class CameraInfo:
enabled=data.get("enabled", True),
location=data.get("location"),
extra_params=data.get("extra_params"),
area_id=data.get("area_id"),
)

View File

@@ -233,6 +233,7 @@ class SQLiteManager:
location TEXT,
roi_group_id TEXT,
extra_params TEXT,
area_id INTEGER,
updated_at TEXT
)
""")
@@ -250,7 +251,14 @@ class SQLiteManager:
""")
self._conn.commit()
# 迁移:为已有数据库添加 area_id 列
try:
cursor.execute("ALTER TABLE camera_configs ADD COLUMN area_id INTEGER")
self._conn.commit()
except Exception:
pass # 列已存在,忽略
self._init_default_algorithms()
def _init_default_algorithms(self):
@@ -555,8 +563,8 @@ class SQLiteManager:
cursor.execute("""
INSERT OR REPLACE INTO camera_configs (
camera_id, rtsp_url, camera_name, status, enabled,
location, roi_group_id, extra_params, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
location, roi_group_id, extra_params, area_id, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
camera_id, rtsp_url,
kwargs.get('camera_name'),
@@ -565,6 +573,7 @@ class SQLiteManager:
kwargs.get('location'),
kwargs.get('roi_group_id'),
str(kwargs.get('extra_params')) if kwargs.get('extra_params') else None,
kwargs.get('area_id'),
now
))
self._conn.commit()
@@ -580,8 +589,7 @@ class SQLiteManager:
cursor.execute("SELECT * FROM camera_configs WHERE camera_id = ?", (camera_id,))
row = cursor.fetchone()
if row:
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at']
columns = [desc[0] for desc in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
@@ -593,8 +601,7 @@ class SQLiteManager:
try:
cursor = self._conn.cursor()
cursor.execute("SELECT * FROM camera_configs ORDER BY camera_id")
columns = ['camera_id', 'rtsp_url', 'camera_name', 'status',
'enabled', 'location', 'roi_group_id', 'extra_params', 'updated_at']
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取所有摄像头配置失败: {e}")
@@ -899,7 +906,17 @@ class SQLiteManager:
except Exception as e:
logger.error(f"删除ROI算法绑定失败: {e}")
return 0
def get_all_bind_ids(self) -> List[str]:
"""获取所有算法绑定的 bind_id 列表(用于清理孤立绑定)"""
try:
cursor = self._conn.cursor()
cursor.execute("SELECT bind_id FROM roi_algo_bind")
return [row[0] for row in cursor.fetchall()]
except Exception as e:
logger.error(f"获取所有绑定ID失败: {e}")
return []
def log_config_update(
self,
config_type: str,

View File

@@ -5,7 +5,7 @@
import os
from dataclasses import dataclass, field
from typing import List, Optional
from typing import Dict, List, Optional
@dataclass
@@ -93,6 +93,7 @@ class COSConfig:
class AlarmUploadConfig:
"""告警上报配置"""
cloud_api_url: str = "http://localhost:8000"
wvp_api_url: str = "" # WVP 平台地址(心跳同步用)
edge_token: str = ""
retry_max: int = 3
retry_interval: int = 5
@@ -117,10 +118,17 @@ class InferenceConfig:
input_width: int = 480
input_height: int = 480
batch_size: int = 1
conf_threshold: float = 0.4
nms_threshold: float = 0.45
conf_threshold: float = 0.45
nms_threshold: float = 0.5
device_id: int = 0
fp16_mode: bool = True
# 按算法类型覆盖置信度阈值key=algo_code, value=threshold
# 未命中时回退到 conf_threshold
algo_conf_thresholds: Dict[str, float] = field(default_factory=dict)
def get_conf_threshold(self, algo_code: str) -> float:
"""获取指定算法的置信度阈值,未配置则回退全局值"""
return self.algo_conf_thresholds.get(algo_code, self.conf_threshold)
# ===================== Debug / Local Sync =====================
@@ -256,6 +264,7 @@ class Settings:
self.alarm_upload = AlarmUploadConfig(
cloud_api_url=os.getenv("CLOUD_API_URL", "http://localhost:8000"),
wvp_api_url=os.getenv("WVP_API_URL", ""),
edge_token=os.getenv("EDGE_TOKEN", ""),
retry_max=int(os.getenv("ALARM_RETRY_MAX", "3")),
retry_interval=int(os.getenv("ALARM_RETRY_INTERVAL", "5")),
@@ -273,8 +282,9 @@ class Settings:
input_width=int(os.getenv("INPUT_WIDTH", "480")),
input_height=int(os.getenv("INPUT_HEIGHT", "480")),
batch_size=int(os.getenv("BATCH_SIZE", "4")),
conf_threshold=float(os.getenv("CONF_THRESHOLD", "0.4")),
nms_threshold=float(os.getenv("NMS_THRESHOLD", "0.45")),
conf_threshold=float(os.getenv("CONF_THRESHOLD", "0.45")),
nms_threshold=float(os.getenv("NMS_THRESHOLD", "0.5")),
algo_conf_thresholds=self._parse_algo_conf_thresholds(),
)
self.config_sync_mode = os.getenv("CONFIG_SYNC_MODE", "LOCAL").upper()
@@ -297,6 +307,25 @@ class Settings:
# 使用 COCO 类别名称
self.class_names = COCO_CLASS_NAMES
@staticmethod
def _parse_algo_conf_thresholds() -> Dict[str, float]:
"""解析 ALGO_CONF_* 环境变量,返回 {algo_code: threshold} 字典
环境变量命名规则: ALGO_CONF_{ALGO_CODE},如:
ALGO_CONF_LEAVE_POST=0.35
ALGO_CONF_INTRUSION=0.55
"""
prefix = "ALGO_CONF_"
result = {}
for key, value in os.environ.items():
if key.startswith(prefix) and value:
algo_code = key[len(prefix):].lower()
try:
result[algo_code] = float(value)
except ValueError:
pass
return result
def _parse_working_hours(self) -> List[dict]:
"""解析工作时间配置"""
working_hours_str = os.getenv("WORKING_HOURS", "")

View File

@@ -17,7 +17,9 @@ import json
import logging
import threading
import time
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
_BEIJING_TZ = timezone(timedelta(hours=8))
from typing import Any, Dict, Optional
import redis
@@ -105,7 +107,7 @@ class AlarmUploadWorker:
self._logger.info(f"告警上报端点: {report_url}")
try:
resp = requests.get(health_url, timeout=5)
resp = requests.get(health_url, timeout=5, proxies={"http": None, "https": None})
if resp.status_code == 200:
self._logger.info(f"云端健康检查通过: {health_url}")
else:
@@ -245,7 +247,8 @@ class AlarmUploadWorker:
}
try:
response = requests.post(url, json=payload, headers=headers, timeout=10)
response = requests.post(url, json=payload, headers=headers, timeout=10,
proxies={"http": None, "https": None})
if response.status_code == 200:
body = response.json()
if body.get("code") == 0:
@@ -294,7 +297,7 @@ class AlarmUploadWorker:
return None
# 生成 Object Key: alarms/{device_id}/{yyyy-MM-dd}/{alarm_id}.jpg
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
date_str = datetime.now(_BEIJING_TZ).strftime("%Y-%m-%d")
object_key = f"alarms/{device_id}/{date_str}/{alarm_id}.jpg"
try:
@@ -344,6 +347,7 @@ class AlarmUploadWorker:
json=report_data,
headers=headers,
timeout=10,
proxies={"http": None, "https": None},
)
if response.status_code == 200:
@@ -392,7 +396,7 @@ class AlarmUploadWorker:
if retry_count > max_retry:
# 超过最大重试次数,写入死信队列
alarm_data["_dead_reason"] = error
alarm_data["_dead_at"] = datetime.now(timezone.utc).isoformat()
alarm_data["_dead_at"] = datetime.now(_BEIJING_TZ).isoformat()
dead_json = json.dumps(alarm_data, ensure_ascii=False)
self._redis.lpush(REDIS_KEY_ALARM_DEAD, dead_json)
self._stats["dead_lettered"] += 1

View File

@@ -14,6 +14,12 @@
import json
import logging
import os
import platform
import socket
# 禁用系统代理Clash 等代理工具会干扰 Redis TCP 长连接)
for key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"):
os.environ.pop(key, None)
import threading
import time
from datetime import datetime
@@ -30,6 +36,23 @@ from utils.version_control import get_version_control
logger = logging.getLogger(__name__)
def _build_keepalive_options():
"""构建 TCP keepalive 选项,适配 Linux/Windows"""
opts = {}
if platform.system() == "Linux":
# TCP_KEEPIDLE: 连接空闲 15s 后开始发送 keepalive 探测
# TCP_KEEPINTVL: 每次探测间隔 5s
# TCP_KEEPCNT: 连续 3 次探测失败则判定断连
opts = {
socket.TCP_KEEPIDLE: 15,
socket.TCP_KEEPINTVL: 5,
socket.TCP_KEEPCNT: 3,
}
# Windows 不支持 TCP_KEEPIDLE/KEEPINTVL/KEEPCNT
# 但 socket_keepalive=True 仍会启用默认 keepalive
return opts
# ==================== Redis Key 常量 ====================
# 云端 Redis Keys
@@ -206,6 +229,9 @@ class ConfigSyncManager:
socket_connect_timeout=10,
socket_timeout=10,
retry_on_timeout=True,
socket_keepalive=True,
socket_keepalive_options=_build_keepalive_options(),
health_check_interval=15,
)
self._cloud_redis.ping()
logger.info(f"云端 Redis 连接成功: {cfg.host}:{cfg.port}/{cfg.db}")
@@ -326,6 +352,11 @@ class ConfigSyncManager:
)
if not result:
# 无新消息时主动 PING 保活,防止云端 Redis 断开空闲连接
try:
self._cloud_redis.ping()
except Exception:
break # ping 失败说明连接已断,跳出内层循环触发重连
continue
for stream_name, messages in result:
@@ -540,6 +571,7 @@ class ConfigSyncManager:
camera_name=cam.get("camera_name", ""),
enabled=cam.get("enabled", True),
location=cam.get("location", ""),
area_id=cam.get("area_id"),
)
count += 1
except Exception as e:
@@ -640,6 +672,15 @@ class ConfigSyncManager:
removed += 1 + bind_count
logger.info(f"清理旧 ROI: {old_roi_id} (含 {bind_count} 条算法绑定)")
# 清理孤立的算法绑定ROI 仍存在但绑定已被删除的情况)
if incoming_bind_ids:
existing_bind_ids = self._db_manager.get_all_bind_ids()
for old_bind_id in existing_bind_ids:
if old_bind_id not in incoming_bind_ids:
self._db_manager.delete_roi_algo_bind(old_bind_id)
removed += 1
logger.info(f"清理孤立算法绑定: {old_bind_id}")
if removed > 0:
logger.info(f"旧数据清理完成: 共删除 {removed} 条过期记录")

View File

@@ -35,7 +35,7 @@ class NMSProcessor:
可选 GPU 加速 (torchvision.ops.nms)
"""
def __init__(self, nms_threshold: float = 0.45, use_gpu: bool = False):
def __init__(self, nms_threshold: float = 0.5, use_gpu: bool = False):
self.nms_threshold = nms_threshold
self.use_gpu = use_gpu and _HAS_TORCH
self._logger = get_logger("postprocessor")
@@ -162,7 +162,7 @@ class NMSProcessor:
boxes: np.ndarray,
scores: np.ndarray,
class_ids: np.ndarray,
conf_threshold: float = 0.5,
conf_threshold: float = 0.45,
max_output_size: int = 300
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
@@ -579,8 +579,8 @@ class PostProcessor:
"alert_cooldown": 300,
}
self.nms_threshold = config.get("nms_threshold", 0.45)
self.conf_threshold = config.get("conf_threshold", 0.5)
self.nms_threshold = config.get("nms_threshold", 0.5)
self.conf_threshold = config.get("conf_threshold", 0.45)
MAX_DETECTIONS = 8400
self._buffer_xyxy = np.zeros((MAX_DETECTIONS, 4), dtype=np.float32)
@@ -707,22 +707,24 @@ class PostProcessor:
batch_outputs: List[np.ndarray],
batch_size: int,
conf_threshold: Optional[float] = None,
nms_threshold: Optional[float] = None
nms_threshold: Optional[float] = None,
per_item_conf_thresholds: Optional[List[float]] = None,
) -> List[Tuple[np.ndarray, np.ndarray, np.ndarray]]:
"""
批量处理多 ROI 检测结果
Args:
batch_outputs: 模型输出 (可能是 [output] 或单个 batch 数组)
batch_size: batch 大小
conf_threshold: 置信度阈值
nms_threshold: NMS阈值
per_item_conf_thresholds: 每个 batch item 独立的置信度阈值列表,
长度必须等于 batch_size。传入时忽略 conf_threshold 参数。
Returns:
List of (检测框, 置信度, 类别ID) per ROI
"""
if conf_threshold is None:
conf_threshold = self.conf_threshold
default_conf = conf_threshold if conf_threshold is not None else self.conf_threshold
if nms_threshold is None:
nms_threshold = self.nms_threshold
@@ -793,11 +795,16 @@ class PostProcessor:
self._buffer_xyxy[:valid_count, 2] = boxes[:, 0] + boxes[:, 2] / 2
self._buffer_xyxy[:valid_count, 3] = boxes[:, 1] + boxes[:, 3] / 2
item_conf = (
per_item_conf_thresholds[batch_idx]
if per_item_conf_thresholds is not None
else default_conf
)
keep_boxes, keep_scores, keep_classes = nms_processor.process_with_confidence_filter(
self._buffer_xyxy[:valid_count],
scores_coarse.astype(np.float32),
class_ids,
conf_threshold
item_conf
)
results.append((keep_boxes, keep_scores, keep_classes))

View File

@@ -9,7 +9,7 @@
import json
import logging
import uuid
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, field
@@ -29,7 +29,7 @@ class AlarmInfo:
device_id: str
scene_id: str
event_time: str # ISO8601
alarm_level: int # 1-4
alarm_level: int # 0紧急 1重要 2普通 3轻微
snapshot_b64: Optional[str] = None # Base64 编码的 JPEG 截图
algorithm_code: Optional[str] = None
confidence_score: Optional[float] = None
@@ -56,7 +56,7 @@ def generate_alarm_id(device_id: str) -> str:
生成告警ID
格式: edge_{device_id}_{YYYYMMDDHHmmss}_{6位uuid}
"""
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
timestamp = datetime.now(timezone(timedelta(hours=8))).strftime("%Y%m%d%H%M%S")
uid = uuid.uuid4().hex[:6]
return f"edge_{device_id}_{timestamp}_{uid}"

View File

@@ -16,6 +16,7 @@ from typing import Optional
import cv2
import numpy as np
import redis
import requests
from config.settings import get_settings, COSConfig
@@ -25,7 +26,7 @@ logger = logging.getLogger(__name__)
# Redis 常量
SNAP_REQUEST_STREAM = "edge_snap_request"
SNAP_CONSUMER_GROUP = "edge_snap_group"
SNAP_CONSUMER_NAME = "edge_snap_worker"
SNAP_CONSUMER_NAME_PREFIX = "edge_snap_worker"
SNAP_RESULT_KEY_PREFIX = "snap:result:"
SNAP_RESULT_TTL = 60 # 降级结果 key 60s 过期
@@ -53,6 +54,8 @@ class ScreenshotHandler:
self._settings = get_settings()
self._cos_client = None
self._cos_config: COSConfig = self._settings.cos
self._device_id = self._settings.mqtt.device_id
self._consumer_name = f"{SNAP_CONSUMER_NAME_PREFIX}_{self._device_id}"
self._thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
@@ -78,7 +81,8 @@ class ScreenshotHandler:
daemon=True,
)
self._thread.start()
logger.info("[截图] 截图处理器已启动")
logger.info("[截图] 截图处理器已启动, consumer=%s, device_id=%s",
self._consumer_name, self._device_id)
def stop(self):
"""停止截图监听线程"""
@@ -121,6 +125,32 @@ class ScreenshotHandler:
else:
logger.error("[截图] 创建 consumer group 失败: %s", e)
# ==================== 重连 ====================
def _reconnect_cloud_redis(self):
"""重建云端 Redis 连接"""
try:
from core.config_sync import _build_keepalive_options
cfg = self._settings.cloud_redis
self._cloud_redis = redis.Redis(
host=cfg.host,
port=cfg.port,
db=cfg.db,
password=cfg.password,
decode_responses=cfg.decode_responses,
socket_connect_timeout=5,
socket_timeout=10,
retry_on_timeout=True,
socket_keepalive=True,
socket_keepalive_options=_build_keepalive_options(),
health_check_interval=15,
)
self._cloud_redis.ping()
logger.info("[截图] 云端 Redis 重连成功")
except Exception as e:
logger.warning("[截图] 云端 Redis 重连失败: %s", e)
self._cloud_redis = None
# ==================== 主循环 ====================
def _listen_loop(self):
@@ -132,12 +162,17 @@ class ScreenshotHandler:
try:
results = self._cloud_redis.xreadgroup(
SNAP_CONSUMER_GROUP,
SNAP_CONSUMER_NAME,
self._consumer_name,
{SNAP_REQUEST_STREAM: ">"},
count=1,
block=5000,
)
if not results:
# 无新消息时主动 PING 保活
try:
self._cloud_redis.ping()
except Exception:
raise redis.ConnectionError("ping failed")
continue
backoff = 5 # 重置退避
@@ -157,6 +192,14 @@ class ScreenshotHandler:
except Exception:
pass
except redis.ConnectionError as e:
if self._stop_event.is_set():
return
logger.warning("[截图] 云端 Redis 连接断开: %s, %ds 后重连...", e, backoff)
self._reconnect_cloud_redis()
self._stop_event.wait(backoff)
backoff = min(backoff * 2, max_backoff)
except Exception as e:
if self._stop_event.is_set():
return
@@ -181,6 +224,13 @@ class ScreenshotHandler:
})
return
# 多 Edge 设备隔离:如果消息指定了 device_id 且不是本设备,跳过
target_device_id = fields.get("device_id", "")
if target_device_id and target_device_id != self._device_id:
logger.debug("[截图] 跳过非本设备请求: target=%s, self=%s, request_id=%s",
target_device_id, self._device_id, request_id)
return
rtsp_url = fields.get("rtsp_url", "")
logger.info("[截图] 收到截图请求: request_id=%s, camera=%s, callback=%s",
@@ -326,7 +376,8 @@ class ScreenshotHandler:
if callback_url:
try:
url = callback_url.rstrip("/") + SNAP_CALLBACK_PATH
resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT)
resp = requests.post(url, json=body, timeout=SNAP_CALLBACK_TIMEOUT,
proxies={"http": None, "https": None})
if resp.status_code < 300:
logger.info("[截图] HTTP 回调成功: request_id=%s", request_id)
return

164
main.py
View File

@@ -12,6 +12,10 @@ import time
from datetime import datetime
from typing import Dict, Any, Optional, List, Tuple
# 禁用系统代理Clash 等代理工具会干扰 Redis TCP 长连接和 HTTP 请求)
for _key in ("http_proxy", "https_proxy", "HTTP_PROXY", "HTTPS_PROXY", "all_proxy", "ALL_PROXY"):
os.environ.pop(_key, None)
from config.settings import get_settings, Settings
from core.config_sync import get_config_sync_manager, ConfigSyncManager
from core.debug_http_server import start_debug_http_server
@@ -54,6 +58,7 @@ class EdgeInferenceService:
self._debug_reload_thread: Optional[threading.Thread] = None
self._debug_http_server = None
self._debug_http_thread: Optional[threading.Thread] = None
self._heartbeat_thread: Optional[threading.Thread] = None
self._processing_threads: Dict[str, threading.Thread] = {}
self._stop_event = threading.Event()
@@ -184,29 +189,31 @@ class EdgeInferenceService:
def _init_screenshot_handler(self):
"""初始化截图处理器"""
try:
# 优先从 config_manager 获取已有的云端 Redis 连接
cloud_redis = getattr(self._config_manager, '_cloud_redis', None)
# LOCAL 模式下 config_manager 不初始化云端 Redis需要独立创建
if cloud_redis is None:
try:
import redis
cfg = self._settings.cloud_redis
cloud_redis = redis.Redis(
host=cfg.host,
port=cfg.port,
db=cfg.db,
password=cfg.password,
decode_responses=cfg.decode_responses,
socket_connect_timeout=5,
socket_timeout=5,
retry_on_timeout=True,
)
cloud_redis.ping()
self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}")
except Exception as e:
self._logger.warning(f"截图处理器无法连接云端 Redis: {e}")
cloud_redis = None
# 截图处理器必须使用独立的 Redis 连接(不能与 config_sync 共用,
# 因为两者都做阻塞 XREAD/XREADGROUP共用连接会互相干扰
cloud_redis = None
try:
import redis
cfg = self._settings.cloud_redis
from core.config_sync import _build_keepalive_options
cloud_redis = redis.Redis(
host=cfg.host,
port=cfg.port,
db=cfg.db,
password=cfg.password,
decode_responses=cfg.decode_responses,
socket_connect_timeout=5,
socket_timeout=10,
retry_on_timeout=True,
socket_keepalive=True,
socket_keepalive_options=_build_keepalive_options(),
health_check_interval=15,
)
cloud_redis.ping()
self._logger.info(f"截图处理器独立连接云端 Redis 成功: {cfg.host}:{cfg.port}/{cfg.db}")
except Exception as e:
self._logger.warning(f"截图处理器无法连接云端 Redis: {e}")
cloud_redis = None
if cloud_redis and self._stream_manager:
self._screenshot_handler = ScreenshotHandler(
@@ -284,6 +291,62 @@ class EdgeInferenceService:
)
self._debug_http_thread.start()
def _start_heartbeat(self):
"""启动心跳守护线程,每 30 秒向云端上报设备状态"""
def worker():
import requests
base_url = self._settings.alarm_upload.cloud_api_url.rstrip("/")
wvp_url = self._settings.alarm_upload.wvp_api_url.rstrip("/") if self._settings.alarm_upload.wvp_api_url else ""
urls = [f"{base_url}/api/ai/device/heartbeat"]
if wvp_url:
urls.append(f"{wvp_url}/api/ai/device/heartbeat")
device_id = self._settings.mqtt.device_id
self._logger.info(f"[心跳] 守护线程已启动, 目标: {urls}, device_id={device_id}")
while not self._stop_event.is_set():
try:
start_time = self._performance_stats.get("start_time")
uptime = (datetime.now() - start_time).total_seconds() if start_time else 0
stream_count = len(self._stream_manager._streams) if self._stream_manager else 0
config_version = self._config_manager.config_version if self._config_manager else None
payload = {
"device_id": device_id,
"status": {
"uptime_seconds": int(uptime),
"frames_processed": self._performance_stats.get("total_frames_processed", 0),
"alerts_generated": self._performance_stats.get("total_alerts_generated", 0),
"stream_count": stream_count,
"config_version": config_version,
"stream_stats": {
"active_streams": stream_count,
},
},
}
for url in urls:
try:
resp = requests.post(url, json=payload, timeout=10,
proxies={"http": None, "https": None})
if resp.status_code == 200:
self._logger.debug(f"[心跳] {url} 上报成功")
else:
self._logger.warning(f"[心跳] {url} 上报失败: HTTP {resp.status_code}")
except Exception as e:
self._logger.warning(f"[心跳] {url} 上报异常: {e}")
except Exception as e:
self._logger.warning(f"[心跳] 上报异常: {e}")
self._stop_event.wait(30)
self._heartbeat_thread = threading.Thread(
target=worker,
name="HeartbeatWorker",
daemon=True,
)
self._heartbeat_thread.start()
def initialize(self):
"""初始化所有组件"""
self._logger.info("=" * 50)
@@ -301,6 +364,7 @@ class EdgeInferenceService:
self._init_screenshot_handler()
self._start_debug_reload_watcher()
self._start_debug_http_server()
self._start_heartbeat()
self._performance_stats["start_time"] = datetime.now()
@@ -313,7 +377,12 @@ class EdgeInferenceService:
)
self._logger.info("所有组件初始化完成")
def _get_camera_area_id(self, camera_id: str) -> Optional[int]:
"""获取摄像头的 area_id"""
cam = self._get_camera_config_by_id(camera_id)
return cam.area_id if cam else None
def _get_camera_ids_with_roi(self) -> set:
"""获取有ROI配置的摄像头ID集合
@@ -598,14 +667,24 @@ class EdgeInferenceService:
self._logger.debug(f"[推理诊断] batch_data shape={batch_data.shape}, outputs={shapes}, 耗时={inference_time_ms:.1f}ms")
batch_size = len(chunk)
# 按算法类型获取每个 item 的独立置信度阈值
per_item_conf = [
self._settings.inference.get_conf_threshold(item[2].algo_code)
for item in chunk
]
batch_results = self._postprocessor.batch_process_detections(
outputs,
batch_size,
conf_threshold=self._settings.inference.conf_threshold
per_item_conf_thresholds=per_item_conf,
)
total_detections = sum(len(r[0]) for r in batch_results)
self._logger.debug(f"[推理] batch_size={batch_size}, 总检测数={total_detections}, conf_thresh={self._settings.inference.conf_threshold}")
self._logger.debug(
f"[推理] batch_size={batch_size}, 总检测数={total_detections}, "
f"conf_thresholds={per_item_conf}"
)
for idx, (camera_id, roi, bind, frame, _, scale_info) in enumerate(chunk):
boxes, scores, class_ids = batch_results[idx]
@@ -753,12 +832,16 @@ class EdgeInferenceService:
self._camera_alert_cooldown[dedup_key] = now
self._performance_stats["total_alerts_generated"] += 1
# 获取算法的离岗开始时间
leave_start_time = None
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, '_leave_start_time') and algo._leave_start_time:
leave_start_time = algo._leave_start_time.isoformat()
# 获取算法的事件开始时间(泛化:支持所有算法类型)
first_frame_time = None
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type)
if algo:
# 各算法使用不同的内部变量名存储开始时间
for attr in ('_leave_start_time', '_parking_start_time', '_congestion_start_time', '_intrusion_start_time'):
val = getattr(algo, attr, None)
if val:
first_frame_time = val.isoformat()
break
from core.result_reporter import AlarmInfo, generate_alarm_id
alarm_info = AlarmInfo(
@@ -778,20 +861,17 @@ class EdgeInferenceService:
"bind_id": bind.bind_id,
"message": alert.get("message", ""),
"edge_node_id": self._settings.mqtt.device_id,
"first_frame_time": leave_start_time,
"first_frame_time": first_frame_time,
"vehicle_count": alert.get("vehicle_count"),
"area_id": self._get_camera_area_id(camera_id),
},
)
self._reporter.report_alarm(alarm_info, screenshot=frame.image)
# 回填 alarm_id 到算法实例(用于后续 resolve 追踪)
if alert_type == "leave_post":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("leave_post")
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
elif alert_type == "intrusion":
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get("intrusion")
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
# 回填 alarm_id 到算法实例(用于后续 resolve 追踪,泛化支持所有算法类型
algo = self._algorithm_manager.algorithms.get(roi_id, {}).get(f"{roi_id}_{bind.bind_id}", {}).get(alert_type)
if algo and hasattr(algo, 'set_last_alarm_id'):
algo.set_last_alarm_id(alarm_info.alarm_id)
self._logger.info(
f"告警已生成: type={alert_type}, "