"""AIoT alarm routes - Yudao conventions (new three-table structure).

Everything is served under the /admin-api/aiot/alarm namespace so that it
lines up with the aiot platform architecture.

API paths:
- /admin-api/aiot/alarm/alert/page          - paged query
- /admin-api/aiot/alarm/alert/get           - fetch details
- /admin-api/aiot/alarm/alert/handle        - handle an alarm
- /admin-api/aiot/alarm/alert/delete        - delete an alarm
- /admin-api/aiot/alarm/alert/statistics    - statistics
- /admin-api/aiot/alarm/device-summary/page - per-device alarm summary
- /admin-api/aiot/alarm/edge/report         - alarm report from an edge node
"""

from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Optional
from datetime import datetime
import httpx
import asyncio

from app.yudao_compat import YudaoResponse, get_current_user
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.services.oss_storage import get_oss_storage
from app.schemas import EdgeAlarmReport
from app.utils.logger import logger

router = APIRouter(prefix="/admin-api/aiot/alarm", tags=["AIoT-告警"])

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references to tasks, so a task whose result is discarded may be garbage
# collected before it finishes; each task removes itself when done.
_background_tasks: set = set()

# Numeric alarm_level -> text level used by the legacy frontend.
_LEVEL_TEXT = {1: "low", 2: "medium", 3: "high", 4: "critical"}

# alarm_status -> legacy frontend "status" value.
_STATUS_TEXT = {
    "NEW": "pending",
    "CONFIRMED": "handled",
    "FALSE": "ignored",
    "CLOSED": "handled",
}


def _extract_bbox(ext):
    """Return ``ext["ext_data"]["bbox"]`` or ``None`` if any level is missing.

    Both ``ext`` and ``ext["ext_data"]`` must be dicts; anything else
    (including an explicit ``None`` stored under ``ext_data``) yields ``None``
    instead of raising ``AttributeError``.
    """
    if not isinstance(ext, dict):
        return None
    ext_data = ext.get("ext_data")
    if not isinstance(ext_data, dict):
        return None
    return ext_data.get("bbox")


def _alarm_to_camel(alarm_dict: dict) -> dict:
    """Convert an alarm_event dict into the camelCase shape the frontend expects.

    The result carries both the new three-table field names and the legacy
    field names the old frontend still reads.

    Args:
        alarm_dict: snake_case alarm dict (as produced by ``to_dict()`` /
            ``service.get_alarm`` in this module).

    Returns:
        A new dict; ``alarm_dict`` itself is not modified.
    """
    # snapshot_url: turn the stored value into an accessible URL.
    storage = get_oss_storage()
    snapshot_url = alarm_dict.get("snapshot_url")
    if snapshot_url:
        if snapshot_url.startswith("local:"):
            # Local capture (fallback path used when COS is not configured).
            snapshot_url = "/captures/" + snapshot_url[len("local:"):]
        else:
            snapshot_url = storage.get_url(snapshot_url)

    # alarm_level int -> text; missing/unknown levels fall back to "medium".
    alarm_level = alarm_dict.get("alarm_level")
    level_str = _LEVEL_TEXT.get(alarm_level, "medium") if alarm_level else "medium"

    # alarm_status -> legacy status; unknown values fall back to "pending".
    alarm_status = alarm_dict.get("alarm_status", "NEW")
    status_str = _STATUS_TEXT.get(alarm_status, "pending")

    # confidence_score (0-1) -> integer percentage.
    confidence_score = alarm_dict.get("confidence_score")
    confidence_pct = (
        round(confidence_score * 100) if confidence_score is not None else None
    )

    # duration_ms -> minutes with one decimal; 0 / missing becomes None.
    duration_ms = alarm_dict.get("duration_ms")
    duration_minutes = round(duration_ms / 60000, 1) if duration_ms else None

    alarm_id = alarm_dict.get("alarm_id")
    alarm_type = alarm_dict.get("alarm_type")
    alarm_type_name = _get_alarm_type_name(alarm_type)
    device_id = alarm_dict.get("device_id")
    event_time = alarm_dict.get("event_time")
    ext = alarm_dict.get("ext")

    return {
        # New fields (three-table structure)
        "alarmId": alarm_id,
        "alarmType": alarm_type,
        "alarmTypeName": alarm_type_name,
        "algorithmCode": alarm_dict.get("algorithm_code"),
        "deviceId": device_id,
        # No separate device name is available here; the id doubles as name.
        "deviceName": device_id,
        "sceneId": alarm_dict.get("scene_id"),
        "eventTime": event_time,
        "firstFrameTime": alarm_dict.get("first_frame_time"),
        "lastFrameTime": alarm_dict.get("last_frame_time"),
        "durationMs": duration_ms,
        "alarmLevel": alarm_level,
        "confidenceScore": confidence_score,
        "alarmStatus": alarm_status,
        "handleStatus": alarm_dict.get("handle_status"),
        "snapshotUrl": snapshot_url,
        "videoUrl": alarm_dict.get("video_url"),
        "edgeNodeId": alarm_dict.get("edge_node_id"),
        "handler": alarm_dict.get("handler"),
        "handleRemark": alarm_dict.get("handle_remark"),
        "handledAt": alarm_dict.get("handled_at"),
        "createdAt": alarm_dict.get("created_at"),
        "updatedAt": alarm_dict.get("updated_at"),
        "ext": ext,
        "llmAnalyses": alarm_dict.get("llm_analyses"),
        # Legacy frontend field names
        "id": alarm_id,
        "alertNo": alarm_id,
        "cameraId": device_id,
        "alertType": alarm_type,
        "alertTypeName": alarm_type_name,
        "confidence": confidence_pct,
        "durationMinutes": duration_minutes,
        "status": status_str,
        "level": level_str,
        "triggerTime": event_time,
        "ossUrl": snapshot_url,
        "message": None,
        "bbox": _extract_bbox(ext),
    }


