1. notify_dispatch: 工单优先于卡片发送,创建成功则跳过直发卡片(等IoT回调send-card) 2. wechat_callback: IoT API 成功后直接返回,等 sync-status 回调更新告警+卡片 3. edge_compat: 启用工单自动结单,成功后等 sync-status 回调 4. yudao_aiot_alarm: 前端操作优先调 IoT 工单 API,降级直接更新卡片 5. wechat_notify_api: 修复 confirmed 的 card_action 为 None 导致卡片不更新的 bug 所有路径均保留降级逻辑:IoT 失败或工单未启用时直接处理告警+更新卡片
223 lines
8.1 KiB
Python
223 lines
8.1 KiB
Python
"""
|
||
Edge 设备兼容路由
|
||
|
||
Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警,
|
||
该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证,
|
||
使 Edge 设备可以直接上报到 FastAPI 服务。
|
||
"""
|
||
|
||
import asyncio
|
||
from datetime import datetime
|
||
from fastapi import APIRouter, Depends, Request, Header, HTTPException
|
||
from typing import Optional
|
||
|
||
from app.yudao_compat import YudaoResponse
|
||
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
|
||
from app.services.notification_service import get_notification_service
|
||
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
|
||
from app.models import EdgeDevice, DeviceStatus, get_session
|
||
from app.utils.logger import logger
|
||
from app.utils.timezone import beijing_now
|
||
from app.config import settings
|
||
|
||
router = APIRouter(prefix="/api/ai", tags=["Edge-兼容路由"])
|
||
|
||
|
||
def _verify_edge_token(authorization: Optional[str] = Header(None)):
|
||
"""验证 Edge 设备 Token(仅在 EDGE_AUTH_ENABLED=true 时启用)"""
|
||
cfg = settings.edge_auth
|
||
if not cfg.enabled or not cfg.token:
|
||
return # 未启用认证,放行
|
||
if not authorization:
|
||
raise HTTPException(status_code=401, detail="缺少 Authorization 头")
|
||
# 支持 "Bearer xxx" 格式
|
||
token = authorization.removeprefix("Bearer ").strip()
|
||
if token != cfg.token:
|
||
raise HTTPException(status_code=401, detail="Token 无效")
|
||
|
||
|
||
@router.post("/alert/edge/report")
|
||
async def edge_alarm_report(
|
||
report: EdgeAlarmReport,
|
||
service: AlarmEventService = Depends(get_alarm_event_service),
|
||
_auth=Depends(_verify_edge_token),
|
||
):
|
||
"""
|
||
Edge 告警上报(无认证)
|
||
|
||
与 /admin-api/aiot/alarm/edge/report 功能相同,
|
||
但不要求认证,供 Edge 设备直接调用。
|
||
"""
|
||
alarm = service.create_from_edge_report(report.model_dump())
|
||
|
||
if alarm is None:
|
||
return YudaoResponse.error(500, "告警创建失败")
|
||
|
||
# WebSocket 通知
|
||
try:
|
||
notification_svc = get_notification_service()
|
||
notification_svc.notify_sync("new_alert", alarm.to_dict())
|
||
except Exception:
|
||
pass
|
||
|
||
# 异步触发 VLM 复核 + 企微通知(不阻塞响应)
|
||
try:
|
||
from app.services.notify_dispatch import process_alarm_notification
|
||
ext_data = report.ext_data or {}
|
||
notify_data = {
|
||
"alarm_id": alarm.alarm_id,
|
||
"alarm_type": alarm.alarm_type,
|
||
"device_id": alarm.device_id,
|
||
"scene_id": alarm.scene_id,
|
||
"event_time": alarm.event_time,
|
||
"alarm_level": alarm.alarm_level,
|
||
"snapshot_url": alarm.snapshot_url,
|
||
"area_id": alarm.area_id,
|
||
"skip_vlm": ext_data.get("skip_vlm", False),
|
||
}
|
||
asyncio.create_task(process_alarm_notification(notify_data))
|
||
except Exception as e:
|
||
logger.error(f"触发告警通知失败: {e}", exc_info=True)
|
||
|
||
return YudaoResponse.success({
|
||
"alarmId": alarm.alarm_id,
|
||
"created": True,
|
||
})
|
||
|
||
|
||
@router.post("/alert/edge/resolve")
|
||
async def edge_alarm_resolve(
|
||
resolve: EdgeAlarmResolve,
|
||
service: AlarmEventService = Depends(get_alarm_event_service),
|
||
_auth=Depends(_verify_edge_token),
|
||
):
|
||
"""
|
||
Edge 告警结束通知(无认证)
|
||
|
||
与 /admin-api/aiot/alarm/edge/resolve 功能相同,
|
||
但不要求认证,供 Edge 设备直接调用。
|
||
支持先到先得:已被人工处理的告警不覆盖状态。
|
||
"""
|
||
# 先检查是否已到终态(先到先得)
|
||
was_terminal = service.is_alarm_terminal(resolve.alarm_id)
|
||
|
||
success = service.resolve_alarm(
|
||
alarm_id=resolve.alarm_id,
|
||
duration_ms=resolve.duration_ms,
|
||
last_frame_time=resolve.last_frame_time,
|
||
resolve_type=resolve.resolve_type,
|
||
)
|
||
if not success:
|
||
return YudaoResponse.error(404, "告警不存在")
|
||
|
||
# 如果之前不是终态(边缘端先到),触发卡片更新
|
||
if not was_terminal:
|
||
asyncio.create_task(_resolve_card_update(resolve.alarm_id, resolve.resolve_type))
|
||
|
||
return YudaoResponse.success(True)
|
||
|
||
|
||
@router.post("/device/heartbeat")
|
||
async def edge_device_heartbeat(request: Request, _auth=Depends(_verify_edge_token)):
|
||
"""
|
||
Edge 设备心跳上报(无认证)
|
||
|
||
每 30 秒由 Edge 推理服务调用,更新设备在线状态和运行指标。
|
||
"""
|
||
data = await request.json()
|
||
device_id = data.get("device_id", "")
|
||
status_data = data.get("status", {})
|
||
|
||
if not device_id:
|
||
return YudaoResponse.error(400, "缺少 device_id")
|
||
|
||
db = get_session()
|
||
try:
|
||
device = db.query(EdgeDevice).filter(EdgeDevice.device_id == device_id).first()
|
||
now = beijing_now()
|
||
|
||
if not device:
|
||
device = EdgeDevice(
|
||
device_id=device_id,
|
||
device_name=device_id,
|
||
status=DeviceStatus.ONLINE,
|
||
last_heartbeat=now,
|
||
created_at=now,
|
||
)
|
||
db.add(device)
|
||
|
||
device.status = DeviceStatus.ONLINE
|
||
device.last_heartbeat = now
|
||
device.uptime_seconds = status_data.get("uptime_seconds")
|
||
device.frames_processed = status_data.get("frames_processed")
|
||
device.alerts_generated = status_data.get("alerts_generated")
|
||
device.stream_count = status_data.get("stream_count")
|
||
device.config_version = str(status_data.get("config_version", ""))
|
||
device.extra_info = status_data.get("stream_stats")
|
||
device.updated_at = now
|
||
|
||
# 从请求头获取 IP
|
||
client_ip = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
|
||
if not client_ip:
|
||
client_ip = request.client.host if request.client else None
|
||
if client_ip:
|
||
device.ip_address = client_ip
|
||
|
||
db.commit()
|
||
logger.debug(f"[心跳] 设备 {device_id} 心跳更新成功")
|
||
|
||
return YudaoResponse.success(True)
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"[心跳] 处理失败: {e}")
|
||
return YudaoResponse.error(500, f"心跳处理失败: {e}")
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
async def _resolve_card_update(alarm_id: str, resolve_type: str):
|
||
"""边缘端 resolve 后异步处理:工单自动结单 + 更新卡片"""
|
||
try:
|
||
from app.services.work_order_client import get_work_order_client
|
||
from app.services.wechat_service import get_wechat_service
|
||
from app.models import get_session, AlarmEventExt
|
||
|
||
# 工单自动结单
|
||
wo_client = get_work_order_client()
|
||
if wo_client.enabled:
|
||
db = get_session()
|
||
try:
|
||
ext = db.query(AlarmEventExt).filter(
|
||
AlarmEventExt.alarm_id == alarm_id,
|
||
AlarmEventExt.ext_type == "WORK_ORDER",
|
||
).first()
|
||
order_id = ext.ext_data.get("order_id", "") if ext and ext.ext_data else ""
|
||
finally:
|
||
db.close()
|
||
if order_id:
|
||
remark_map = {
|
||
"person_returned": "人员回岗自动关闭",
|
||
"non_work_time": "非工作时间自动关闭",
|
||
"intrusion_cleared": "入侵消失自动关闭",
|
||
}
|
||
remark = remark_map.get(resolve_type, f"边缘端自动结单: {resolve_type}")
|
||
success = await wo_client.auto_complete_order(order_id, remark)
|
||
if success:
|
||
logger.info(f"IoT工单已自动结单,等待sync-status回调: alarm={alarm_id}, order={order_id}")
|
||
return # IoT 回调 sync-status 会更新告警+卡片
|
||
|
||
# 降级:直接更新企微卡片到终态
|
||
wechat = get_wechat_service()
|
||
if wechat.enabled:
|
||
response_code = wechat.get_response_code(alarm_id)
|
||
if response_code:
|
||
await wechat.update_alarm_card_terminal(
|
||
response_code=response_code,
|
||
user_ids=[],
|
||
alarm_id=alarm_id,
|
||
action="auto_resolve",
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"边缘端resolve后处理失败: alarm={alarm_id}, error={e}", exc_info=True)
|