Files
iot-device-management-service/app/routers/edge_compat.py
16337 35386b8e6e feat: V1 VLM复核 + 企微通知 + 手动结单
- 新增3张通知路由表模型(notify_area, camera_area_binding, area_person_binding)
- 新增VLM复核服务,通过qwen3-vl-flash对告警截图二次确认
- 新增企微通知服务,告警确认后推送文本卡片给责任人
- 新增通知调度服务,编排VLM复核→查表路由→企微推送流水线
- 新增企微回调接口,支持手动结单/确认处理/标记误报
- 新增通知管理API,区域/摄像头绑定/人员绑定CRUD
- 告警上报主流程(edge_compat + yudao_aiot_alarm)接入异步通知
- 扩展配置项支持VLM和企微环境变量
- 添加openai==1.68.0依赖(通过DashScope兼容端点调用)
2026-03-06 13:35:40 +08:00

88 lines
2.8 KiB
Python

"""
Edge 设备兼容路由
Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警,
该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证,
使 Edge 设备可以直接上报到 FastAPI 服务。
"""
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends
from typing import Optional
from app.yudao_compat import YudaoResponse
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.utils.logger import logger
router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"])
@router.post("/edge/report")
async def edge_alarm_report(
report: EdgeAlarmReport,
service: AlarmEventService = Depends(get_alarm_event_service),
):
"""
Edge 告警上报(无认证)
与 /admin-api/aiot/alarm/edge/report 功能相同,
但不要求认证,供 Edge 设备直接调用。
"""
alarm = service.create_from_edge_report(report.model_dump())
if alarm is None:
return YudaoResponse.error(500, "告警创建失败")
# WebSocket 通知
try:
notification_svc = get_notification_service()
notification_svc.notify_sync("new_alert", alarm.to_dict())
except Exception:
pass
# 异步触发 VLM 复核 + 企微通知(不阻塞响应)
try:
from app.services.notify_dispatch import process_alarm_notification
notify_data = {
"alarm_id": alarm.alarm_id,
"alarm_type": alarm.alarm_type,
"device_id": alarm.device_id,
"scene_id": alarm.scene_id,
"event_time": alarm.event_time,
"alarm_level": alarm.alarm_level,
"snapshot_url": alarm.snapshot_url,
}
asyncio.create_task(process_alarm_notification(notify_data))
except Exception:
pass # 通知失败不影响主流程
return YudaoResponse.success({
"alarmId": alarm.alarm_id,
"created": True,
})
@router.post("/edge/resolve")
async def edge_alarm_resolve(
resolve: EdgeAlarmResolve,
service: AlarmEventService = Depends(get_alarm_event_service),
):
"""
Edge 告警结束通知(无认证)
与 /admin-api/aiot/alarm/edge/resolve 功能相同,
但不要求认证,供 Edge 设备直接调用。
"""
success = service.resolve_alarm(
alarm_id=resolve.alarm_id,
duration_ms=resolve.duration_ms,
last_frame_time=resolve.last_frame_time,
resolve_type=resolve.resolve_type,
)
if not success:
return YudaoResponse.error(404, "告警不存在")
return YudaoResponse.success(True)