Files
iot-device-management-service/app/routers/yudao_aiot_alarm.py
16337 67bd8881fa 功能:新增看板统计接口(趋势、设备Top、时段分布)
- GET /alert/trend?days=7 — 按天+按类型的告警趋势
- GET /alert/device-top?limit=10&days=7 — 告警最多设备排行
- GET /alert/hour-distribution?days=7 — 24小时告警分布
- 扩展 statistics 接口:增加 todayCount/yesterdayCount/pendingCount/
  handledCount/avgResponseMinutes 字段

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 17:06:08 +08:00

520 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
AIoT 告警路由 - 芋道规范(新三表结构)
统一到 /admin-api/aiot/alarm 命名空间,与 aiot 平台架构对齐。
API 路径规范:
- /admin-api/aiot/alarm/alert/page - 分页查询
- /admin-api/aiot/alarm/alert/get - 获取详情
- /admin-api/aiot/alarm/alert/handle - 处理告警
- /admin-api/aiot/alarm/alert/delete - 删除告警
- /admin-api/aiot/alarm/alert/statistics - 获取统计
- /admin-api/aiot/alarm/device-summary/page - 设备告警汇总
"""
from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Optional
from datetime import datetime
import asyncio
import os
import httpx
from app.yudao_compat import YudaoResponse, get_current_user
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.services.oss_storage import get_oss_storage
from app.services.camera_name_service import get_camera_name_service
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.utils.logger import logger
# Router shared by every alarm endpoint in this module; all routes declared
# below are mounted under the /admin-api/aiot/alarm prefix.
router = APIRouter(prefix="/admin-api/aiot/alarm", tags=["AIoT-告警"])
async def _alarm_to_camel(alarm_dict: dict, camera_info_map: Optional[dict] = None, camera_service=None) -> dict:
    """Convert an alarm_event dict into the camelCase payload used by the frontend.

    The result contains both the new three-table field names and the legacy
    aliases (``id``, ``alertNo``, ``status``, ``level``, ...) kept for older
    frontend code.

    Args:
        alarm_dict: Alarm record with snake_case keys.
        camera_info_map: Optional mapping ``{device_id: camera_info}``.
        camera_service: Optional camera name service; when both it and the
            camera info are available it formats the display name.

    Returns:
        A dict with camelCase keys. ``level`` is the textual form of
        ``alarm_level`` (``"medium"`` when the level is missing or unknown),
        ``status`` the frontend form of ``alarm_status``, ``confidence`` a
        rounded percentage and ``durationMinutes`` the duration in minutes.
    """
    # snapshot_url: turn the stored value into an accessible URL.
    storage = get_oss_storage()
    snapshot_url = alarm_dict.get("snapshot_url")
    if snapshot_url:
        if snapshot_url.startswith("local:"):
            # Local-path marker written by the edge when COS is not configured;
            # it cannot be reached from the cloud side, so expose no URL.
            snapshot_url = ""
        else:
            snapshot_url = storage.get_url(snapshot_url)

    # alarm_level int -> text. Level 0 ("critical") is a valid value, so the
    # check must be against None rather than truthiness.
    alarm_level = alarm_dict.get("alarm_level")
    level_map = {0: "critical", 1: "high", 2: "medium", 3: "low"}
    level_str = level_map.get(alarm_level, "medium") if alarm_level is not None else "medium"

    # alarm_status -> frontend status.
    alarm_status = alarm_dict.get("alarm_status", "NEW")
    status_map = {"NEW": "pending", "CONFIRMED": "handled", "FALSE": "ignored", "CLOSED": "handled"}
    status_str = status_map.get(alarm_status, "pending")

    # confidence_score 0-1 -> percentage.
    confidence_score = alarm_dict.get("confidence_score")
    confidence_pct = round(confidence_score * 100) if confidence_score is not None else None

    # duration_ms -> minutes (a missing or zero duration is reported as None).
    duration_ms = alarm_dict.get("duration_ms")
    duration_minutes = round(duration_ms / 60000, 1) if duration_ms else None

    alarm_id = alarm_dict.get("alarm_id")
    device_id = alarm_dict.get("device_id")
    alarm_type = alarm_dict.get("alarm_type")
    alarm_type_name = _get_alarm_type_name(alarm_type)

    # Camera info for this device, if the caller supplied it.
    camera_info = None
    if camera_info_map and device_id in camera_info_map:
        camera_info = camera_info_map[device_id]

    # Display name defaults to the raw device ID.
    device_name = device_id
    if camera_service and camera_info:
        device_name = camera_service.format_display_name(device_id, camera_info)

    # Camera ID: use the stream as the camera number when it is known.
    camera_id = device_id
    if camera_info:
        stream = camera_info.get("stream")
        if stream:
            camera_id = stream
    elif device_id and "/" in device_id:
        # "app/stream" format: take the part after the first slash.
        camera_id = device_id.split("/", 1)[1]

    # bbox lives at ext["ext_data"]["bbox"]; tolerate a missing or non-dict
    # level at either step instead of raising.
    ext = alarm_dict.get("ext")
    ext_data = ext.get("ext_data") if isinstance(ext, dict) else None
    bbox = ext_data.get("bbox") if isinstance(ext_data, dict) else None

    return {
        # New fields (three-table structure)
        "alarmId": alarm_id,
        "alarmType": alarm_type,
        "alarmTypeName": alarm_type_name,
        "algorithmCode": alarm_dict.get("algorithm_code"),
        "deviceId": device_id,  # raw ID, used for queries
        "deviceName": device_name,
        "sceneId": alarm_dict.get("scene_id"),
        "eventTime": alarm_dict.get("event_time"),
        "firstFrameTime": alarm_dict.get("first_frame_time"),
        "lastFrameTime": alarm_dict.get("last_frame_time"),
        "durationMs": duration_ms,
        "alarmLevel": alarm_level,
        "confidenceScore": confidence_score,
        "alarmStatus": alarm_status,
        "handleStatus": alarm_dict.get("handle_status"),
        "snapshotUrl": snapshot_url,
        "videoUrl": alarm_dict.get("video_url"),
        "edgeNodeId": alarm_dict.get("edge_node_id"),
        "handler": alarm_dict.get("handler"),
        "handleRemark": alarm_dict.get("handle_remark"),
        "handledAt": alarm_dict.get("handled_at"),
        "createdAt": alarm_dict.get("created_at"),
        "updatedAt": alarm_dict.get("updated_at"),
        "ext": alarm_dict.get("ext"),
        "llmAnalyses": alarm_dict.get("llm_analyses"),
        # Legacy field names kept for the older frontend
        "id": alarm_id,
        "alertNo": alarm_id,
        "cameraId": camera_id,  # camera ID (stream number)
        "cameraName": device_name,  # camera display name
        "alertType": alarm_type,
        "alertTypeName": alarm_type_name,
        "confidence": confidence_pct,
        "durationMinutes": duration_minutes,
        "status": status_str,
        "level": level_str,
        "triggerTime": alarm_dict.get("event_time"),
        "ossUrl": snapshot_url,
        "message": None,
        "bbox": bbox,
    }
# ==================== 告警管理 ====================
@router.get("/alert/page")
async def get_alert_page(
pageNo: int = Query(1, ge=1, description="页码"),
pageSize: int = Query(20, ge=1, le=100, description="每页大小"),
deviceId: Optional[str] = Query(None, description="摄像头/设备ID"),
cameraId: Optional[str] = Query(None, description="摄像头ID兼容旧接口"),
edgeNodeId: Optional[str] = Query(None, description="边缘节点ID"),
alarmType: Optional[str] = Query(None, alias="alertType", description="告警类型"),
alarmStatus: Optional[str] = Query(None, alias="status", description="告警状态"),
alarmLevel: Optional[int] = Query(None, description="告警级别: 0紧急/1重要/2普通/3轻微"),
startTime: Optional[datetime] = Query(None, description="开始时间"),
endTime: Optional[datetime] = Query(None, description="结束时间"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""分页查询告警列表"""
# 兼容旧接口的 status 文本转换
if alarmStatus and alarmStatus in ("pending", "handled", "ignored"):
status_convert = {"pending": "NEW", "handled": "CONFIRMED", "ignored": "FALSE"}
alarmStatus = status_convert.get(alarmStatus, alarmStatus)
alarms, total = service.get_alarms(
device_id=deviceId or cameraId,
alarm_type=alarmType,
alarm_status=alarmStatus,
alarm_level=alarmLevel,
edge_node_id=edgeNodeId,
start_time=startTime,
end_time=endTime,
page=pageNo,
page_size=pageSize,
)
# 提取所有唯一的 device_id
device_ids = list(set(a.device_id for a in alarms if a.device_id))
# 批量查询摄像头信息(去重+并发优化),失败不影响列表返回
camera_service = get_camera_name_service()
camera_info_map = {}
try:
camera_info_map = await camera_service.get_camera_infos_batch(device_ids)
except Exception as e:
logger.warning(f"批量查询摄像头信息失败,将使用 device_id 作为名称: {e}")
# 转换为 camelCase 格式(使用摄像头信息映射)
alarm_list = [await _alarm_to_camel(a.to_dict(), camera_info_map, camera_service) for a in alarms]
return YudaoResponse.page(
list_data=alarm_list,
total=total,
page_no=pageNo,
page_size=pageSize
)
@router.get("/alert/get")
async def get_alert(
alarmId: Optional[str] = Query(None, description="告警ID"),
id: Optional[str] = Query(None, description="告警ID兼容旧接口"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""获取告警详情"""
alarm_id = alarmId or id
if not alarm_id:
raise HTTPException(status_code=400, detail="缺少 alarmId 或 id 参数")
alarm_dict = service.get_alarm(alarm_id)
if not alarm_dict:
raise HTTPException(status_code=404, detail="告警不存在")
# 查询单个摄像头信息(失败不影响详情返回)
device_id = alarm_dict.get("device_id")
camera_service = get_camera_name_service()
camera_info_map = {}
if device_id:
try:
camera_info = await camera_service.get_camera_info(device_id)
camera_info_map[device_id] = camera_info
except Exception as e:
logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}")
return YudaoResponse.success(await _alarm_to_camel(alarm_dict, camera_info_map, camera_service))
@router.put("/alert/handle")
async def handle_alert(
alarmId: Optional[str] = Query(None, description="告警ID"),
id: Optional[str] = Query(None, description="告警ID兼容"),
alarmStatus: Optional[str] = Query(None, description="告警状态: CONFIRMED/FALSE/CLOSED"),
handleStatus: Optional[str] = Query(None, description="处理状态: HANDLING/DONE/IGNORED"),
status: Optional[str] = Query(None, description="处理状态(兼容旧接口)"),
remark: Optional[str] = Query(None, description="处理备注"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""处理告警"""
alarm_id = alarmId or id
if not alarm_id:
raise HTTPException(status_code=400, detail="缺少 alarmId 或 id 参数")
handler = current_user.get("username", "admin")
# 兼容旧接口: status=handled → alarmStatus=CONFIRMED, status=ignored → alarmStatus=FALSE
if not alarmStatus and status:
status_convert = {"handled": "CONFIRMED", "ignored": "FALSE", "resolved": "CLOSED"}
alarmStatus = status_convert.get(status, status.upper())
# 忽略操作设置 handle_status=IGNORED区别于自动结单的 DONE
if status == "ignored" and not handleStatus:
handleStatus = "IGNORED"
alarm = service.handle_alarm(
alarm_id=alarm_id,
alarm_status=alarmStatus,
handle_status=handleStatus,
remark=remark,
handler=handler,
)
if not alarm:
raise HTTPException(status_code=404, detail="告警不存在")
return YudaoResponse.success(True)
@router.delete("/alert/delete")
async def delete_alert(
alarmId: Optional[str] = Query(None, description="告警ID"),
id: Optional[str] = Query(None, description="告警ID兼容"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""删除告警"""
alarm_id = alarmId or id
if not alarm_id:
raise HTTPException(status_code=400, detail="缺少 alarmId 或 id 参数")
success = service.delete_alarm(alarm_id)
if not success:
raise HTTPException(status_code=404, detail="告警不存在")
return YudaoResponse.success(True)
@router.get("/alert/statistics")
async def get_statistics(
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""获取告警统计"""
stats = service.get_statistics()
return YudaoResponse.success(stats)
@router.get("/alert/trend")
async def get_alert_trend(
days: int = Query(7, ge=1, le=90, description="统计天数"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""获取告警趋势(按天+按类型分组)"""
data = service.get_trend(days=days)
return YudaoResponse.success(data)
@router.get("/alert/device-top")
async def get_alert_device_top(
limit: int = Query(10, ge=1, le=50, description="Top N"),
days: int = Query(7, ge=1, le=90, description="统计天数"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""获取告警最多的设备 Top N"""
data = service.get_device_top(limit=limit, days=days)
# 批量查询摄像头名称
camera_service = get_camera_name_service()
device_ids = [d["deviceId"] for d in data]
camera_info_map = {}
try:
camera_info_map = await camera_service.get_camera_infos_batch(device_ids)
except Exception as e:
logger.warning(f"批量查询摄像头信息失败: {e}")
for item in data:
did = item["deviceId"]
info = camera_info_map.get(did)
item["deviceName"] = camera_service.format_display_name(did, info)
return YudaoResponse.success(data)
@router.get("/alert/hour-distribution")
async def get_alert_hour_distribution(
days: int = Query(7, ge=1, le=90, description="统计天数"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""获取 24 小时告警分布"""
data = service.get_hour_distribution(days=days)
return YudaoResponse.success(data)
# ==================== 设备告警汇总 ====================
@router.get("/device-summary/page")
async def get_device_summary_page(
pageNo: int = Query(1, ge=1, description="页码"),
pageSize: int = Query(20, ge=1, le=100, description="每页大小"),
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user)
):
"""获取设备告警汇总(分页)"""
result = service.get_device_summary(page=pageNo, page_size=pageSize)
# 添加前端兼容字段别名,并查询摄像头名称(使用配置化服务)
camera_service = get_camera_name_service()
compat_list = []
for item in result.get("list", []):
device_id = item.get("deviceId")
# 查询摄像头信息(失败不影响列表返回)
camera_info = None
try:
camera_info = await camera_service.get_camera_info(device_id)
except Exception as e:
logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}")
# 提取摄像头名称
device_name = camera_service.format_display_name(device_id, camera_info)
# 提取摄像头ID统一使用stream作为编号
camera_id = device_id # 默认值
if camera_info:
stream = camera_info.get("stream")
if stream:
camera_id = stream
elif "/" in device_id:
# app/stream 格式,直接解析
parts = device_id.split("/", 1)
if len(parts) == 2:
camera_id = parts[1]
item["deviceId"] = device_id # 原始ID用于查询
item["cameraId"] = camera_id # 摄像头IDstream编号
item["cameraName"] = device_name # 摄像头名称
item["deviceName"] = device_name # 摄像头名称(兼容)
item["pendingCount"] = item.get("unhandledCount")
item["lastAlertTime"] = item.get("lastEventTime")
item["lastAlertType"] = item.get("lastAlarmType")
item["lastAlertTypeName"] = _get_alarm_type_name(item.get("lastAlarmType"))
compat_list.append(item)
return YudaoResponse.page(
list_data=compat_list,
total=result.get("total", 0),
page_no=pageNo,
page_size=pageSize
)
# ==================== 边缘端告警上报 ====================
@router.post("/edge/report")
async def edge_alarm_report(
report: EdgeAlarmReport,
service: AlarmEventService = Depends(get_alarm_event_service),
current_user: dict = Depends(get_current_user),
):
"""
边缘端告警上报接口
边缘设备通过 HTTP POST 上报告警元数据,截图已预先上传到 COS。
支持幂等:通过 alarm_id 判断是否已存在。
"""
alarm = service.create_from_edge_report(report.model_dump())
if alarm is None:
return YudaoResponse.error(500, "告警创建失败")
# WebSocket 通知
try:
notification_svc = get_notification_service()
notification_svc.notify_sync("new_alert", alarm.to_dict())
except Exception:
pass # WebSocket 通知失败不影响主流程
# 异步上报运维平台(提前提取字段,避免 ORM session 关闭后无法访问)
ops_data = {
"alarm_id": alarm.alarm_id,
"alarm_type": alarm.alarm_type,
"device_id": alarm.device_id,
"event_time": alarm.event_time.strftime("%Y-%m-%dT%H:%M:%S") if isinstance(alarm.event_time, datetime) else str(alarm.event_time or ""),
"alarm_level": alarm.alarm_level or 2,
}
asyncio.create_task(_notify_ops_platform(ops_data))
# 异步触发 VLM 复核 + 企微通知(不阻塞响应)
try:
from app.services.notify_dispatch import process_alarm_notification
notify_data = {
"alarm_id": alarm.alarm_id,
"alarm_type": alarm.alarm_type,
"device_id": alarm.device_id,
"scene_id": alarm.scene_id,
"event_time": alarm.event_time,
"alarm_level": alarm.alarm_level,
"snapshot_url": alarm.snapshot_url,
}
asyncio.create_task(process_alarm_notification(notify_data))
except Exception:
pass # 通知失败不影响主流程
return YudaoResponse.success({
"alarmId": alarm.alarm_id,
"created": True,
})
@router.post("/edge/resolve")
async def edge_alarm_resolve(
resolve: EdgeAlarmResolve,
service: AlarmEventService = Depends(get_alarm_event_service),
):
"""
边缘端告警结束通知
Edge 在人员回岗确认或非工作时间到达时调用此接口,
更新告警的 duration_ms 和 last_frame_time。
"""
success = service.resolve_alarm(
alarm_id=resolve.alarm_id,
duration_ms=resolve.duration_ms,
last_frame_time=resolve.last_frame_time,
resolve_type=resolve.resolve_type,
)
if not success:
return YudaoResponse.error(404, "告警不存在")
return YudaoResponse.success(True)
# ==================== 辅助函数 ====================
# Ops-platform endpoint that receives alarms; override it with the
# OPS_ALARM_URL environment variable.
# NOTE(review): the fallback is a hard-coded private LAN address - confirm it
# is only meant for the development environment.
OPS_ALARM_URL = os.getenv("OPS_ALARM_URL", "http://192.168.0.104:48080/admin-api/ops/alarm/receive")
async def _notify_ops_platform(data: dict):
    """Post an alarm to the ops platform; HTTP failures are logged, never raised.

    Args:
        data: Dict with the keys ``alarm_id``, ``alarm_type``, ``device_id``,
            ``event_time`` and ``alarm_level``. A missing key raises
            ``KeyError`` before any request is made.
    """
    alarm_id = data["alarm_id"]
    # NOTE(review): notifyUserIds and tenantId are fixed values here.
    payload = {
        "alarmId": alarm_id,
        "alarmType": data["alarm_type"],
        "deviceId": data["device_id"],
        "eventTime": data["event_time"],
        "alarmLevel": data["alarm_level"],
        "notifyUserIds": [1],
        "tenantId": 1,
    }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(OPS_ALARM_URL, json=payload)
            if resp.status_code != 200:
                logger.warning(f"运维平台上报失败: status={resp.status_code}, body={resp.text[:200]}")
            else:
                logger.info(f"运维平台上报成功: {alarm_id}, resp={resp.text[:200]}")
    except Exception as e:
        logger.warning(f"运维平台上报异常: {e}")
def _get_alarm_type_name(alarm_type: Optional[str]) -> str:
"""获取告警类型名称"""
type_names = {
"leave_post": "离岗检测",
"intrusion": "周界入侵",
"crowd": "人群聚集",
"fire": "火焰检测",
"smoke": "烟雾检测",
"fall": "跌倒检测",
"helmet": "安全帽检测",
"unknown": "未知类型",
}
return type_names.get(alarm_type, alarm_type or "未知类型")