- requirements.txt 添加 pymysql、cos-python-sdk-v5 - 新增 /api/ai/alert/edge/report 和 /edge/resolve 路由(无认证) - 使 Edge 设备可直接上报到 FastAPI 服务 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
72 lines
2.1 KiB
Python
72 lines
2.1 KiB
Python
"""
|
|
Edge 设备兼容路由
|
|
|
|
Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警,
|
|
该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证,
|
|
使 Edge 设备可以直接上报到 FastAPI 服务。
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
from fastapi import APIRouter, Depends
|
|
from typing import Optional
|
|
|
|
from app.yudao_compat import YudaoResponse
|
|
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
|
|
from app.services.notification_service import get_notification_service
|
|
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
|
|
from app.utils.logger import logger
|
|
|
|
# Router carrying the Edge-compatible endpoints; every route registered on it
# is served under the /api/ai/alert prefix.
router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"])
|
|
|
|
|
|
@router.post("/edge/report")
async def edge_alarm_report(
    report: EdgeAlarmReport,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Handle an alarm report from an Edge device (no authentication).

    Provides the same functionality as /admin-api/aiot/alarm/edge/report but
    does not require authentication, so Edge devices can call it directly.

    Args:
        report: Alarm payload sent by the Edge device; it is dumped to a dict
            and handed to the alarm event service.
        service: Alarm event service, injected by FastAPI.

    Returns:
        A YudaoResponse error (code 500) when the service returns ``None``
        instead of an alarm; otherwise a success response carrying the new
        ``alarmId`` and ``created: True``.
    """
    alarm = service.create_from_edge_report(report.model_dump())

    if alarm is None:
        return YudaoResponse.error(500, "告警创建失败")

    # WebSocket notification is best-effort: a failure here must not fail the
    # report, since the alarm has already been created. Log it rather than
    # swallowing it silently so notification outages remain diagnosable.
    try:
        notification_svc = get_notification_service()
        notification_svc.notify_sync("new_alert", alarm.to_dict())
    except Exception as exc:
        logger.warning(f"Edge alarm WebSocket notification failed: {exc!r}")

    return YudaoResponse.success({
        "alarmId": alarm.alarm_id,
        "created": True,
    })
|
|
|
|
|
|
@router.post("/edge/resolve")
async def edge_alarm_resolve(
    resolve: EdgeAlarmResolve,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Handle an alarm-ended notification from an Edge device (no authentication).

    Provides the same functionality as /admin-api/aiot/alarm/edge/resolve but
    does not require authentication, so Edge devices can call it directly.

    Args:
        resolve: Resolve payload; its ``alarm_id``, ``duration_ms``,
            ``last_frame_time`` and ``resolve_type`` fields are forwarded to
            the alarm event service.
        service: Alarm event service, injected by FastAPI.

    Returns:
        A success response wrapping ``True`` when the service reports the
        alarm as resolved; otherwise a YudaoResponse error with code 404.
    """
    # Collect the forwarded fields first (same read order as the call needs).
    resolve_kwargs = {
        "alarm_id": resolve.alarm_id,
        "duration_ms": resolve.duration_ms,
        "last_frame_time": resolve.last_frame_time,
        "resolve_type": resolve.resolve_type,
    }
    resolved = service.resolve_alarm(**resolve_kwargs)

    if resolved:
        return YudaoResponse.success(True)

    # A falsy result from the service is reported as "alarm not found".
    return YudaoResponse.error(404, "告警不存在")
|