""" Edge 设备兼容路由 Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警, 该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证, 使 Edge 设备可以直接上报到 FastAPI 服务。 """ import asyncio from datetime import datetime from fastapi import APIRouter, Depends from typing import Optional from app.yudao_compat import YudaoResponse from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService from app.services.notification_service import get_notification_service from app.schemas import EdgeAlarmReport, EdgeAlarmResolve from app.utils.logger import logger router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"]) @router.post("/edge/report") async def edge_alarm_report( report: EdgeAlarmReport, service: AlarmEventService = Depends(get_alarm_event_service), ): """ Edge 告警上报(无认证) 与 /admin-api/aiot/alarm/edge/report 功能相同, 但不要求认证,供 Edge 设备直接调用。 """ alarm = service.create_from_edge_report(report.model_dump()) if alarm is None: return YudaoResponse.error(500, "告警创建失败") # WebSocket 通知 try: notification_svc = get_notification_service() notification_svc.notify_sync("new_alert", alarm.to_dict()) except Exception: pass # 异步触发 VLM 复核 + 企微通知(不阻塞响应) try: from app.services.notify_dispatch import process_alarm_notification notify_data = { "alarm_id": alarm.alarm_id, "alarm_type": alarm.alarm_type, "device_id": alarm.device_id, "scene_id": alarm.scene_id, "event_time": alarm.event_time, "alarm_level": alarm.alarm_level, "snapshot_url": alarm.snapshot_url, } asyncio.create_task(process_alarm_notification(notify_data)) except Exception as e: logger.error(f"触发告警通知失败: {e}", exc_info=True) return YudaoResponse.success({ "alarmId": alarm.alarm_id, "created": True, }) @router.post("/edge/resolve") async def edge_alarm_resolve( resolve: EdgeAlarmResolve, service: AlarmEventService = Depends(get_alarm_event_service), ): """ Edge 告警结束通知(无认证) 与 /admin-api/aiot/alarm/edge/resolve 功能相同, 但不要求认证,供 Edge 设备直接调用。 支持先到先得:已被人工处理的告警不覆盖状态。 """ # 先检查是否已到终态(先到先得) was_terminal = service.is_alarm_terminal(resolve.alarm_id) success = service.resolve_alarm( alarm_id=resolve.alarm_id, duration_ms=resolve.duration_ms, last_frame_time=resolve.last_frame_time, resolve_type=resolve.resolve_type, ) if not success: return YudaoResponse.error(404, "告警不存在") # 如果之前不是终态(边缘端先到),触发自动结单 + 卡片更新 if not was_terminal: asyncio.create_task(_resolve_work_order_and_card(resolve.alarm_id, resolve.resolve_type)) return YudaoResponse.success(True) async def _resolve_work_order_and_card(alarm_id: str, resolve_type: str): """边缘端 resolve 后异步处理:更新卡片 + 自动结单""" try: from app.services.work_order_client import get_work_order_client from app.services.wechat_service import get_wechat_service from app.models import get_session, AlarmEventExt # 1. 自动结单 wo_client = get_work_order_client() if wo_client.enabled: db = get_session() try: ext = db.query(AlarmEventExt).filter( AlarmEventExt.alarm_id == alarm_id, AlarmEventExt.ext_type == "WORK_ORDER", ).first() order_id = ext.ext_data.get("order_id", "") if ext and ext.ext_data else "" finally: db.close() if order_id: remark_map = { "person_returned": "人员回岗自动关闭", "non_work_time": "非工作时间自动关闭", "intrusion_cleared": "入侵消失自动关闭", } remark = remark_map.get(resolve_type, f"边缘端自动结单: {resolve_type}") await wo_client.auto_complete_order(order_id, remark) # 2. 更新企微卡片到终态(如果有 response_code) wechat = get_wechat_service() if wechat.enabled: response_code = wechat.get_response_code(alarm_id) if response_code: await wechat.update_alarm_card_terminal( response_code=response_code, user_ids=[], # 空列表时企微更新所有已收到卡片的用户 alarm_id=alarm_id, action="auto_resolve", ) except Exception as e: logger.error(f"边缘端resolve后处理失败: alarm={alarm_id}, error={e}", exc_info=True)