Files
iot-device-management-service/app/routers/edge_compat.py

72 lines
2.1 KiB
Python
Raw Normal View History

"""
Edge 设备兼容路由
Edge 设备使用 /api/ai/alert/edge/report /api/ai/alert/edge/resolve 路径上报告警
该路径与 WVP 端点一致本模块提供相同路径的路由无需认证
使 Edge 设备可以直接上报到 FastAPI 服务
"""
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends
from typing import Optional
from app.yudao_compat import YudaoResponse
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.utils.logger import logger
# Router for the Edge compatibility endpoints; every route declared on it is
# served under the /api/ai/alert prefix.
router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"])
@router.post("/edge/report")
async def edge_alarm_report(
    report: EdgeAlarmReport,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Edge alarm report (no authentication).

    Functionally the same as /admin-api/aiot/alarm/edge/report, but without
    an authentication requirement so that Edge devices can call it directly.

    Args:
        report: Alarm payload posted by the Edge device.
        service: Alarm event service injected by FastAPI.

    Returns:
        A Yudao-style success response carrying ``alarmId`` and
        ``created: True``, or a 500 error response when the service returns
        no alarm.
    """
    alarm = service.create_from_edge_report(report.model_dump())
    if alarm is None:
        return YudaoResponse.error(500, "告警创建失败")

    # WebSocket notification is best-effort: a failure here must not fail the
    # report itself, but it is logged so that a broken notification channel
    # does not go unnoticed.
    try:
        notification_svc = get_notification_service()
        notification_svc.notify_sync("new_alert", alarm.to_dict())
    except Exception:
        logger.exception(
            f"Edge alarm WebSocket notification failed: alarm_id={alarm.alarm_id}"
        )

    return YudaoResponse.success({
        "alarmId": alarm.alarm_id,
        "created": True,
    })
@router.post("/edge/resolve")
async def edge_alarm_resolve(
    resolve: EdgeAlarmResolve,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Edge alarm-ended notification (no authentication).

    Functionally the same as /admin-api/aiot/alarm/edge/resolve, but without
    an authentication requirement so that Edge devices can call it directly.

    Args:
        resolve: Resolve payload posted by the Edge device.
        service: Alarm event service injected by FastAPI.

    Returns:
        A Yudao-style success response wrapping ``True`` when the service
        reports success, otherwise a 404 error response.
    """
    # Forward the payload fields unchanged to the alarm event service.
    resolved = service.resolve_alarm(
        alarm_id=resolve.alarm_id,
        duration_ms=resolve.duration_ms,
        last_frame_time=resolve.last_frame_time,
        resolve_type=resolve.resolve_type,
    )
    if resolved:
        return YudaoResponse.success(True)
    # A falsy result from the service is reported to the caller as "not found".
    return YudaoResponse.error(404, "告警不存在")