# AI 告警平台后端服务 基于 FastAPI 的 AI 告警接收与管理平台,支持 MQTT 告警订阅、WebSocket 实时推送、设备心跳监控。 ## 功能特性 - **MQTT 告警订阅**:自动订阅边缘设备告警消息(`edge/alert/#`) - **WebSocket 实时推送**:告警实时推送到前端 - **设备心跳监控**:监控边缘设备在线状态 - **REST API**:告警查询、处理、统计等完整接口 - **告警去重**:基于时间窗口的告警去重机制 ## 系统架构 ``` ┌─────────────────────────────────────────────────────────────┐ │ AI 告警平台架构 │ ├─────────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────┐ MQTT ┌──────────────────────┐ │ │ │ ai_edge │──────────────→│ EMQX Broker │ │ │ │ 边缘推理 │ edge/alert/# │ │ │ │ └──────────────┘ └──────────┬───────────┘ │ │ │ │ │ MQTT Subscribe │ │ ▼ │ │ ┌──────────────────────────────────────────────────────┐ │ │ │ 本服务 (FastAPI) │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ MqttService │ │AlertService │ │DeviceService│ │ │ │ │ │ 消息订阅 │→ │ 告警处理 │ │ 心跳监控 │ │ │ │ │ └─────────────┘ └──────┬──────┘ └─────────────┘ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ WebSocket │← │ Database │ │ │ │ │ │ 实时推送 │ │ SQLite │ │ │ │ │ └──────┬──────┘ └─────────────┘ │ │ │ └─────────│────────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────┐ │ │ │ 前端/AIOT │ │ │ │ 主平台 │ │ │ └──────────────┘ │ └─────────────────────────────────────────────────────────────┘ ``` ## 技术栈 | 组件 | 技术 | |------|------| | Web 框架 | FastAPI + Uvicorn | | 消息队列 | MQTT (paho-mqtt) | | 实时推送 | WebSocket | | 数据库 | SQLite / MySQL | | ORM | SQLAlchemy | | 配置管理 | Pydantic Settings + YAML | ## 快速开始 ### 1. 安装依赖 ```bash pip install -r requirements.txt ``` ### 2. 配置服务 编辑 `config/config.yaml`: ```yaml app: name: "AI Alert Platform" debug: true host: "0.0.0.0" port: 8000 database: url: "sqlite:///./data/alerts.db" mqtt: enabled: true host: "127.0.0.1" port: 1883 username: "" password: "" topic_alert: "edge/alert/#" topic_heartbeat: "edge/alert/heartbeat/#" ``` ### 3. 启动服务 ```bash # 开发模式(自动重载) uvicorn app.main:app --reload --host 0.0.0.0 --port 8000 # 生产模式 python -m app.main ``` ### 4. 访问 API 文档 启动后访问:http://localhost:8000/docs ## 项目结构 ``` service/ ├── app/ │ ├── main.py # FastAPI 入口 + 路由 │ ├── config.py # 配置管理(YAML + 环境变量) │ ├── models.py # SQLAlchemy 数据模型 │ ├── schemas.py # Pydantic 请求/响应模型 │ └── services/ │ ├── mqtt_service.py # MQTT 订阅服务 │ ├── alert_service.py # 告警业务逻辑 │ └── storage.py # 存储服务 ├── config/ │ └── config.yaml # 配置文件 ├── data/ # SQLite 数据库 ├── logs/ # 日志文件 ├── docs/ # 文档 ├── requirements.txt └── README.md ``` ## API 接口 ### 告警管理 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/v1/alerts` | 告警列表(分页、筛选) | | GET | `/api/v1/alerts/{id}` | 告警详情 | | PUT | `/api/v1/alerts/{id}/handle` | 处理告警 | | GET | `/api/v1/alerts/statistics` | 告警统计 | ### 设备管理 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/v1/devices` | 设备列表 | | GET | `/api/v1/devices/{id}` | 设备详情 | | GET | `/api/v1/devices/statistics` | 设备统计 | ### WebSocket | 路径 | 说明 | |------|------| | `/ws/alerts` | 告警实时推送 | ### 健康检查 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/health` | 服务健康状态 | ## MQTT 消息格式 ### 告警消息(边缘端发布) Topic: `edge/alert/{camera_id}/{roi_id}` ```json { "camera_id": "cam_001", "roi_id": "roi_001", "device_id": "edge_device_001", "alert_type": "leave_post", "algorithm": "YOLO", "confidence": 0.92, "duration_minutes": 15, "trigger_time": "2026-02-05T12:00:00", "message": "检测到离岗行为", "bbox": [100, 100, 300, 400] } ``` ### 心跳消息(边缘端发布) Topic: `edge/alert/heartbeat/{device_id}` ```json { "device_id": "edge_device_001", "status": "online", "uptime": 3600, "frames_processed": 10000, "alerts_generated": 5, "timestamp": "2026-02-05T12:00:00" } ``` ## 与 wvp-platform 集成 本服务与 wvp-platform(视频监控平台)配合使用: 1. **wvp-platform**:管理摄像头、配置 ROI 区域、推送算法配置 2. **ai_edge**:接收配置、执行推理、上报告警 3. **本服务**:接收告警、存储管理、实时推送 ## 环境要求 - Python 3.8+ - MQTT Broker(推荐 EMQX) ## License MIT