Files
iot-device-management-service/app/routers/edge_compat.py
16337 aaf175b129 修复:告警通知显示真实区域名称,从 IoT 平台查询 area_id 对应的 areaName
- edge_compat 上报时传入 area_id 到通知流水线
- notify_dispatch 优先从 IoT 平台 API 查区域名(带缓存+token缓存)
- 演示模式下如果已有真实区域名则不覆盖为"演示区域"
2026-03-19 11:08:33 +08:00

202 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Edge 设备兼容路由
Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路径上报告警,
该路径与 WVP 端点一致。本模块提供相同路径的路由,无需认证,
使 Edge 设备可以直接上报到 FastAPI 服务。
"""
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, Request
from typing import Optional
from app.yudao_compat import YudaoResponse
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.models import EdgeDevice, DeviceStatus, get_session
from app.utils.logger import logger
from app.utils.timezone import beijing_now
router = APIRouter(prefix="/api/ai", tags=["Edge-兼容路由"])
# Strong references to in-flight alarm-notification tasks. The event loop only
# keeps weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes. Entries remove themselves when done.
_alarm_notify_tasks = set()


@router.post("/alert/edge/report")
async def edge_alarm_report(
    report: EdgeAlarmReport,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Edge alarm report (no authentication).

    Same functionality as /admin-api/aiot/alarm/edge/report, but without
    an authentication requirement so Edge devices can call it directly.

    Args:
        report: Alarm payload posted by the Edge device.
        service: Alarm event service injected by FastAPI.

    Returns:
        A YudaoResponse: an error (500) when the alarm could not be created,
        otherwise success carrying ``alarmId`` and ``created``.
    """
    alarm = service.create_from_edge_report(report.model_dump())
    if alarm is None:
        return YudaoResponse.error(500, "告警创建失败")

    # WebSocket push is best-effort: a failure must not fail the report,
    # but it is logged so that a broken push channel is visible.
    try:
        notification_svc = get_notification_service()
        notification_svc.notify_sync("new_alert", alarm.to_dict())
    except Exception as e:
        logger.warning(
            f"WebSocket 告警推送失败: alarm={alarm.alarm_id}, error={e}",
            exc_info=True,
        )

    # Kick off the notification pipeline in the background (VLM review +
    # WeCom notification, per the original comment) without blocking the
    # response.
    try:
        # Imported lazily, as in the original code.
        from app.services.notify_dispatch import process_alarm_notification

        notify_data = {
            "alarm_id": alarm.alarm_id,
            "alarm_type": alarm.alarm_type,
            "device_id": alarm.device_id,
            "scene_id": alarm.scene_id,
            "event_time": alarm.event_time,
            "alarm_level": alarm.alarm_level,
            "snapshot_url": alarm.snapshot_url,
            "area_id": alarm.area_id,
        }
        task = asyncio.create_task(process_alarm_notification(notify_data))
        # Hold the task until it completes so it cannot be collected mid-run.
        _alarm_notify_tasks.add(task)
        task.add_done_callback(_alarm_notify_tasks.discard)
    except Exception as e:
        logger.error(f"触发告警通知失败: {e}", exc_info=True)

    return YudaoResponse.success({
        "alarmId": alarm.alarm_id,
        "created": True,
    })
# Strong references to in-flight card-update tasks. The event loop only keeps
# weak references to tasks, so a task whose handle is dropped may be
# garbage-collected before it finishes. Entries remove themselves when done.
_resolve_card_tasks = set()


@router.post("/alert/edge/resolve")
async def edge_alarm_resolve(
    resolve: EdgeAlarmResolve,
    service: AlarmEventService = Depends(get_alarm_event_service),
):
    """
    Edge alarm-resolved notification (no authentication).

    Same functionality as /admin-api/aiot/alarm/edge/resolve, but without
    an authentication requirement so Edge devices can call it directly.
    First-come-first-served: an alarm that was already handled manually
    does not get its state overwritten.

    Args:
        resolve: Resolve payload posted by the Edge device.
        service: Alarm event service injected by FastAPI.

    Returns:
        A YudaoResponse: an error (404) when ``resolve_alarm`` reports
        failure, otherwise success with ``True``.
    """
    # Record whether the alarm was already terminal BEFORE resolving, so we
    # know afterwards whether the edge side was the first to close it.
    was_terminal = service.is_alarm_terminal(resolve.alarm_id)

    success = service.resolve_alarm(
        alarm_id=resolve.alarm_id,
        duration_ms=resolve.duration_ms,
        last_frame_time=resolve.last_frame_time,
        resolve_type=resolve.resolve_type,
    )
    if not success:
        return YudaoResponse.error(404, "告警不存在")

    # Only when the edge side got there first do we refresh the card.
    if not was_terminal:
        task = asyncio.create_task(
            _resolve_card_update(resolve.alarm_id, resolve.resolve_type)
        )
        # Hold the task until it completes so it cannot be collected mid-run.
        _resolve_card_tasks.add(task)
        task.add_done_callback(_resolve_card_tasks.discard)

    return YudaoResponse.success(True)
