From 9f4cea08106e5b7355a4ec71cd992a2d59134443 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 10 Feb 2026 15:22:01 +0800 Subject: [PATCH] =?UTF-8?q?feat(aiot):=20=E8=BE=B9=E7=BC=98=E5=91=8A?= =?UTF-8?q?=E8=AD=A6HTTP=E4=B8=8A=E6=8A=A5=20+=20=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=B8=AD=E8=BD=AC=E5=B1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 edge/report 端点接收边缘端HTTP告警上报 - alarm_event_service 新增 create_from_edge_report 幂等创建 - schemas 新增 EdgeAlarmReport 模型 - 移除 config_service/redis_service/yudao_aiot_config 配置中转 - MQTT 服务标记废弃,告警上报改为HTTP+COS - config 新增 COS/Redis 配置项 - requirements 新增 redis 依赖 Co-Authored-By: Claude Opus 4.6 --- .env.example | 18 +- app/config.py | 25 ++- app/main.py | 59 +----- app/routers/yudao_aiot_alarm.py | 34 ++++ app/schemas.py | 16 ++ app/services/alarm_event_service.py | 91 ++++++++++ app/services/mqtt_service.py | 270 +++------------------------- requirements.txt | 1 + 8 files changed, 207 insertions(+), 307 deletions(-) diff --git a/.env.example b/.env.example index 5cf4508..b5bcc1d 100644 --- a/.env.example +++ b/.env.example @@ -17,13 +17,17 @@ APP_PORT=8000 DEBUG=true DEV_MODE=true -# MQTT 配置 -MQTT_ENABLED=true -MQTT_BROKER_HOST=localhost -MQTT_BROKER_PORT=1883 -MQTT_USERNAME= -MQTT_PASSWORD= - # 大模型配置(可选) AI_MODEL_ENDPOINT=http://localhost:8001 AI_MODEL_API_KEY=your_api_key + +# Redis 配置(配置下发三层权威模型 - 云端层) +REDIS_ENABLED=true +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_PASSWORD= +REDIS_DB=0 +REDIS_MAX_CONNECTIONS=50 + +# 边缘设备认证 Token(边缘端上报告警时使用) +# EDGE_TOKEN=your_edge_device_token diff --git a/app/config.py b/app/config.py index f89813b..eb30db9 100644 --- a/app/config.py +++ b/app/config.py @@ -45,7 +45,7 @@ class AIModelConfig: @dataclass class MQTTConfig: - """MQTT 配置""" + """MQTT 配置 (已废弃 - 告警上报已改为 HTTP + COS)""" broker_host: str = "localhost" broker_port: int = 1883 client_id: str = "alert_platform" @@ -54,6 +54,18 @@ class MQTTConfig: alert_topic: str = "edge/alert/#" heartbeat_topic: str = "edge/alert/heartbeat/#" qos: int = 1 + enabled: bool = False # 默认禁用 + + +@dataclass +class RedisConfig: + """Redis 配置""" + host: str = "localhost" + port: int = 6379 + password: str = "" + db: int = 0 + max_connections: int = 50 + decode_responses: bool = True enabled: bool = True @@ -64,6 +76,7 @@ class Settings(BaseModel): app: AppConfig = AppConfig() ai_model: AIModelConfig = AIModelConfig() mqtt: MQTTConfig = MQTTConfig() + redis: RedisConfig = RedisConfig() def load_settings() -> Settings: @@ -104,7 +117,15 @@ def load_settings() -> Settings: alert_topic=os.getenv("MQTT_ALERT_TOPIC", "edge/alert/#"), heartbeat_topic=os.getenv("MQTT_HEARTBEAT_TOPIC", "edge/alert/heartbeat/#"), qos=int(os.getenv("MQTT_QOS", "1")), - enabled=os.getenv("MQTT_ENABLED", "true").lower() == "true", + enabled=os.getenv("MQTT_ENABLED", "false").lower() == "true", + ), + redis=RedisConfig( + host=os.getenv("REDIS_HOST", "localhost"), + port=int(os.getenv("REDIS_PORT", "6379")), + password=os.getenv("REDIS_PASSWORD", ""), + db=int(os.getenv("REDIS_DB", "0")), + max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "50")), + enabled=os.getenv("REDIS_ENABLED", "true").lower() == "true", ), ) diff --git a/app/main.py b/app/main.py index d08d61c..80a5e4e 100644 --- a/app/main.py +++ b/app/main.py @@ -23,7 +23,6 @@ from app.schemas import ( from app.services.alert_service import alert_service, get_alert_service from app.services.alarm_event_service import alarm_event_service, get_alarm_event_service from app.services.ai_analyzer import trigger_async_analysis -from app.services.mqtt_service import get_mqtt_service from app.services.notification_service import get_notification_service from app.services.device_service import get_device_service from app.utils.logger import logger @@ -33,44 +32,10 @@ import json # 全局服务实例 -mqtt_service = get_mqtt_service() notification_service = get_notification_service() device_service = get_device_service() -def handle_mqtt_alert(payload: dict): - """处理 MQTT 告警消息(双写:旧表 + 新表)""" - try: - # 1. 写旧表(保持兼容) - alert = alert_service.create_alert_from_mqtt(payload) - if alert: - logger.info(f"MQTT 告警写入旧表: {alert.alert_no}") - - # 2. 写新表 - alarm = alarm_event_service.create_from_mqtt(payload) - - # 3. WebSocket 通知(优先使用新表数据) - if alarm: - notification_service.notify_sync("new_alert", alarm.to_dict()) - logger.info(f"MQTT 告警已处理并推送: {alarm.alarm_id}") - elif alert: - notification_service.notify_sync("new_alert", alert.to_dict()) - logger.info(f"MQTT 告警已处理并推送(旧表): {alert.alert_no}") - except Exception as e: - logger.error(f"处理 MQTT 告警失败: {e}") - - -def handle_mqtt_heartbeat(payload: dict): - """处理 MQTT 心跳消息""" - try: - device = device_service.handle_heartbeat(payload) - if device: - # 通过 WebSocket 推送设备状态 - notification_service.notify_sync("device_status", device.to_dict()) - except Exception as e: - logger.error(f"处理 MQTT 心跳失败: {e}") - - @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" @@ -78,23 +43,15 @@ async def lifespan(app: FastAPI): init_db() logger.info("数据库初始化完成") - # 设置事件循环(用于从 MQTT 回调调用异步方法) + # 设置事件循环(用于从同步代码调用异步方法,如 WebSocket 通知) loop = asyncio.get_event_loop() notification_service.set_event_loop(loop) - # 注册 MQTT 处理器 - mqtt_service.register_alert_handler(handle_mqtt_alert) - mqtt_service.register_heartbeat_handler(handle_mqtt_heartbeat) - - # 启动 MQTT 服务 - mqtt_service.start() - logger.info("AI 告警平台启动完成") yield # 关闭 - mqtt_service.stop() logger.info("AI 告警平台已关闭") @@ -149,13 +106,9 @@ async def health_check(): except Exception as e: db_status = f"unhealthy: {e}" - mqtt_stats = mqtt_service.get_statistics() - mqtt_status = "connected" if mqtt_stats["connected"] else "disconnected" - return HealthResponse( - status="healthy" if db_status == "healthy" and mqtt_stats["connected"] else "degraded", + status="healthy" if db_status == "healthy" else "degraded", database=db_status, - mqtt=mqtt_status, websocket_connections=notification_service.manager.connection_count, ) @@ -333,14 +286,6 @@ async def get_device( return DeviceResponse(**device.to_dict()) -# ==================== MQTT 状态端点 ==================== - -@app.get("/api/v1/mqtt/statistics") -async def get_mqtt_statistics(): - """获取 MQTT 服务统计""" - return mqtt_service.get_statistics() - - if __name__ == "__main__": import uvicorn uvicorn.run( diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index 5ac99c1..0fbd211 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -18,7 +18,9 @@ from datetime import datetime from app.yudao_compat import YudaoResponse, get_current_user from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService +from app.services.notification_service import get_notification_service from app.services.oss_storage import get_oss_storage +from app.schemas import EdgeAlarmReport router = APIRouter(prefix="/admin-api/aiot/alarm", tags=["AIoT-告警"]) @@ -184,6 +186,38 @@ async def get_device_summary_page( ) +# ==================== 边缘端告警上报 ==================== + +@router.post("/edge/report") +async def edge_alarm_report( + report: EdgeAlarmReport, + service: AlarmEventService = Depends(get_alarm_event_service), + current_user: dict = Depends(get_current_user), +): + """ + 边缘端告警上报接口 + + 边缘设备通过 HTTP POST 上报告警元数据,截图已预先上传到 COS。 + 支持幂等:通过 alarm_id 判断是否已存在。 + """ + alarm = service.create_from_edge_report(report.model_dump()) + + if alarm is None: + return YudaoResponse.error(500, "告警创建失败") + + # WebSocket 通知 + try: + notification_svc = get_notification_service() + notification_svc.notify_sync("new_alert", alarm.to_dict()) + except Exception: + pass # WebSocket 通知失败不影响主流程 + + return YudaoResponse.success({ + "alarmId": alarm.alarm_id, + "created": True, + }) + + # ==================== 辅助函数 ==================== def _get_alarm_type_name(alarm_type: Optional[str]) -> str: diff --git a/app/schemas.py b/app/schemas.py index ee37d7d..6a1c3a6 100644 --- a/app/schemas.py +++ b/app/schemas.py @@ -110,3 +110,19 @@ class HealthResponse(BaseModel): database: str mqtt: Optional[str] = None websocket_connections: Optional[int] = None + + +# ==================== 边缘告警上报 ==================== + +class EdgeAlarmReport(BaseModel): + """边缘端告警上报请求体""" + alarm_id: str = Field(..., max_length=64, description="边缘端生成的告警ID (edge_{device_id}_{timestamp}_{uuid})") + alarm_type: str = Field(..., max_length=32, description="告警类型: leave_post/intrusion/crowd 等") + device_id: str = Field(..., max_length=64, description="摄像头/设备ID") + scene_id: Optional[str] = Field(None, max_length=64, description="场景/ROI ID") + event_time: str = Field(..., description="事件发生时间 ISO8601") + alarm_level: int = Field(2, ge=1, le=4, description="告警级别: 1提醒 2一般 3严重 4紧急") + snapshot_url: Optional[str] = Field(None, max_length=512, description="截图 COS object_key") + algorithm_code: Optional[str] = Field(None, max_length=64, description="算法编码") + confidence_score: Optional[float] = Field(None, ge=0, le=1, description="置信度 0-1") + ext_data: Optional[Dict[str, Any]] = Field(None, description="扩展数据 (bbox/target_class 等)") diff --git a/app/services/alarm_event_service.py b/app/services/alarm_event_service.py index 73b1879..c627a59 100644 --- a/app/services/alarm_event_service.py +++ b/app/services/alarm_event_service.py @@ -126,6 +126,97 @@ class AlarmEventService: finally: db.close() + def create_from_edge_report(self, data: Dict[str, Any]) -> Optional[AlarmEvent]: + """ + 从边缘端 HTTP 上报创建告警事件 + + 边缘端通过 POST /admin-api/aiot/alarm/edge/report 上报告警。 + 使用边缘端生成的 alarm_id,支持幂等(重复 alarm_id 跳过)。 + + Args: + data: 边缘端上报数据,字段与 alarm_event 表对齐 + + Returns: + AlarmEvent 或 None + """ + db = get_session() + try: + alarm_id = data.get("alarm_id") + if not alarm_id: + logger.error("边缘上报缺少 alarm_id") + return None + + # 幂等校验:alarm_id 已存在则跳过 + existing = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + if existing: + logger.info(f"告警已存在,跳过: {alarm_id}") + return existing + + # 解析时间 + event_time_str = data.get("event_time") + if event_time_str: + try: + event_time = datetime.fromisoformat(event_time_str.replace("Z", "+00:00")) + except ValueError: + event_time = datetime.now(timezone.utc) + else: + event_time = datetime.now(timezone.utc) + + # 置信度 + confidence = data.get("confidence_score") + if confidence is not None: + confidence = float(confidence) + if confidence > 1: + confidence = confidence / 100.0 + + alarm_type = data.get("alarm_type", "unknown") + alarm_level = data.get("alarm_level") + if alarm_level is None: + # 从 ext_data 取 duration_ms + ext_data = data.get("ext_data") or {} + duration_ms = ext_data.get("duration_ms") + alarm_level = _determine_alarm_level(alarm_type, confidence or 0, duration_ms) + + alarm = AlarmEvent( + alarm_id=alarm_id, + alarm_type=alarm_type, + algorithm_code=data.get("algorithm_code"), + device_id=data.get("device_id", "unknown"), + scene_id=data.get("scene_id"), + event_time=event_time, + alarm_level=alarm_level, + confidence_score=confidence, + alarm_status="NEW", + handle_status="UNHANDLED", + snapshot_url=data.get("snapshot_url"), # COS object_key + edge_node_id=data.get("ext_data", {}).get("edge_node_id") if data.get("ext_data") else None, + ) + + db.add(alarm) + + # 写入扩展表 + ext_data = data.get("ext_data") + if ext_data: + ext = AlarmEventExt( + alarm_id=alarm_id, + ext_type="EDGE_HTTP", + ext_data=ext_data, + ) + db.add(ext) + + db.commit() + db.refresh(alarm) + + logger.info(f"边缘端告警创建成功: {alarm_id}, type={alarm_type}, device={data.get('device_id')}") + return alarm + + except Exception as e: + db.rollback() + logger.error(f"创建边缘端告警失败: {e}") + return None + finally: + db.close() + def create_from_http(self, data: Dict[str, Any], snapshot_data: Optional[bytes] = None) -> Optional[AlarmEvent]: """从 HTTP 请求创建告警事件""" db = get_session() diff --git a/app/services/mqtt_service.py b/app/services/mqtt_service.py index 5de890c..5baabf0 100644 --- a/app/services/mqtt_service.py +++ b/app/services/mqtt_service.py @@ -1,265 +1,53 @@ """ -MQTT 订阅服务 -订阅边缘端告警和心跳消息 +MQTT 服务 - 已废弃 + +告警上报已改为 HTTP + COS 方案(边缘端直传)。 +此文件保留为空壳,避免其他模块 import 报错。 +后续版本将彻底删除此文件。 """ -import json -import threading -import uuid -from datetime import datetime, timezone -from typing import Callable, Dict, Any, Optional, List -import paho.mqtt.client as mqtt - -from app.config import settings -from app.utils.logger import logger +from typing import Dict, Any, Optional class MQTTService: - """MQTT 订阅服务""" + """MQTT 服务 (已废弃,保留空壳兼容旧代码)""" def __init__(self): - self._client: Optional[mqtt.Client] = None - self._connected = False - self._running = False - self._use_v2_callback = False # paho-mqtt 版本标记 - self._lock = threading.Lock() + pass - # 回调函数 - self._alert_handlers: List[Callable[[Dict[str, Any]], None]] = [] - self._heartbeat_handlers: List[Callable[[Dict[str, Any]], None]] = [] + @property + def is_connected(self) -> bool: + return False - # 统计 - self._stats = { + def register_alert_handler(self, handler): + pass + + def register_heartbeat_handler(self, handler): + pass + + def start(self): + pass + + def stop(self): + pass + + def get_statistics(self) -> Dict[str, Any]: + return { "messages_received": 0, "alerts_received": 0, "heartbeats_received": 0, "errors": 0, + "connected": False, + "running": False, + "deprecated": True, } - @property - def is_connected(self) -> bool: - return self._connected - def register_alert_handler(self, handler: Callable[[Dict[str, Any]], None]): - """注册告警处理回调""" - self._alert_handlers.append(handler) - logger.info(f"已注册告警处理器: {handler.__name__}") - - def register_heartbeat_handler(self, handler: Callable[[Dict[str, Any]], None]): - """注册心跳处理回调""" - self._heartbeat_handlers.append(handler) - logger.info(f"已注册心跳处理器: {handler.__name__}") - - def start(self): - """启动 MQTT 服务""" - if not settings.mqtt.enabled: - logger.info("MQTT 服务已禁用") - return - - if self._running: - return - - try: - # 给 client_id 添加随机后缀,防止多实例 client_id 冲突导致反复踢连接 - unique_client_id = f"{settings.mqtt.client_id}_{uuid.uuid4().hex[:8]}" - - # 兼容 paho-mqtt 1.x 和 2.x 版本 - try: - # paho-mqtt 2.0+ 新 API - self._client = mqtt.Client( - client_id=unique_client_id, - protocol=mqtt.MQTTv311, - callback_api_version=mqtt.CallbackAPIVersion.VERSION2 - ) - self._use_v2_callback = True - except AttributeError: - # paho-mqtt 1.x 旧 API - self._client = mqtt.Client( - client_id=unique_client_id, - protocol=mqtt.MQTTv311 - ) - self._use_v2_callback = False - logger.info("使用 paho-mqtt 1.x 兼容模式") - - # 设置回调 - self._client.on_connect = self._on_connect - self._client.on_disconnect = self._on_disconnect - self._client.on_message = self._on_message - - # 设置认证(如果有) - if settings.mqtt.username: - self._client.username_pw_set( - settings.mqtt.username, - settings.mqtt.password - ) - - # 连接 - self._client.connect( - settings.mqtt.broker_host, - settings.mqtt.broker_port, - 60 - ) - - # 启动循环 - self._client.loop_start() - self._running = True - - logger.info( - f"MQTT 服务启动: {settings.mqtt.broker_host}:{settings.mqtt.broker_port}" - ) - - except Exception as e: - logger.error(f"MQTT 服务启动失败: {e}") - self._running = False - - def stop(self): - """停止 MQTT 服务""" - if not self._running: - return - - self._running = False - - if self._client: - self._client.loop_stop() - self._client.disconnect() - logger.info("MQTT 服务已停止") - - def _on_connect(self, client, userdata, *args): - """连接回调 (兼容 1.x 和 2.x)""" - # 1.x: (client, userdata, flags, rc) - # 2.x: (client, userdata, connect_flags, reason_code, properties) - rc = 0 - if args: - # 取倒数第二个参数(1.x 的 rc 或 2.x 的 reason_code) - if len(args) >= 2: - reason_code = args[-2] - else: - reason_code = args[0] - rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0) - - if rc == 0: - self._connected = True - logger.info("MQTT 连接成功") - - # 订阅告警主题 - client.subscribe(settings.mqtt.alert_topic, settings.mqtt.qos) - logger.info(f"已订阅告警主题: {settings.mqtt.alert_topic}") - - # 心跳主题通常是 edge/alert/heartbeat/# 但它已经被 edge/alert/# 包含 - # 所以不需要单独订阅 - else: - self._connected = False - logger.error(f"MQTT 连接失败: {reason_code}") - - def _on_disconnect(self, client, userdata, *args): - """断开连接回调 (兼容 1.x 和 2.x)""" - # 1.x: (client, userdata, rc) - # 2.x: (client, userdata, disconnect_flags, reason_code, properties) - self._connected = False - rc = 0 - if args: - if len(args) >= 2: - reason_code = args[-2] - else: - reason_code = args[0] - rc = reason_code if isinstance(reason_code, int) else getattr(reason_code, 'value', 0) - if rc != 0: - logger.warning(f"MQTT 连接异常断开: rc={rc}") - else: - logger.info("MQTT 连接断开") - - def _on_message(self, client, userdata, msg: mqtt.MQTTMessage): - """消息回调""" - with self._lock: - self._stats["messages_received"] += 1 - - try: - topic = msg.topic - payload = json.loads(msg.payload.decode("utf-8")) - - logger.debug(f"收到 MQTT 消息: topic={topic}") - - # 判断消息类型 - if "/heartbeat/" in topic: - self._handle_heartbeat(topic, payload) - else: - self._handle_alert(topic, payload) - - except json.JSONDecodeError as e: - logger.error(f"MQTT 消息解析失败: {e}") - with self._lock: - self._stats["errors"] += 1 - except Exception as e: - logger.error(f"MQTT 消息处理失败: {e}") - with self._lock: - self._stats["errors"] += 1 - - def _handle_alert(self, topic: str, payload: Dict[str, Any]): - """处理告警消息""" - with self._lock: - self._stats["alerts_received"] += 1 - - # 从 topic 解析 camera_id 和 roi_id - # topic 格式: edge/alert/{camera_id}/{roi_id} - parts = topic.split("/") - if len(parts) >= 4: - camera_id = parts[2] - roi_id = parts[3] if len(parts) > 3 else None - - # 确保 payload 中有这些字段 - payload.setdefault("camera_id", camera_id) - payload.setdefault("roi_id", roi_id) - - logger.info( - f"收到告警: camera={payload.get('camera_id')}, " - f"type={payload.get('alert_type')}, " - f"confidence={payload.get('confidence')}" - ) - - # 调用所有注册的处理器 - for handler in self._alert_handlers: - try: - handler(payload) - except Exception as e: - logger.error(f"告警处理器执行失败: {e}") - - def _handle_heartbeat(self, topic: str, payload: Dict[str, Any]): - """处理心跳消息""" - with self._lock: - self._stats["heartbeats_received"] += 1 - - # 从 topic 解析 device_id - # topic 格式: edge/alert/heartbeat/{device_id} - parts = topic.split("/") - if len(parts) >= 4: - device_id = parts[3] - payload.setdefault("device_id", device_id) - - logger.debug(f"收到心跳: device={payload.get('device_id')}") - - # 调用所有注册的处理器 - for handler in self._heartbeat_handlers: - try: - handler(payload) - except Exception as e: - logger.error(f"心跳处理器执行失败: {e}") - - def get_statistics(self) -> Dict[str, Any]: - """获取统计信息""" - with self._lock: - return { - **self._stats, - "connected": self._connected, - "running": self._running, - } - - -# 全局单例 _mqtt_service: Optional[MQTTService] = None def get_mqtt_service() -> MQTTService: - """获取 MQTT 服务单例""" + """获取 MQTT 服务单例 (已废弃)""" global _mqtt_service if _mqtt_service is None: _mqtt_service = MQTTService() diff --git a/requirements.txt b/requirements.txt index b3a874a..1169390 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ httpx==0.26.0 paho-mqtt==2.1.0 python-dotenv==1.0.1 websockets==12.0 +redis>=5.0.0