# ==================== Alarm management ====================

@router.get("/alert/page")
async def get_alert_page(
    pageNo: int = Query(1, ge=1, description="页码"),
    pageSize: int = Query(20, ge=1, le=100, description="每页大小"),
    deviceId: Optional[str] = Query(None, description="摄像头/设备ID"),
    cameraId: Optional[str] = Query(None, description="摄像头ID(兼容旧接口)"),
    edgeNodeId: Optional[str] = Query(None, description="边缘节点ID"),
    alarmType: Optional[str] = Query(None, alias="alertType", description="告警类型"),
    alarmStatus: Optional[str] = Query(None, alias="status", description="告警状态"),
    alarmLevel: Optional[int] = Query(None, description="告警级别: 1提醒/2一般/3严重/4紧急"),
    startTime: Optional[datetime] = Query(None, description="开始时间"),
    endTime: Optional[datetime] = Query(None, description="结束时间"),
    service: AlarmEventService = Depends(get_alarm_event_service),
    current_user: dict = Depends(get_current_user)
):
    """Return one page of alarms matching the given filters.

    ``deviceId`` takes precedence over the legacy ``cameraId``. Legacy status
    texts (pending / handled / ignored) are translated to the stored
    alarm_status values; any other value is passed to the service unchanged.
    """
    # Legacy status text -> stored alarm_status.
    status_convert = {"pending": "NEW", "handled": "CONFIRMED", "ignored": "FALSE"}
    alarmStatus = status_convert.get(alarmStatus, alarmStatus)

    alarms, total = service.get_alarms(
        device_id=deviceId or cameraId,
        alarm_type=alarmType,
        alarm_status=alarmStatus,
        alarm_level=alarmLevel,
        edge_node_id=edgeNodeId,
        start_time=startTime,
        end_time=endTime,
        page=pageNo,
        page_size=pageSize,
    )

    alarm_list = [_alarm_to_camel(a.to_dict()) for a in alarms]

    return YudaoResponse.page(
        list_data=alarm_list,
        total=total,
        page_no=pageNo,
        page_size=pageSize
    )


@router.get("/alert/get")
async def get_alert(
    alarmId: Optional[str] = Query(None, description="告警ID"),
    id: Optional[str] = Query(None, description="告警ID(兼容旧接口)"),
    service: AlarmEventService = Depends(get_alarm_event_service),
    current_user: dict = Depends(get_current_user)
):
    """Return the details of a single alarm.

    Raises:
        HTTPException: 400 when neither ``alarmId`` nor ``id`` is given,
            404 when the service returns nothing for that id.
    """
    alarm_id = alarmId or id
    if not alarm_id:
        raise HTTPException(status_code=400, detail="缺少 alarmId 或 id 参数")

    alarm_dict = service.get_alarm(alarm_id)
    if not alarm_dict:
        raise HTTPException(status_code=404, detail="告警不存在")

    return YudaoResponse.success(_alarm_to_camel(alarm_dict))