@router.post("/device/heartbeat")
async def edge_device_heartbeat(request: Request):
    """
    Edge device heartbeat report (no authentication).

    According to the original documentation this is called every 30 seconds
    by the Edge inference service. It marks the device online and stores the
    reported runtime metrics, creating the device row on first contact.

    Expected body: a JSON object with ``device_id`` and an optional
    ``status`` object (``uptime_seconds``, ``frames_processed``,
    ``alerts_generated``, ``stream_count``, ``config_version``,
    ``stream_stats``).

    Returns:
        A YudaoResponse: 400 for a malformed body or missing ``device_id``,
        500 when the database update fails, otherwise success with ``True``.
    """
    # Reject malformed bodies explicitly instead of letting the JSON parse
    # error propagate as an unhandled exception.
    try:
        data = await request.json()
    except ValueError:
        return YudaoResponse.error(400, "请求体不是合法的 JSON")
    if not isinstance(data, dict):
        return YudaoResponse.error(400, "请求体必须是 JSON 对象")

    device_id = data.get("device_id", "")
    if not device_id:
        return YudaoResponse.error(400, "缺少 device_id")

    # A missing, null or non-object status is treated as "no metrics" so the
    # heartbeat itself is still recorded.
    status_data = data.get("status")
    if not isinstance(status_data, dict):
        status_data = {}

    db = get_session()
    try:
        device = db.query(EdgeDevice).filter(EdgeDevice.device_id == device_id).first()
        now = beijing_now()
        if not device:
            # First heartbeat from this device: register it, using the id as
            # its initial display name.
            device = EdgeDevice(
                device_id=device_id,
                device_name=device_id,
                status=DeviceStatus.ONLINE,
                last_heartbeat=now,
                created_at=now,
            )
            db.add(device)

        device.status = DeviceStatus.ONLINE
        device.last_heartbeat = now
        device.uptime_seconds = status_data.get("uptime_seconds")
        device.frames_processed = status_data.get("frames_processed")
        device.alerts_generated = status_data.get("alerts_generated")
        device.stream_count = status_data.get("stream_count")
        device.config_version = str(status_data.get("config_version", ""))
        device.extra_info = status_data.get("stream_stats")
        device.updated_at = now

        # Prefer the first X-Forwarded-For entry, falling back to the socket
        # peer address.
        # NOTE(review): X-Forwarded-For is client-supplied and this endpoint is
        # unauthenticated; the value is only trustworthy behind a proxy that
        # overwrites the header — confirm the deployment does so.
        client_ip = request.headers.get("x-forwarded-for", "").split(",")[0].strip()
        if not client_ip:
            client_ip = request.client.host if request.client else None
        if client_ip:
            device.ip_address = client_ip

        db.commit()
        logger.debug(f"[心跳] 设备 {device_id} 心跳更新成功")
        return YudaoResponse.success(True)
    except Exception as e:
        db.rollback()
        # Include the traceback, matching the other handlers in this module.
        logger.error(f"[心跳] 处理失败: {e}", exc_info=True)
        return YudaoResponse.error(500, f"心跳处理失败: {e}")
    finally:
        db.close()
async def _resolve_card_update(alarm_id: str, resolve_type: str):
    """Background follow-up after an edge-side resolve: update the WeCom card.

    Work-order auto-completion is not live yet; its intended logic is kept
    below as commented-out reference code. ``resolve_type`` is currently only
    referenced by that disabled code.

    Any exception is logged and swallowed, so this coroutine never raises
    into the task that runs it.

    Args:
        alarm_id: Identifier of the alarm that was resolved.
        resolve_type: Resolve reason reported by the edge device.
    """
    try:
        from app.services.wechat_service import get_wechat_service

        # Work-order auto-completion (not live yet):
        # from app.services.work_order_client import get_work_order_client
        # from app.models import get_session, AlarmEventExt
        # wo_client = get_work_order_client()
        # if wo_client.enabled:
        #     db = get_session()
        #     try:
        #         ext = db.query(AlarmEventExt).filter(
        #             AlarmEventExt.alarm_id == alarm_id,
        #             AlarmEventExt.ext_type == "WORK_ORDER",
        #         ).first()
        #         order_id = ext.ext_data.get("order_id", "") if ext and ext.ext_data else ""
        #     finally:
        #         db.close()
        #     if order_id:
        #         remark_map = {
        #             "person_returned": "人员回岗自动关闭",
        #             "non_work_time": "非工作时间自动关闭",
        #             "intrusion_cleared": "入侵消失自动关闭",
        #         }
        #         remark = remark_map.get(resolve_type, f"边缘端自动结单: {resolve_type}")
        #         await wo_client.auto_complete_order(order_id, remark)

        # Move the WeCom card to its terminal state, but only when the
        # integration is enabled and a response_code exists for this alarm.
        wechat_svc = get_wechat_service()
        if not wechat_svc.enabled:
            return

        card_code = wechat_svc.get_response_code(alarm_id)
        if not card_code:
            return

        await wechat_svc.update_alarm_card_terminal(
            response_code=card_code,
            user_ids=[],
            alarm_id=alarm_id,
            action="auto_resolve",
        )
    except Exception as exc:
        logger.error(f"边缘端resolve后处理失败: alarm={alarm_id}, error={exc}", exc_info=True)