From b3cf544343a6c4d3e04599d62ecb579a98f27c9d Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 6 Feb 2026 16:39:53 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B3=A8=E5=86=8C=20aiot=20=E8=B7=AF?= =?UTF-8?q?=E7=94=B1=E5=B9=B6=E6=9B=B4=E6=96=B0=E4=B8=BB=E7=A8=8B=E5=BA=8F?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - main.py:注册 aiot_alarm 和 aiot_edge 路由,保留旧路由兼容 - config.py/alert_service.py/mqtt_service.py:同步更新配置和服务 - 添加 CLAUDE.md 项目说明文档 Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 82 +++++++++++++++++++++++++++++++++++ app/config.py | 2 + app/main.py | 15 +++++++ app/services/alert_service.py | 62 ++++++++++++++++++++++++++ app/services/mqtt_service.py | 50 ++++++++++++++++----- 5 files changed, 200 insertions(+), 11 deletions(-) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..aa97a41 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,82 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +AI Alert Platform backend service built with FastAPI. Receives alerts from edge AI devices via MQTT and HTTP, stores snapshot images, pushes real-time updates over WebSocket, and exposes REST APIs for a Yudao (芋道) Vue 3 frontend. Optionally triggers async big-model analysis on alert snapshots. + +## Commands + +```bash +# Install dependencies +pip install -r requirements.txt + +# Run development server (starts at http://localhost:8000, auto-reload in debug mode) +python -m app.main + +# API docs: http://localhost:8000/docs + +# Production run with Gunicorn (Linux only) +gunicorn app.main:app -k uvicorn.workers.UvicornWorker -w 4 -b 0.0.0.0:8000 +``` + +Environment setup: copy `.env.example` to `.env` and configure. SQLite is the default database; set `DATABASE_URL` for MySQL in production. Set `MQTT_ENABLED=false` if no MQTT broker is available. + +No test suite exists yet. If adding tests, use pytest. + +**Root `main.py`** is a PyCharm template file, not the real entry point. Always use `python -m app.main`. + +## Architecture + +### Data Flow + +``` +Edge Devices ──MQTT──→ EMQX Broker ──subscribe──→ MQTTService ──→ AlertService ──→ DB + │ +Edge Devices ──HTTP POST──→ /api/v1/alerts ────────────→│ + ↓ + NotificationService ──WebSocket──→ Frontend +``` + +### Dual API Surface + +The app exposes two parallel API layers hitting the same services and database: + +1. **Native API** (`/api/v1/`) — endpoints defined inline in `app/main.py`. Standard REST responses. +2. **Yudao-compat API** (`/admin-api/`) — routers in `app/routers/yudao_alert.py` and `yudao_auth.py`. Wraps responses in `{"code": 0, "data": ..., "msg": ""}` format expected by the Yudao Vue frontend. Auth/permissions handled by `app/yudao_compat.py` (stub in dev mode: `DEV_MODE=true` skips token validation, returns mock admin). + +### Services (Global Singletons) + +All services are instantiated as module-level singletons, not injected via FastAPI `Depends()`. Access them via factory functions (`get_alert_service()`, `get_mqtt_service()`, etc.) or import the global directly. + +- **AlertService** (`app/services/alert_service.py`) — CRUD, filtering/pagination, statistics, AI analysis update. Handles both HTTP (`create_alert`) and MQTT (`create_alert_from_mqtt`) creation paths. Alert numbers: `ALT` + timestamp + uuid fragment. +- **MQTTService** (`app/services/mqtt_service.py`) — paho-mqtt client subscribing to `edge/alert/#`. Routes messages to registered callbacks for alerts vs heartbeats. Compatible with paho-mqtt 1.x and 2.x APIs. +- **DeviceService** (`app/services/device_service.py`) — Tracks edge device status from MQTT heartbeats. In-memory cache + DB persistence. Marks devices offline after 90s without heartbeat. +- **NotificationService** (`app/services/notification_service.py`) — WebSocket connection manager + broadcast. Bridges sync MQTT callbacks to async WebSocket sends via `asyncio.run_coroutine_threadsafe()`. +- **OSSStorage** (`app/services/oss_storage.py`) — Image storage. Currently local-only (`uploads/` dir); Aliyun OSS stubbed but not implemented. +- **AIAnalyzer** (`app/services/ai_analyzer.py`) — Async httpx client for optional big-model analysis. Fire-and-forget via `asyncio.create_task()`. + +### Lifecycle (`app/main.py` lifespan) + +Startup: init DB → set async event loop on NotificationService → register MQTT handlers → start MQTT. +Shutdown: stop MQTT. + +### Database + +SQLite at `data/alert_platform.db` (auto-created). Three ORM models in `app/models.py`: + +- **Alert** — main table. Enums: `AlertStatus` (pending/confirmed/ignored/resolved/dispatched), `AlertLevel` (low/medium/high/critical). Indexed on alert_no, camera_id, status, trigger_time. +- **EdgeDevice** — device status and heartbeat tracking. Enum: `DeviceStatus` (online/offline/error). +- **WorkOrder** — framework exists (model + relationship to Alert) but no API/service layer yet. + +Sessions obtained via `get_session()`. Services manage their own session lifecycle with try/finally. + +### Config + +`app/config.py` — dataclass-based config loaded from `.env` via `os.getenv()`. Sections: `DatabaseConfig`, `OSSConfig`, `AppConfig` (includes `dev_mode`), `AIModelConfig`, `MQTTConfig`. Global `settings` instance created at module load. + +### Key MQTT Topics + +- Alert: `edge/alert/{camera_id}/{roi_id}` — JSON with camera_id, roi_id, alert_type, confidence, etc. +- Heartbeat: `edge/alert/heartbeat/{device_id}` — JSON with device_id, status, uptime, counters. diff --git a/app/config.py b/app/config.py index 853c83b..8108e35 100644 --- a/app/config.py +++ b/app/config.py @@ -30,6 +30,7 @@ class AppConfig: host: str = "0.0.0.0" port: int = 8000 debug: bool = True + dev_mode: bool = True # 开发模式:跳过认证验证,返回超级管理员权限 @dataclass @@ -82,6 +83,7 @@ def load_settings() -> Settings: host=os.getenv("APP_HOST", "0.0.0.0"), port=int(os.getenv("APP_PORT", "8000")), debug=os.getenv("DEBUG", "true").lower() == "true", + dev_mode=os.getenv("DEV_MODE", "true").lower() == "true", ), ai_model=AIModelConfig( endpoint=os.getenv("AI_MODEL_ENDPOINT", ""), diff --git a/app/main.py b/app/main.py index 1d66f74..146c4e0 100644 --- a/app/main.py +++ b/app/main.py @@ -26,6 +26,8 @@ from app.services.mqtt_service import get_mqtt_service from app.services.notification_service import get_notification_service from app.services.device_service import get_device_service from app.utils.logger import logger +from app.routers import yudao_alert_router, yudao_auth_router, yudao_aiot_alarm_router, yudao_aiot_edge_router +from app.yudao_compat import yudao_exception_handler import json @@ -100,6 +102,19 @@ app.add_middleware( allow_headers=["*"], ) +# ==================== 芋道兼容路由 ==================== +# 提供与芋道主平台一致的接口规范,便于前端统一对接 +app.include_router(yudao_auth_router) +app.include_router(yudao_alert_router) + +# ==================== AIoT 统一路由 ==================== +# aiot 命名空间下的新路由,与旧路由并存 +app.include_router(yudao_aiot_alarm_router) +app.include_router(yudao_aiot_edge_router) + +# 注册芋道格式异常处理器 +app.add_exception_handler(HTTPException, yudao_exception_handler) + def get_alert_svc(): return alert_service diff --git a/app/services/alert_service.py b/app/services/alert_service.py index 6da10e3..8024b4e 100644 --- a/app/services/alert_service.py +++ b/app/services/alert_service.py @@ -319,6 +319,68 @@ class AlertService: finally: db.close() + def get_camera_alert_summary( + self, + page: int = 1, + page_size: int = 10, + ) -> dict: + """以摄像头为维度获取告警汇总""" + from sqlalchemy import func + db = get_session() + try: + # 按摄像头分组统计总数和最近时间 + query = db.query( + Alert.camera_id, + func.count(Alert.id).label("total_count"), + func.max(Alert.created_at).label("last_alert_time"), + ).group_by(Alert.camera_id) + + # 总数 + total = query.count() + + # 分页 + results = ( + query.order_by(func.count(Alert.id).desc()) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + + summary_list = [] + for row in results: + # 获取该摄像头待处理告警数量 + pending_count = ( + db.query(Alert) + .filter(Alert.camera_id == row.camera_id) + .filter(Alert.status == AlertStatus.PENDING) + .count() + ) + + # 获取该摄像头最新的一条告警 + latest_alert = ( + db.query(Alert) + .filter(Alert.camera_id == row.camera_id) + .order_by(Alert.created_at.desc()) + .first() + ) + + summary_list.append({ + "cameraId": row.camera_id, + "cameraName": row.camera_id, # TODO: 从设备服务获取摄像头名称 + "totalCount": row.total_count, + "pendingCount": pending_count, + "lastAlertTime": row.last_alert_time.isoformat() if row.last_alert_time else None, + "lastAlertType": latest_alert.alert_type if latest_alert else None, + "lastAlertTypeName": latest_alert.alert_type if latest_alert else None, # 前端会映射 + }) + + return { + "list": summary_list, + "total": total, + } + finally: + db.close() + # 全局单例 alert_service = AlertService() diff --git a/app/services/mqtt_service.py b/app/services/mqtt_service.py index e19510a..3154dce 100644 --- a/app/services/mqtt_service.py +++ b/app/services/mqtt_service.py @@ -20,6 +20,7 @@ class MQTTService: self._client: Optional[mqtt.Client] = None self._connected = False self._running = False + self._use_v2_callback = False # paho-mqtt 版本标记 self._lock = threading.Lock() # 回调函数 @@ -58,11 +59,23 @@ class MQTTService: return try: - self._client = mqtt.Client( - client_id=settings.mqtt.client_id, - protocol=mqtt.MQTTv5, - callback_api_version=mqtt.CallbackAPIVersion.VERSION2 - ) + # 兼容 paho-mqtt 1.x 和 2.x 版本 + try: + # paho-mqtt 2.0+ 新 API + self._client = mqtt.Client( + client_id=settings.mqtt.client_id, + protocol=mqtt.MQTTv5, + callback_api_version=mqtt.CallbackAPIVersion.VERSION2 + ) + self._use_v2_callback = True + except AttributeError: + # paho-mqtt 1.x 旧 API + self._client = mqtt.Client( + client_id=settings.mqtt.client_id, + protocol=mqtt.MQTTv5 + ) + self._use_v2_callback = False + logger.info("使用 paho-mqtt 1.x 兼容模式") # 设置回调 self._client.on_connect = self._on_connect @@ -107,9 +120,17 @@ class MQTTService: self._client.disconnect() logger.info("MQTT 服务已停止") - def _on_connect(self, client, userdata, flags, reason_code, properties): - """连接回调""" - if reason_code == 0: + def _on_connect(self, client, userdata, *args): + """连接回调 (兼容 1.x 和 2.x)""" + # 1.x: (client, userdata, flags, rc) + # 2.x: (client, userdata, connect_flags, reason_code, properties) + if args: + reason_code = args[-2] if len(args) >= 2 else args[-1] + rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code) + else: + rc = -1 + + if rc == 0: self._connected = True logger.info("MQTT 连接成功") @@ -123,10 +144,17 @@ class MQTTService: self._connected = False logger.error(f"MQTT 连接失败: {reason_code}") - def _on_disconnect(self, client, userdata, reason_code, properties): - """断开连接回调""" + def _on_disconnect(self, client, userdata, *args): + """断开连接回调 (兼容 1.x 和 2.x)""" + # 1.x: (client, userdata, rc) + # 2.x: (client, userdata, disconnect_flags, reason_code, properties) self._connected = False - logger.warning(f"MQTT 连接断开: {reason_code}") + if args: + reason_code = args[-2] if len(args) >= 2 else args[0] + rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', reason_code) + logger.warning(f"MQTT 连接断开: {rc}") + else: + logger.warning("MQTT 连接断开") def _on_message(self, client, userdata, msg: mqtt.MQTTMessage): """消息回调"""