@router.put("/alert/handle")
async def handle_alert(
    alarmId: Optional[str] = Query(None, description="告警ID"),
    id: Optional[str] = Query(None, description="告警ID(兼容)"),
    alarmStatus: Optional[str] = Query(None, description="告警状态: CONFIRMED/FALSE/CLOSED"),
    handleStatus: Optional[str] = Query(None, description="处理状态: HANDLING/DONE"),
    status: Optional[str] = Query(None, description="处理状态(兼容旧接口)"),
    remark: Optional[str] = Query(None, description="处理备注"),
    service: AlarmEventService = Depends(get_alarm_event_service),
    current_user: dict = Depends(get_current_user)
):
    """Record the handling result of an alarm.

    The handler name is taken from the current user's ``username`` and
    defaults to ``"admin"`` when that key is absent.

    Raises:
        HTTPException: 400 when neither ``alarmId`` nor ``id`` is given,
            404 when the service returns nothing for that id.
    """
    alarm_id = alarmId or id
    if not alarm_id:
        raise HTTPException(status_code=400, detail="缺少 alarmId 或 id 参数")

    handler = current_user.get("username", "admin")

    # Legacy interface: status=handled -> CONFIRMED, ignored -> FALSE,
    # resolved -> CLOSED; any other text is upper-cased and passed through.
    # An explicit alarmStatus always wins over the legacy parameter.
    if not alarmStatus and status:
        status_convert = {"handled": "CONFIRMED", "ignored": "FALSE", "resolved": "CLOSED"}
        alarmStatus = status_convert.get(status, status.upper())

    alarm = service.handle_alarm(
        alarm_id=alarm_id,
        alarm_status=alarmStatus,
        handle_status=handleStatus,
        remark=remark,
        handler=handler,
    )
    if not alarm:
        raise HTTPException(status_code=404, detail="告警不存在")

    return YudaoResponse.success(True)


@router.delete("/alert/delete")
async def delete_alert(
    alarmId: Optional[str] = Query(None, description="告警ID"),
    id: Optional[str] = Query(None, description="告警ID(兼容)"),
    service: AlarmEventService = Depends(get_alarm_event_service),
    current_user: dict = Depends(get_current_user)
):
    """Delete an alarm.

    Raises:
        HTTPException: 400 when neither ``alarmId`` nor ``id`` is given,
            404 when the service reports that nothing was deleted.
    """
    alarm_id = alarmId or id
    if not alarm_id:
        raise HTTPException(status_code=400, detail="缺少 alarmId 或 id 参数")

    success = service.delete_alarm(alarm_id)
    if not success:
        raise HTTPException(status_code=404, detail="告警不存在")

    return YudaoResponse.success(True)


@router.get("/alert/statistics")
async def get_statistics(
    service: AlarmEventService = Depends(get_alarm_event_service),
    current_user: dict = Depends(get_current_user)
):
    """Return the alarm statistics produced by the service, unmodified."""
    stats = service.get_statistics()
    return YudaoResponse.success(stats)


# ==================== Per-device alarm summary ====================

@router.get("/device-summary/page")
async def get_device_summary_page(
    pageNo: int = Query(1, ge=1, description="页码"),
    pageSize: int = Query(20, ge=1, le=100, description="每页大小"),
    service: AlarmEventService = Depends(get_alarm_event_service),
    current_user: dict = Depends(get_current_user)
):
    """Return one page of per-device alarm summaries.

    Every row is returned with additional legacy alias keys (cameraId,
    cameraName, pendingCount, lastAlertTime, lastAlertType,
    lastAlertTypeName) next to the keys supplied by the service.
    """
    result = service.get_device_summary(page=pageNo, page_size=pageSize)

    # Build shallow copies so the rows owned by the service are not mutated.
    compat_list = [
        {
            **item,
            "cameraId": item.get("deviceId"),
            "cameraName": item.get("deviceName"),
            "pendingCount": item.get("unhandledCount"),
            "lastAlertTime": item.get("lastEventTime"),
            "lastAlertType": item.get("lastAlarmType"),
            "lastAlertTypeName": _get_alarm_type_name(item.get("lastAlarmType")),
        }
        for item in result.get("list", [])
    ]

    return YudaoResponse.page(
        list_data=compat_list,
        total=result.get("total", 0),
        page_no=pageNo,
        page_size=pageSize
    )


