""" Edge 设备兼容路由 Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警, 该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证, 使 Edge 设备可以直接上报到 FastAPI 服务。 """ import asyncio from datetime import datetime from fastapi import APIRouter, Depends from typing import Optional from app.yudao_compat import YudaoResponse from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService from app.services.notification_service import get_notification_service from app.schemas import EdgeAlarmReport, EdgeAlarmResolve from app.utils.logger import logger router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"]) @router.post("/edge/report") async def edge_alarm_report( report: EdgeAlarmReport, service: AlarmEventService = Depends(get_alarm_event_service), ): """ Edge 告警上报(无认证) 与 /admin-api/aiot/alarm/edge/report 功能相同, 但不要求认证,供 Edge 设备直接调用。 """ alarm = service.create_from_edge_report(report.model_dump()) if alarm is None: return YudaoResponse.error(500, "告警创建失败") # WebSocket 通知 try: notification_svc = get_notification_service() notification_svc.notify_sync("new_alert", alarm.to_dict()) except Exception: pass return YudaoResponse.success({ "alarmId": alarm.alarm_id, "created": True, }) @router.post("/edge/resolve") async def edge_alarm_resolve( resolve: EdgeAlarmResolve, service: AlarmEventService = Depends(get_alarm_event_service), ): """ Edge 告警结束通知(无认证) 与 /admin-api/aiot/alarm/edge/resolve 功能相同, 但不要求认证,供 Edge 设备直接调用。 """ success = service.resolve_alarm( alarm_id=resolve.alarm_id, duration_ms=resolve.duration_ms, last_frame_time=resolve.last_frame_time, resolve_type=resolve.resolve_type, ) if not success: return YudaoResponse.error(404, "告警不存在") return YudaoResponse.success(True)