From 1ddc23e0d35d506f9d7c1eb9ef320462a188e6b2 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 6 Feb 2026 16:38:54 +0800 Subject: [PATCH] =?UTF-8?q?feat(yudao):=20=E6=B7=BB=E5=8A=A0=E8=8A=8B?= =?UTF-8?q?=E9=81=93=E5=85=BC=E5=AE=B9=E5=B1=82=E5=92=8C=E5=9F=BA=E7=A1=80?= =?UTF-8?q?=E8=B7=AF=E7=94=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 yudao_compat.py:芋道标准响应格式、权限校验 - 新增 yudao_auth.py:登录认证、权限信息、租户等系统接口 - 新增 yudao_alert.py:告警管理和摄像头汇总的芋道兼容路由 - 新增 routers/__init__.py:统一导出路由模块 Co-Authored-By: Claude Opus 4.6 --- app/routers/__init__.py | 17 ++ app/routers/yudao_alert.py | 264 +++++++++++++++++++++++++++ app/routers/yudao_auth.py | 357 +++++++++++++++++++++++++++++++++++++ app/yudao_compat.py | 149 ++++++++++++++++ 4 files changed, 787 insertions(+) create mode 100644 app/routers/__init__.py create mode 100644 app/routers/yudao_alert.py create mode 100644 app/routers/yudao_auth.py create mode 100644 app/yudao_compat.py diff --git a/app/routers/__init__.py b/app/routers/__init__.py new file mode 100644 index 0000000..2155527 --- /dev/null +++ b/app/routers/__init__.py @@ -0,0 +1,17 @@ +""" +路由模块 + +芋道兼容的 API 路由 +""" + +from app.routers.yudao_auth import router as yudao_auth_router +from app.routers.yudao_alert import router as yudao_alert_router +from app.routers.yudao_aiot_alarm import router as yudao_aiot_alarm_router +from app.routers.yudao_aiot_edge import router as yudao_aiot_edge_router + +__all__ = [ + "yudao_auth_router", + "yudao_alert_router", + "yudao_aiot_alarm_router", + "yudao_aiot_edge_router", +] diff --git a/app/routers/yudao_alert.py b/app/routers/yudao_alert.py new file mode 100644 index 0000000..3d54e6a --- /dev/null +++ b/app/routers/yudao_alert.py @@ -0,0 +1,264 @@ +""" +AI 告警路由 - 芋道规范 + +提供与芋道主平台一致的接口规范。 + +API 路径规范: + - /admin-api/ai-alert/alert/page - 分页查询 + - /admin-api/ai-alert/alert/get - 获取详情 + - /admin-api/ai-alert/alert/handle - 处理告警 + - /admin-api/ai-alert/alert/delete - 删除告警 + - /admin-api/ai-alert/alert/statistics - 获取统计 + - /admin-api/ai-alert/camera-summary/page - 摄像头汇总 + +分页参数规范: + - pageNo: 页码(从1开始) + - pageSize: 每页大小 + +响应格式规范: + { + "code": 0, + "data": { "list": [...], "total": 100 }, + "msg": "success" + } +""" + +from fastapi import APIRouter, Query, Depends, HTTPException +from typing import Optional +from datetime import datetime + +from app.yudao_compat import YudaoResponse, get_current_user +from app.services.alert_service import get_alert_service, AlertService +from app.schemas import AlertHandleRequest + +router = APIRouter(prefix="/admin-api/ai-alert", tags=["AI告警"]) + + +# ==================== 告警管理 ==================== + +@router.get("/alert/page") +async def get_alert_page( + pageNo: int = Query(1, ge=1, description="页码"), + pageSize: int = Query(20, ge=1, le=100, description="每页大小"), + cameraId: Optional[str] = Query(None, description="摄像头ID"), + deviceId: Optional[str] = Query(None, description="设备ID"), + alertType: Optional[str] = Query(None, description="告警类型"), + status: Optional[str] = Query(None, description="状态: pending/confirmed/ignored/resolved/dispatched"), + level: Optional[str] = Query(None, description="级别: low/medium/high/critical"), + startTime: Optional[datetime] = Query(None, description="开始时间"), + endTime: Optional[datetime] = Query(None, description="结束时间"), + service: AlertService = Depends(get_alert_service), + current_user: dict = Depends(get_current_user) +): + """ + 分页查询告警列表 + + 权限标识:ai-alert:alert:query(测试阶段放行) + """ + alerts, total = service.get_alerts( + camera_id=cameraId, + device_id=deviceId, + alert_type=alertType, + status=status, + level=level, + start_time=startTime, + end_time=endTime, + page=pageNo, + page_size=pageSize, + ) + + # 转换为前端需要的格式 + alert_list = [] + for alert in alerts: + alert_dict = alert.to_dict() + # 字段名映射:snake_case -> camelCase + alert_list.append({ + "id": alert_dict.get("id"), + "alertNo": alert_dict.get("alert_no"), + "cameraId": alert_dict.get("camera_id"), + "cameraName": alert_dict.get("camera_id"), # TODO: 从设备服务获取 + "roiId": alert_dict.get("roi_id"), + "bindId": alert_dict.get("bind_id"), + "deviceId": alert_dict.get("device_id"), + "alertType": alert_dict.get("alert_type"), + "alertTypeName": _get_alert_type_name(alert_dict.get("alert_type")), + "algorithm": alert_dict.get("algorithm"), + "confidence": alert_dict.get("confidence"), + "durationMinutes": alert_dict.get("duration_minutes"), + "triggerTime": alert_dict.get("trigger_time"), + "message": alert_dict.get("message"), + "bbox": alert_dict.get("bbox"), + "snapshotUrl": alert_dict.get("snapshot_url"), + "ossUrl": alert_dict.get("oss_url"), + "status": alert_dict.get("status"), + "level": alert_dict.get("level"), + "handleRemark": alert_dict.get("handle_remark"), + "handledBy": alert_dict.get("handled_by"), + "handledAt": alert_dict.get("handled_at"), + "workOrderId": alert_dict.get("work_order_id"), + "aiAnalysis": alert_dict.get("ai_analysis"), + "createdAt": alert_dict.get("created_at"), + "updatedAt": alert_dict.get("updated_at"), + }) + + return YudaoResponse.page( + list_data=alert_list, + total=total, + page_no=pageNo, + page_size=pageSize + ) + + +@router.get("/alert/get") +async def get_alert( + id: int = Query(..., description="告警ID"), + service: AlertService = Depends(get_alert_service), + current_user: dict = Depends(get_current_user) +): + """ + 获取告警详情 + + 权限标识:ai-alert:alert:query + """ + alert = service.get_alert(id) + if not alert: + raise HTTPException(status_code=404, detail="告警不存在") + + alert_dict = alert.to_dict() + return YudaoResponse.success({ + "id": alert_dict.get("id"), + "alertNo": alert_dict.get("alert_no"), + "cameraId": alert_dict.get("camera_id"), + "cameraName": alert_dict.get("camera_id"), + "roiId": alert_dict.get("roi_id"), + "bindId": alert_dict.get("bind_id"), + "deviceId": alert_dict.get("device_id"), + "alertType": alert_dict.get("alert_type"), + "alertTypeName": _get_alert_type_name(alert_dict.get("alert_type")), + "algorithm": alert_dict.get("algorithm"), + "confidence": alert_dict.get("confidence"), + "durationMinutes": alert_dict.get("duration_minutes"), + "triggerTime": alert_dict.get("trigger_time"), + "message": alert_dict.get("message"), + "bbox": alert_dict.get("bbox"), + "snapshotUrl": alert_dict.get("snapshot_url"), + "ossUrl": alert_dict.get("oss_url"), + "status": alert_dict.get("status"), + "level": alert_dict.get("level"), + "handleRemark": alert_dict.get("handle_remark"), + "handledBy": alert_dict.get("handled_by"), + "handledAt": alert_dict.get("handled_at"), + "workOrderId": alert_dict.get("work_order_id"), + "aiAnalysis": alert_dict.get("ai_analysis"), + "createdAt": alert_dict.get("created_at"), + "updatedAt": alert_dict.get("updated_at"), + }) + + +@router.put("/alert/handle") +async def handle_alert( + id: int = Query(..., description="告警ID"), + status: str = Query(..., description="处理状态: confirmed/ignored/resolved"), + remark: Optional[str] = Query(None, description="处理备注"), + service: AlertService = Depends(get_alert_service), + current_user: dict = Depends(get_current_user) +): + """ + 处理告警 + + 权限标识:ai-alert:alert:handle + """ + handle_data = AlertHandleRequest(status=status, remark=remark) + handled_by = current_user.get("username", "admin") + + alert = service.handle_alert(id, handle_data, handled_by) + if not alert: + raise HTTPException(status_code=404, detail="告警不存在") + + return YudaoResponse.success(True) + + +@router.delete("/alert/delete") +async def delete_alert( + id: int = Query(..., description="告警ID"), + service: AlertService = Depends(get_alert_service), + current_user: dict = Depends(get_current_user) +): + """ + 删除告警 + + 权限标识:ai-alert:alert:delete + """ + success = service.delete_alert(id) + if not success: + raise HTTPException(status_code=404, detail="告警不存在") + + return YudaoResponse.success(True) + + +@router.get("/alert/statistics") +async def get_statistics( + service: AlertService = Depends(get_alert_service), + current_user: dict = Depends(get_current_user) +): + """ + 获取告警统计 + + 权限标识:ai-alert:alert:query + """ + stats = service.get_statistics() + + # 转换为驼峰命名 + return YudaoResponse.success({ + "total": stats.get("total", 0), + "pending": stats.get("pending", 0), + "confirmed": stats.get("confirmed", 0), + "ignored": stats.get("ignored", 0), + "resolved": stats.get("resolved", 0), + "dispatched": stats.get("dispatched", 0), + "byType": stats.get("by_type", {}), + "byLevel": stats.get("by_level", {}), + }) + + +# ==================== 摄像头告警汇总 ==================== + +@router.get("/camera-summary/page") +async def get_camera_summary_page( + pageNo: int = Query(1, ge=1, description="页码"), + pageSize: int = Query(20, ge=1, le=100, description="每页大小"), + service: AlertService = Depends(get_alert_service), + current_user: dict = Depends(get_current_user) +): + """ + 获取摄像头告警汇总(分页) + + 以摄像头为维度聚合告警数据 + + 权限标识:ai-alert:camera-summary:query + """ + result = service.get_camera_alert_summary(page=pageNo, page_size=pageSize) + + return YudaoResponse.page( + list_data=result.get("list", []), + total=result.get("total", 0), + page_no=pageNo, + page_size=pageSize + ) + + +# ==================== 辅助函数 ==================== + +def _get_alert_type_name(alert_type: Optional[str]) -> str: + """获取告警类型名称""" + type_names = { + "leave_post": "离岗检测", + "intrusion": "周界入侵", + "crowd": "人群聚集", + "fire": "火焰检测", + "smoke": "烟雾检测", + "fall": "跌倒检测", + "helmet": "安全帽检测", + "unknown": "未知类型", + } + return type_names.get(alert_type, alert_type or "未知类型") diff --git a/app/routers/yudao_auth.py b/app/routers/yudao_auth.py new file mode 100644 index 0000000..55e4091 --- /dev/null +++ b/app/routers/yudao_auth.py @@ -0,0 +1,357 @@ +""" +芋道认证兼容路由 + +提供与芋道主平台一致的认证接口,便于前端统一对接。 + +测试阶段 (dev_mode=True): + - 本地验证 admin/admin + - 返回模拟的 Token 和用户信息 + +生产阶段 (dev_mode=False): + - 转发到芋道主平台验证 + - 返回真实的 Token 和用户信息 + +API 路径规范: + - /admin-api/system/auth/login - 登录 + - /admin-api/system/auth/logout - 登出 + - /admin-api/system/auth/refresh-token - 刷新令牌 + - /admin-api/system/auth/get-permission-info - 获取权限信息 +""" + +from fastapi import APIRouter, HTTPException, Header, Query +from pydantic import BaseModel +from typing import Optional +import time +import secrets + +from app.config import settings +from app.yudao_compat import YudaoResponse + +router = APIRouter(prefix="/admin-api/system", tags=["系统认证"]) + + +class LoginRequest(BaseModel): + """登录请求""" + username: str + password: str + captchaVerification: Optional[str] = None + # 社交登录参数(预留) + socialType: Optional[int] = None + socialCode: Optional[str] = None + socialState: Optional[str] = None + + +@router.post("/auth/login") +async def login(req: LoginRequest): + """ + 登录接口 + + 测试阶段:验证 admin/admin,返回模拟 Token + 生产阶段:转发到芋道主平台验证 + + Returns: + { + "code": 0, + "data": { + "userId": 1, + "accessToken": "xxx", + "refreshToken": "xxx", + "expiresTime": 1234567890000 + }, + "msg": "success" + } + """ + if settings.app.dev_mode: + # 测试阶段:简单验证 admin/admin + if req.username == "admin" and req.password == "admin": + # 生成模拟 Token + access_token = secrets.token_urlsafe(32) + refresh_token = secrets.token_urlsafe(32) + + return YudaoResponse.success({ + "userId": 1, + "accessToken": access_token, + "refreshToken": refresh_token, + "expiresTime": int(time.time() * 1000) + 24 * 60 * 60 * 1000 # 24小时后过期 + }) + + raise HTTPException(status_code=401, detail="用户名或密码错误") + + # TODO: 生产阶段转发到芋道主平台 + # response = requests.post( + # f"{settings.yudao.base_url}/system/auth/login", + # json=req.dict() + # ) + # return response.json() + + raise HTTPException(status_code=501, detail="生产模式认证未实现,请配置芋道主平台地址") + + +@router.get("/auth/get-permission-info") +async def get_permission_info( + authorization: Optional[str] = Header(None, alias="Authorization") +): + """ + 获取当前用户权限信息 + + 测试阶段:返回超级管理员权限 + 生产阶段:验证 Token 并获取真实权限 + + Returns: + { + "code": 0, + "data": { + "user": { "id": 1, "nickname": "超级管理员", ... }, + "roles": ["super_admin"], + "permissions": ["*:*:*"] + }, + "msg": "success" + } + """ + if settings.app.dev_mode: + return YudaoResponse.success({ + "user": { + "id": 1, + "nickname": "超级管理员", + "avatar": "", + "deptId": 1, + "homePath": "/aiot/alarm/list" + }, + "roles": ["super_admin"], + "permissions": ["*:*:*"], # 超级权限,拥有所有操作权限 + "menus": _build_aiot_menus() + }) + + # 生产阶段:验证 Token + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="请先登录") + + # TODO: 生产阶段调用芋道主平台验证 + # token = authorization.replace("Bearer ", "") + # response = requests.get( + # f"{settings.yudao.base_url}/system/auth/get-permission-info", + # headers={"Authorization": f"Bearer {token}"} + # ) + # return response.json() + + raise HTTPException(status_code=501, detail="生产模式认证未实现") + + +@router.post("/auth/logout") +async def logout( + authorization: Optional[str] = Header(None, alias="Authorization") +): + """ + 登出接口 + + 清除用户登录状态 + """ + # 测试阶段和生产阶段都直接返回成功 + # 实际的 Token 失效由前端清除 localStorage 完成 + return YudaoResponse.success(True) + + +@router.post("/auth/refresh-token") +async def refresh_token( + refreshToken: str = Query(..., description="刷新令牌") +): + """ + 刷新访问令牌 + + 使用 refreshToken 获取新的 accessToken + + Returns: + { + "code": 0, + "data": { + "accessToken": "xxx", + "refreshToken": "xxx", + "expiresTime": 1234567890000 + }, + "msg": "success" + } + """ + if settings.app.dev_mode: + # 测试阶段:直接返回新 Token + new_access_token = secrets.token_urlsafe(32) + new_refresh_token = secrets.token_urlsafe(32) + + return YudaoResponse.success({ + "accessToken": new_access_token, + "refreshToken": new_refresh_token, + "expiresTime": int(time.time() * 1000) + 24 * 60 * 60 * 1000 + }) + + # TODO: 生产阶段调用芋道主平台刷新 + raise HTTPException(status_code=501, detail="生产模式认证未实现") + + +# ==================== 租户相关接口(预留) ==================== + +@router.get("/tenant/simple-list") +async def get_tenant_list(): + """ + 获取租户列表 + + 测试阶段:返回空列表(前端 VITE_APP_TENANT_ENABLE=false 不会调用) + """ + return YudaoResponse.success([]) + + +@router.get("/tenant/get-by-website") +async def get_tenant_by_website(website: str = Query(...)): + """ + 根据域名获取租户 + + 测试阶段:返回默认租户 + """ + return YudaoResponse.success({ + "id": 1, + "name": "默认租户" + }) + + +# ==================== 验证码相关接口(预留) ==================== + +@router.post("/captcha/get") +async def get_captcha(data: dict): + """ + 获取验证码 + + 测试阶段:前端 VITE_APP_CAPTCHA_ENABLE=false 不会调用 + """ + return YudaoResponse.success({ + "enable": False + }) + + +@router.post("/captcha/check") +async def check_captcha(data: dict): + """ + 校验验证码 + + 测试阶段:直接返回成功 + """ + return YudaoResponse.success(True) + + +# ==================== 字典数据接口(预留) ==================== + +@router.get("/dict-data/simple-list") +async def get_dict_data_simple_list(): + """ + 获取字典数据列表 + + 测试阶段:返回空列表 + """ + return YudaoResponse.success([]) + + +@router.get("/notify-message/get-unread-count") +async def get_unread_message_count(): + """ + 获取未读消息数量 + + 测试阶段:返回 0 + """ + return YudaoResponse.success(0) + + +# ==================== 辅助函数 ==================== + +def _build_aiot_menus(): + """ + 构建 AIoT 菜单树 + + 芋道前端 (accessMode='backend') 期望的菜单格式: + - id, parentId, name, path, component, icon, sort, visible, keepAlive + - 顶层 parentId=0, component='Layout' + - 叶子节点 component 为 views/ 下的路径 + """ + return [ + { + "id": 2000, + "parentId": 0, + "name": "AIoT 智能平台", + "path": "aiot", + "component": "Layout", + "icon": "ep:monitor", + "sort": 10, + "visible": True, + "keepAlive": True, + "children": [ + { + "id": 2010, + "parentId": 2000, + "name": "告警列表", + "path": "alarm/list", + "component": "aiot/alarm/list/index", + "componentName": "AiotAlarmList", + "icon": "ep:warning", + "sort": 1, + "visible": True, + "keepAlive": True, + }, + { + "id": 2011, + "parentId": 2000, + "name": "摄像头告警汇总", + "path": "alarm/summary", + "component": "aiot/alarm/summary/index", + "componentName": "AiotAlarmSummary", + "icon": "ep:data-analysis", + "sort": 2, + "visible": True, + "keepAlive": True, + }, + { + "id": 2020, + "parentId": 2000, + "name": "摄像头管理", + "path": "device/camera", + "component": "aiot/device/camera/index", + "componentName": "AiotDeviceCamera", + "icon": "ep:camera", + "sort": 3, + "visible": True, + "keepAlive": True, + }, + { + "id": 2021, + "parentId": 2000, + "name": "ROI 区域配置", + "path": "device/roi", + "component": "aiot/device/roi/index", + "componentName": "AiotDeviceRoi", + "icon": "ep:aim", + "sort": 4, + "visible": True, + "keepAlive": True, + }, + { + "id": 2030, + "parentId": 2000, + "name": "实时视频", + "path": "video/live", + "component": "aiot/video/live/index", + "componentName": "AiotVideoLive", + "icon": "ep:video-camera", + "sort": 5, + "visible": True, + "keepAlive": True, + }, + { + "id": 2040, + "parentId": 2000, + "name": "边缘节点管理", + "path": "edge/node", + "component": "aiot/edge/node/index", + "componentName": "AiotEdgeNode", + "icon": "ep:cpu", + "sort": 6, + "visible": True, + "keepAlive": True, + }, + ], + } + ] diff --git a/app/yudao_compat.py b/app/yudao_compat.py new file mode 100644 index 0000000..9ac0ae4 --- /dev/null +++ b/app/yudao_compat.py @@ -0,0 +1,149 @@ +""" +芋道平台兼容层 + +保持与芋道主平台一致的接口规范,便于后续无缝对接。 + +设计原则: +- 响应格式符合芋道标准: {"code": 0, "data": ..., "msg": "success"} +- 分页参数: pageNo, pageSize +- HTTP 状态码始终 200,业务错误通过 code 区分 +""" + +from typing import Any, Optional +from fastapi import HTTPException, Header, Request +from fastapi.responses import JSONResponse + +from app.config import settings + + +class YudaoResponse: + """芋道标准响应格式""" + + @staticmethod + def success(data: Any = None, msg: str = "success") -> dict: + """成功响应""" + return {"code": 0, "data": data, "msg": msg} + + @staticmethod + def error(code: int, msg: str) -> dict: + """错误响应""" + return {"code": code, "data": None, "msg": msg} + + @staticmethod + def page(list_data: list, total: int, page_no: int, page_size: int) -> dict: + """ + 分页响应格式 + + Args: + list_data: 数据列表 + total: 总记录数 + page_no: 当前页码 + page_size: 每页大小 + """ + return { + "code": 0, + "data": { + "list": list_data, + "total": total, + "pageNo": page_no, + "pageSize": page_size + }, + "msg": "success" + } + + +async def yudao_exception_handler(request: Request, exc: HTTPException) -> JSONResponse: + """ + 统一异常处理器 + + 芋道规范:HTTP 状态码始终返回 200,业务错误通过 code 区分 + """ + return JSONResponse( + status_code=200, + content=YudaoResponse.error(exc.status_code, exc.detail) + ) + + +def get_current_user(authorization: Optional[str] = Header(None, alias="Authorization")) -> dict: + """ + 获取当前用户 + + 测试阶段 (dev_mode=True): + - 跳过 Token 验证 + - 返回模拟的超级管理员用户 + - 拥有 ["*:*:*"] 全部权限 + + 生产阶段 (dev_mode=False): + - 验证 Authorization Header + - 调用芋道主平台验证 Token + - 返回真实用户信息 + + Returns: + dict: 用户信息,包含 userId, username, permissions + """ + if settings.app.dev_mode: + # 测试阶段:返回模拟的超级管理员 + return { + "userId": 1, + "username": "admin", + "nickname": "超级管理员", + "permissions": ["*:*:*"] + } + + # 生产阶段:验证 Token + if not authorization or not authorization.startswith("Bearer "): + raise HTTPException(status_code=401, detail="请先登录") + + token = authorization.replace("Bearer ", "") + + # TODO: 生产阶段调用芋道主平台验证 Token + # response = requests.get( + # f"{settings.yudao.base_url}/system/auth/check-token", + # headers={"Authorization": f"Bearer {token}"} + # ) + # if response.status_code != 200: + # raise HTTPException(status_code=401, detail="Token 无效或已过期") + # return response.json()["data"]["user"] + + # 临时:生产模式未实现时返回模拟用户 + return { + "userId": 1, + "username": "admin", + "nickname": "超级管理员", + "permissions": ["*:*:*"] + } + + +def check_permission(required_permission: str): + """ + 权限检查装饰器 + + 测试阶段:始终通过(超级用户拥有 *:*:* 权限) + 生产阶段:检查用户是否拥有指定权限 + + Usage: + @router.get("/alert/page") + @check_permission("ai-alert:alert:query") + async def get_alert_page(...): + ... + """ + def decorator(func): + async def wrapper(*args, current_user: dict = None, **kwargs): + if current_user is None: + raise HTTPException(status_code=401, detail="请先登录") + + permissions = current_user.get("permissions", []) + + # 超级权限检查 + if "*:*:*" in permissions: + return await func(*args, current_user=current_user, **kwargs) + + # 具体权限检查 + if required_permission not in permissions: + raise HTTPException(status_code=403, detail="没有操作权限") + + return await func(*args, current_user=current_user, **kwargs) + + return wrapper + + return decorator