# ==================== Edge alarm reporting ====================

@router.post("/edge/report")
async def edge_alarm_report(
    report: EdgeAlarmReport,
    service: AlarmEventService = Depends(get_alarm_event_service),
    current_user: dict = Depends(get_current_user),
):
    """Accept an alarm reported by an edge device over HTTP POST.

    The alarm is created through the service, announced to WebSocket
    subscribers and forwarded to the ops platform in the background; a
    failure in either notification does not fail the request.

    NOTE(review): the original author states that the snapshot is uploaded to
    COS beforehand and that the service de-duplicates by alarm_id; neither is
    visible in this module. The response always carries ``created: True``.
    """
    alarm = service.create_from_edge_report(report.model_dump())

    if alarm is None:
        return YudaoResponse.error(500, "告警创建失败")

    # WebSocket notification: best effort, but leave a trace when it fails.
    try:
        notification_svc = get_notification_service()
        notification_svc.notify_sync("new_alert", alarm.to_dict())
    except Exception as e:
        logger.warning(f"WebSocket 通知失败: {e}")

    # Extract the fields up front so the background task does not touch the
    # ORM object after its session may have been closed.
    ops_data = {
        "alarm_id": alarm.alarm_id,
        "alarm_type": alarm.alarm_type,
        "device_id": alarm.device_id,
        "event_time": alarm.event_time.strftime("%Y-%m-%dT%H:%M:%S") if isinstance(alarm.event_time, datetime) else str(alarm.event_time or ""),
        "alarm_level": alarm.alarm_level or 2,
    }

    # Keep a strong reference until the task completes; otherwise the task
    # can be garbage collected before the HTTP call is made.
    task = asyncio.create_task(_notify_ops_platform(ops_data))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return YudaoResponse.success({
        "alarmId": alarm.alarm_id,
        "created": True,
    })


# ==================== Helpers ====================

# NOTE(review): hard-coded ops platform endpoint; consider moving it to
# configuration.
OPS_ALARM_URL = "http://192.168.0.104:48080/admin-api/ops/alarm/receive"

# alarm_type code -> display name.
_ALARM_TYPE_NAMES = {
    "leave_post": "离岗检测",
    "intrusion": "周界入侵",
    "crowd": "人群聚集",
    "fire": "火焰检测",
    "smoke": "烟雾检测",
    "fall": "跌倒检测",
    "helmet": "安全帽检测",
    "unknown": "未知类型",
}


async def _notify_ops_platform(data: dict):
    """Forward an alarm to the ops platform; never raises.

    Args:
        data: dict with the keys ``alarm_id``, ``alarm_type``, ``device_id``,
            ``event_time`` and ``alarm_level``.

    Any exception (including a missing key in ``data``'s HTTP round trip) is
    logged as a warning so the caller's main flow is unaffected.
    """
    payload = {
        "alarmId": data["alarm_id"],
        "alarmType": data["alarm_type"],
        "deviceId": data["device_id"],
        "eventTime": data["event_time"],
        "alarmLevel": data["alarm_level"],
        # NOTE(review): recipient and tenant are fixed to id 1 - confirm.
        "notifyUserIds": [1],
        "tenantId": 1,
    }
    try:
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(OPS_ALARM_URL, json=payload)
            if resp.status_code == 200:
                logger.info(f"运维平台上报成功: {data['alarm_id']}, resp={resp.text[:200]}")
            else:
                logger.warning(f"运维平台上报失败: status={resp.status_code}, body={resp.text[:200]}")
    except Exception as e:
        logger.warning(f"运维平台上报异常: {e}")


def _get_alarm_type_name(alarm_type: Optional[str]) -> str:
    """Return the display name for an alarm type code.

    Unknown codes are returned unchanged; ``None`` or an empty string yields
    the generic "unknown type" label.
    """
    return _ALARM_TYPE_NAMES.get(alarm_type, alarm_type or "未知类型")