Files
16337 9cee0a2cac fix: 添加 pymysql 依赖 + Edge 兼容路由
- requirements.txt 添加 pymysql、cos-python-sdk-v5
- 新增 /api/ai/alert/edge/report 和 /edge/resolve 路由(无认证)
- 使 Edge 设备可直接上报到 FastAPI 服务

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 13:18:35 +08:00

72 lines
2.1 KiB
Python

"""
Edge device compatibility routes.

Edge devices report alarms to /api/ai/alert/edge/report and
/api/ai/alert/edge/resolve, the same paths used by the WVP endpoints.
This module exposes routes on those same paths, without authentication,
so that Edge devices can report directly to the FastAPI service.
"""
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends
from typing import Optional
from app.yudao_compat import YudaoResponse
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.utils.logger import logger
# Every route registered on this router lives under the /api/ai/alert prefix.
router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"])
@router.post("/edge/report")
async def edge_alarm_report(
    report: EdgeAlarmReport,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Handle an alarm report from an Edge device (no authentication).

    Per the original note, this mirrors /admin-api/aiot/alarm/edge/report
    but does not require authentication, so Edge devices can call it
    directly.

    Args:
        report: Alarm payload sent by the Edge device.
        service: Alarm event service, injected by FastAPI.

    Returns:
        ``YudaoResponse.error(500, ...)`` when the service returns ``None``
        for the new alarm; otherwise ``YudaoResponse.success`` carrying the
        new ``alarmId`` and ``created: True``.
    """
    alarm = service.create_from_edge_report(report.model_dump())
    if alarm is None:
        return YudaoResponse.error(500, "告警创建失败")

    # WebSocket notification is best-effort: the alarm has already been
    # created, so a notification failure must not fail the request. Log it
    # instead of silently discarding it so broken notifications are visible.
    try:
        notification_svc = get_notification_service()
        notification_svc.notify_sync("new_alert", alarm.to_dict())
    except Exception:
        logger.exception("Edge 告警 WebSocket 通知失败")

    return YudaoResponse.success({
        "alarmId": alarm.alarm_id,
        "created": True,
    })
@router.post("/edge/resolve")
async def edge_alarm_resolve(
    resolve: EdgeAlarmResolve,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Handle an alarm-ended notification from an Edge device (no authentication).

    Per the original note, this mirrors /admin-api/aiot/alarm/edge/resolve
    but does not require authentication, so Edge devices can call it
    directly.

    Args:
        resolve: Resolve payload; its ``alarm_id``, ``duration_ms``,
            ``last_frame_time`` and ``resolve_type`` fields are forwarded
            to the service.
        service: Alarm event service, injected by FastAPI.

    Returns:
        ``YudaoResponse.success(True)`` when the service reports success,
        otherwise ``YudaoResponse.error(404, ...)``.
    """
    # Collect the forwarded fields first so the service call stays compact.
    resolve_kwargs = {
        "alarm_id": resolve.alarm_id,
        "duration_ms": resolve.duration_ms,
        "last_frame_time": resolve.last_frame_time,
        "resolve_type": resolve.resolve_type,
    }
    resolved = service.resolve_alarm(**resolve_kwargs)

    if resolved:
        return YudaoResponse.success(True)
    return YudaoResponse.error(404, "告警不存在")