From 7235a20e2541315bc5e41b37647e04f8b444bc72 Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 23 Mar 2026 16:50:07 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=9A=E5=8C=BA=E5=9F=9F?= =?UTF-8?q?=E7=BB=91=E5=AE=9A=E5=B7=A5=E5=8D=95=E9=93=BE=E8=B7=AF=20+=20Ed?= =?UTF-8?q?ge=20API=20=E8=AE=A4=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Edge Token 认证:edge_compat.py 新增 _verify_edge_token 依赖, 通过 EDGE_AUTH_TOKEN 环境变量配置共享 Token,保护告警上报接口 2. 区域列表 API:新增 /api/area/list 代理查询 IoT 平台区域数据, 供前端摄像头页面选择区域使用(带 5 分钟缓存) 3. 降级查询:notify_dispatch.py 中 area_id 为空时从 WVP API 查询兜底 4. 配置新增:EdgeAuthConfig(token + enabled)、IotDbConfig --- app/config.py | 22 ++++++++ app/main.py | 2 + app/routers/area_api.py | 89 +++++++++++++++++++++++++++++++++ app/routers/edge_compat.py | 20 +++++++- app/services/notify_dispatch.py | 45 +++++++++++++++++ 5 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 app/routers/area_api.py diff --git a/app/config.py b/app/config.py index dce8945..323f248 100644 --- a/app/config.py +++ b/app/config.py @@ -98,6 +98,19 @@ class RedisConfig: enabled: bool = True +@dataclass +class EdgeAuthConfig: + """Edge 设备认证配置""" + token: str = "" # 共享 Token,Edge 请求时携带 Authorization: Bearer {token} + enabled: bool = False # 是否启用 token 校验 + + +@dataclass +class IotDbConfig: + """IoT 平台数据库配置(只读查询区域等信息)""" + url: str = "" # 如 mysql+pymysql://user:pass@host/dbname + + @dataclass class CameraNameConfig: """摄像头名称格式化配置""" @@ -138,6 +151,8 @@ class Settings(BaseModel): work_order: WorkOrderConfig = WorkOrderConfig() redis: RedisConfig = RedisConfig() camera_name: CameraNameConfig = CameraNameConfig() + edge_auth: EdgeAuthConfig = EdgeAuthConfig() + iot_db: IotDbConfig = IotDbConfig() def load_settings() -> Settings: @@ -213,6 +228,13 @@ def load_settings() -> Settings: display_format=os.getenv("CAMERA_NAME_FORMAT", "{name}"), query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "15")), ), + edge_auth=EdgeAuthConfig( + token=os.getenv("EDGE_AUTH_TOKEN", ""), + enabled=os.getenv("EDGE_AUTH_ENABLED", "false").lower() == "true", + ), + iot_db=IotDbConfig( + url=os.getenv("IOT_DATABASE_URL", ""), + ), ) diff --git a/app/main.py b/app/main.py index 5d7ba42..2a0cd8b 100644 --- a/app/main.py +++ b/app/main.py @@ -32,6 +32,7 @@ from app.routers.wechat_callback import router as wechat_callback_router from app.routers.notify_manage import router as notify_manage_router from app.routers.wechat_notify_api import router as wechat_notify_router from app.routers.work_order_api import router as work_order_router +from app.routers.area_api import router as area_router from app.yudao_compat import yudao_exception_handler import json @@ -116,6 +117,7 @@ app.include_router(wechat_callback_router) app.include_router(wechat_notify_router) app.include_router(work_order_router) app.include_router(notify_manage_router) +app.include_router(area_router) # 注册芋道格式异常处理器 app.add_exception_handler(HTTPException, yudao_exception_handler) diff --git a/app/routers/area_api.py b/app/routers/area_api.py new file mode 100644 index 0000000..befd97a --- /dev/null +++ b/app/routers/area_api.py @@ -0,0 +1,89 @@ +""" +区域列表 API + +代理查询 IoT 平台的区域数据,供前端摄像头页面选择区域使用。 +带 5 分钟缓存,减少跨服务查询。 +""" + +import os +import time +from typing import List, Dict, Any + +from fastapi import APIRouter + +from app.config import settings +from app.yudao_compat import YudaoResponse +from app.utils.logger import logger + +router = APIRouter(prefix="/api/area", tags=["区域管理"]) + +# 区域列表缓存 +_area_list_cache: Dict[str, Any] = {"data": [], "expire": 0} +_CACHE_TTL = 300 # 5 分钟 + + +async def _fetch_area_list_from_iot() -> List[Dict]: + """从 IoT 平台获取区域列表""" + import httpx + + base_url = ( + os.getenv("IOT_PLATFORM_URL", "") + or settings.work_order.base_url + ) + if not base_url: + logger.warning("未配置 IoT 平台地址,无法查询区域列表") + return [] + + # 复用 notify_dispatch 中的 token 缓存 + from app.services.notify_dispatch import _iot_token_cache + + now = time.time() + if not _iot_token_cache["token"] or now > _iot_token_cache["expire"]: + async with httpx.AsyncClient(timeout=5) as client: + login_resp = await client.post( + f"{base_url}/admin-api/system/auth/login", + json={"username": "admin", "password": "admin123", "tenantName": "默认"}, + headers={"tenant-id": "1"}, + ) + login_data = login_resp.json().get("data", {}) + _iot_token_cache["token"] = login_data.get("accessToken", "") + _iot_token_cache["expire"] = now + 1200 + + token = _iot_token_cache["token"] + if not token: + return [] + + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get( + f"{base_url}/admin-api/ops/area/list", + headers={"tenant-id": "1", "Authorization": f"Bearer {token}"}, + ) + data = resp.json() + if data.get("code") == 0 and data.get("data"): + return data["data"] + return [] + + +@router.get("/list") +async def get_area_list(): + """ + 获取区域列表(从 IoT 平台代理查询,带 5 分钟缓存) + + 返回: [{id, areaName, parentId, ...}] + """ + now = time.time() + if _area_list_cache["data"] and now < _area_list_cache["expire"]: + return YudaoResponse.success(_area_list_cache["data"]) + + try: + areas = await _fetch_area_list_from_iot() + if areas: + _area_list_cache["data"] = areas + _area_list_cache["expire"] = now + _CACHE_TTL + return YudaoResponse.success(areas) + except Exception as e: + logger.error(f"获取区域列表失败: {e}", exc_info=True) + # 有旧缓存则返回旧数据 + if _area_list_cache["data"]: + return YudaoResponse.success(_area_list_cache["data"]) + return YudaoResponse.error(500, f"获取区域列表失败: {e}") diff --git a/app/routers/edge_compat.py b/app/routers/edge_compat.py index 85c4d81..b258283 100644 --- a/app/routers/edge_compat.py +++ b/app/routers/edge_compat.py @@ -8,7 +8,7 @@ Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路 import asyncio from datetime import datetime -from fastapi import APIRouter, Depends, Request +from fastapi import APIRouter, Depends, Request, Header, HTTPException from typing import Optional from app.yudao_compat import YudaoResponse @@ -18,14 +18,29 @@ from app.schemas import EdgeAlarmReport, EdgeAlarmResolve from app.models import EdgeDevice, DeviceStatus, get_session from app.utils.logger import logger from app.utils.timezone import beijing_now +from app.config import settings router = APIRouter(prefix="/api/ai", tags=["Edge-兼容路由"]) +def _verify_edge_token(authorization: Optional[str] = Header(None)): + """验证 Edge 设备 Token(仅在 EDGE_AUTH_ENABLED=true 时启用)""" + cfg = settings.edge_auth + if not cfg.enabled or not cfg.token: + return # 未启用认证,放行 + if not authorization: + raise HTTPException(status_code=401, detail="缺少 Authorization 头") + # 支持 "Bearer xxx" 格式 + token = authorization.removeprefix("Bearer ").strip() + if token != cfg.token: + raise HTTPException(status_code=401, detail="Token 无效") + + @router.post("/alert/edge/report") async def edge_alarm_report( report: EdgeAlarmReport, service: AlarmEventService = Depends(get_alarm_event_service), + _auth=Depends(_verify_edge_token), ): """ Edge 告警上报(无认证) @@ -74,6 +89,7 @@ async def edge_alarm_report( async def edge_alarm_resolve( resolve: EdgeAlarmResolve, service: AlarmEventService = Depends(get_alarm_event_service), + _auth=Depends(_verify_edge_token), ): """ Edge 告警结束通知(无认证) @@ -102,7 +118,7 @@ async def edge_alarm_resolve( @router.post("/device/heartbeat") -async def edge_device_heartbeat(request: Request): +async def edge_device_heartbeat(request: Request, _auth=Depends(_verify_edge_token)): """ Edge 设备心跳上报(无认证) diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index a1c6ba1..f7cf983 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -167,6 +167,9 @@ async def process_alarm_notification(alarm_data: Dict): wo_client = get_work_order_client() if wo_client.enabled: wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int + # 降级:从 WVP API 查询摄像头绑定的 area_id(带缓存) + if not wo_area_id: + wo_area_id = _get_area_id_from_wvp(device_id) if wo_area_id: type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) wo_title = f"{type_name}告警" @@ -398,6 +401,48 @@ def _get_trigger_source(alarm_id: str) -> str: db.close() +# WVP 摄像头 area_id 缓存: {camera_code: (area_id, expire_time)} +_wvp_area_cache: dict = {} +_WVP_AREA_CACHE_TTL = 300 # 5 分钟 + + +def _get_area_id_from_wvp(device_id: str) -> int: + """从 WVP API 查询摄像头的 area_id(带 5 分钟缓存)""" + import time + now = time.time() + + # 检查缓存 + if device_id in _wvp_area_cache: + cached_val, expire = _wvp_area_cache[device_id] + if now < expire: + return cached_val + + try: + import requests + wvp_base = settings.camera_name.wvp_api_base.rstrip("/") + resp = requests.get( + f"{wvp_base}/api/ai/camera/list", + timeout=settings.camera_name.query_timeout, + ) + if resp.status_code == 200: + data = resp.json() + cameras = data if isinstance(data, list) else data.get("data", []) + for cam in cameras: + cam_code = cam.get("cameraCode") or cam.get("camera_code", "") + cam_area = cam.get("areaId") or cam.get("area_id") + if cam_area: + _wvp_area_cache[cam_code] = (int(cam_area), now + _WVP_AREA_CACHE_TTL) + # 查找目标摄像头 + cached = _wvp_area_cache.get(device_id) + if cached: + return cached[0] + # 未找到时缓存 0,避免频繁查询 + _wvp_area_cache[device_id] = (0, now + _WVP_AREA_CACHE_TTL) + except Exception as e: + logger.warning(f"WVP 查询 area_id 失败: device={device_id}, error={e}") + return 0 + + def _get_permanent_url(snapshot_url: str) -> str: """将 COS object key 转为永久访问 URL""" if not snapshot_url: