Files
iot-device-management-service/app/routers/edge_compat.py
16337 f7a69892f6 fix: 注释工单对接代码,修复API响应解析和类型问题
- 工单创建/自动结单代码全部注释,待本地测试后启用
- 修复create_order响应解析:data直接是orderId,非嵌套对象
- 修复areaId类型:int(文档要求),非str
- 修复auto-complete orderId类型:int
- 两步卡片状态机和先到先得逻辑保留生效

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 18:20:30 +08:00

141 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Edge 设备兼容路由
Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警,
该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证,
使 Edge 设备可以直接上报到 FastAPI 服务。
"""
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends
from typing import Optional
from app.yudao_compat import YudaoResponse
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.utils.logger import logger
router = APIRouter(prefix="/api/ai/alert", tags=["Edge-兼容路由"])
@router.post("/edge/report")
async def edge_alarm_report(
report: EdgeAlarmReport,
service: AlarmEventService = Depends(get_alarm_event_service),
):
"""
Edge 告警上报(无认证)
与 /admin-api/aiot/alarm/edge/report 功能相同,
但不要求认证,供 Edge 设备直接调用。
"""
alarm = service.create_from_edge_report(report.model_dump())
if alarm is None:
return YudaoResponse.error(500, "告警创建失败")
# WebSocket 通知
try:
notification_svc = get_notification_service()
notification_svc.notify_sync("new_alert", alarm.to_dict())
except Exception:
pass
# 异步触发 VLM 复核 + 企微通知(不阻塞响应)
try:
from app.services.notify_dispatch import process_alarm_notification
notify_data = {
"alarm_id": alarm.alarm_id,
"alarm_type": alarm.alarm_type,
"device_id": alarm.device_id,
"scene_id": alarm.scene_id,
"event_time": alarm.event_time,
"alarm_level": alarm.alarm_level,
"snapshot_url": alarm.snapshot_url,
}
asyncio.create_task(process_alarm_notification(notify_data))
except Exception as e:
logger.error(f"触发告警通知失败: {e}", exc_info=True)
return YudaoResponse.success({
"alarmId": alarm.alarm_id,
"created": True,
})
@router.post("/edge/resolve")
async def edge_alarm_resolve(
resolve: EdgeAlarmResolve,
service: AlarmEventService = Depends(get_alarm_event_service),
):
"""
Edge 告警结束通知(无认证)
与 /admin-api/aiot/alarm/edge/resolve 功能相同,
但不要求认证,供 Edge 设备直接调用。
支持先到先得:已被人工处理的告警不覆盖状态。
"""
# 先检查是否已到终态(先到先得)
was_terminal = service.is_alarm_terminal(resolve.alarm_id)
success = service.resolve_alarm(
alarm_id=resolve.alarm_id,
duration_ms=resolve.duration_ms,
last_frame_time=resolve.last_frame_time,
resolve_type=resolve.resolve_type,
)
if not success:
return YudaoResponse.error(404, "告警不存在")
# 如果之前不是终态(边缘端先到),触发卡片更新
if not was_terminal:
asyncio.create_task(_resolve_card_update(resolve.alarm_id, resolve.resolve_type))
return YudaoResponse.success(True)
async def _resolve_card_update(alarm_id: str, resolve_type: str):
"""边缘端 resolve 后异步处理:更新卡片(工单暂未上线)"""
try:
from app.services.wechat_service import get_wechat_service
# 工单自动结单(暂未上线)
# from app.services.work_order_client import get_work_order_client
# from app.models import get_session, AlarmEventExt
# wo_client = get_work_order_client()
# if wo_client.enabled:
# db = get_session()
# try:
# ext = db.query(AlarmEventExt).filter(
# AlarmEventExt.alarm_id == alarm_id,
# AlarmEventExt.ext_type == "WORK_ORDER",
# ).first()
# order_id = ext.ext_data.get("order_id", "") if ext and ext.ext_data else ""
# finally:
# db.close()
# if order_id:
# remark_map = {
# "person_returned": "人员回岗自动关闭",
# "non_work_time": "非工作时间自动关闭",
# "intrusion_cleared": "入侵消失自动关闭",
# }
# remark = remark_map.get(resolve_type, f"边缘端自动结单: {resolve_type}")
# await wo_client.auto_complete_order(order_id, remark)
# 更新企微卡片到终态(如果有 response_code
wechat = get_wechat_service()
if wechat.enabled:
response_code = wechat.get_response_code(alarm_id)
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code,
user_ids=[],
alarm_id=alarm_id,
action="auto_resolve",
)
except Exception as e:
logger.error(f"边缘端resolve后处理失败: alarm={alarm_id}, error={e}", exc_info=True)