功能:区域绑定工单链路 + Edge API 认证

1. Edge Token 认证:edge_compat.py 新增 _verify_edge_token 依赖,
   通过 EDGE_AUTH_TOKEN 环境变量配置共享 Token,保护告警上报接口
2. 区域列表 API:新增 /api/area/list 代理查询 IoT 平台区域数据,
   供前端摄像头页面选择区域使用(带 5 分钟缓存)
3. 降级查询:notify_dispatch.py 中 area_id 为空时从 WVP API 查询兜底
4. 配置新增:EdgeAuthConfig(token + enabled)、IotDbConfig
This commit is contained in:
2026-03-23 16:50:07 +08:00
parent 6bc71a9991
commit 7235a20e25
5 changed files with 176 additions and 2 deletions

View File

@@ -98,6 +98,19 @@ class RedisConfig:
enabled: bool = True
@dataclass
class EdgeAuthConfig:
"""Edge 设备认证配置"""
token: str = "" # 共享 TokenEdge 请求时携带 Authorization: Bearer {token}
enabled: bool = False # 是否启用 token 校验
@dataclass
class IotDbConfig:
"""IoT 平台数据库配置(只读查询区域等信息)"""
url: str = "" # 如 mysql+pymysql://user:pass@host/dbname
@dataclass
class CameraNameConfig:
"""摄像头名称格式化配置"""
@@ -138,6 +151,8 @@ class Settings(BaseModel):
work_order: WorkOrderConfig = WorkOrderConfig()
redis: RedisConfig = RedisConfig()
camera_name: CameraNameConfig = CameraNameConfig()
edge_auth: EdgeAuthConfig = EdgeAuthConfig()
iot_db: IotDbConfig = IotDbConfig()
def load_settings() -> Settings:
@@ -213,6 +228,13 @@ def load_settings() -> Settings:
display_format=os.getenv("CAMERA_NAME_FORMAT", "{name}"),
query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "15")),
),
edge_auth=EdgeAuthConfig(
token=os.getenv("EDGE_AUTH_TOKEN", ""),
enabled=os.getenv("EDGE_AUTH_ENABLED", "false").lower() == "true",
),
iot_db=IotDbConfig(
url=os.getenv("IOT_DATABASE_URL", ""),
),
)

View File

@@ -32,6 +32,7 @@ from app.routers.wechat_callback import router as wechat_callback_router
from app.routers.notify_manage import router as notify_manage_router
from app.routers.wechat_notify_api import router as wechat_notify_router
from app.routers.work_order_api import router as work_order_router
from app.routers.area_api import router as area_router
from app.yudao_compat import yudao_exception_handler
import json
@@ -116,6 +117,7 @@ app.include_router(wechat_callback_router)
app.include_router(wechat_notify_router)
app.include_router(work_order_router)
app.include_router(notify_manage_router)
app.include_router(area_router)
# 注册芋道格式异常处理器
app.add_exception_handler(HTTPException, yudao_exception_handler)

89
app/routers/area_api.py Normal file
View File

@@ -0,0 +1,89 @@
"""
区域列表 API
代理查询 IoT 平台的区域数据,供前端摄像头页面选择区域使用。
带 5 分钟缓存,减少跨服务查询。
"""
import os
import time
from typing import List, Dict, Any
from fastapi import APIRouter
from app.config import settings
from app.yudao_compat import YudaoResponse
from app.utils.logger import logger
router = APIRouter(prefix="/api/area", tags=["区域管理"])
# 区域列表缓存
_area_list_cache: Dict[str, Any] = {"data": [], "expire": 0}
_CACHE_TTL = 300 # 5 分钟
async def _fetch_area_list_from_iot() -> List[Dict]:
"""从 IoT 平台获取区域列表"""
import httpx
base_url = (
os.getenv("IOT_PLATFORM_URL", "")
or settings.work_order.base_url
)
if not base_url:
logger.warning("未配置 IoT 平台地址,无法查询区域列表")
return []
# 复用 notify_dispatch 中的 token 缓存
from app.services.notify_dispatch import _iot_token_cache
now = time.time()
if not _iot_token_cache["token"] or now > _iot_token_cache["expire"]:
async with httpx.AsyncClient(timeout=5) as client:
login_resp = await client.post(
f"{base_url}/admin-api/system/auth/login",
json={"username": "admin", "password": "admin123", "tenantName": "默认"},
headers={"tenant-id": "1"},
)
login_data = login_resp.json().get("data", {})
_iot_token_cache["token"] = login_data.get("accessToken", "")
_iot_token_cache["expire"] = now + 1200
token = _iot_token_cache["token"]
if not token:
return []
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(
f"{base_url}/admin-api/ops/area/list",
headers={"tenant-id": "1", "Authorization": f"Bearer {token}"},
)
data = resp.json()
if data.get("code") == 0 and data.get("data"):
return data["data"]
return []
@router.get("/list")
async def get_area_list():
"""
获取区域列表(从 IoT 平台代理查询,带 5 分钟缓存)
返回: [{id, areaName, parentId, ...}]
"""
now = time.time()
if _area_list_cache["data"] and now < _area_list_cache["expire"]:
return YudaoResponse.success(_area_list_cache["data"])
try:
areas = await _fetch_area_list_from_iot()
if areas:
_area_list_cache["data"] = areas
_area_list_cache["expire"] = now + _CACHE_TTL
return YudaoResponse.success(areas)
except Exception as e:
logger.error(f"获取区域列表失败: {e}", exc_info=True)
# 有旧缓存则返回旧数据
if _area_list_cache["data"]:
return YudaoResponse.success(_area_list_cache["data"])
return YudaoResponse.error(500, f"获取区域列表失败: {e}")

View File

@@ -8,7 +8,7 @@ Edge 设备使用 /api/ai/alert/edge/report 和 /api/ai/alert/edge/resolve 路
import asyncio
from datetime import datetime
from fastapi import APIRouter, Depends, Request
from fastapi import APIRouter, Depends, Request, Header, HTTPException
from typing import Optional
from app.yudao_compat import YudaoResponse
@@ -18,14 +18,29 @@ from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.models import EdgeDevice, DeviceStatus, get_session
from app.utils.logger import logger
from app.utils.timezone import beijing_now
from app.config import settings
router = APIRouter(prefix="/api/ai", tags=["Edge-兼容路由"])
def _verify_edge_token(authorization: Optional[str] = Header(None)):
"""验证 Edge 设备 Token仅在 EDGE_AUTH_ENABLED=true 时启用)"""
cfg = settings.edge_auth
if not cfg.enabled or not cfg.token:
return # 未启用认证,放行
if not authorization:
raise HTTPException(status_code=401, detail="缺少 Authorization 头")
# 支持 "Bearer xxx" 格式
token = authorization.removeprefix("Bearer ").strip()
if token != cfg.token:
raise HTTPException(status_code=401, detail="Token 无效")
@router.post("/alert/edge/report")
async def edge_alarm_report(
report: EdgeAlarmReport,
service: AlarmEventService = Depends(get_alarm_event_service),
_auth=Depends(_verify_edge_token),
):
"""
Edge 告警上报(无认证)
@@ -74,6 +89,7 @@ async def edge_alarm_report(
async def edge_alarm_resolve(
resolve: EdgeAlarmResolve,
service: AlarmEventService = Depends(get_alarm_event_service),
_auth=Depends(_verify_edge_token),
):
"""
Edge 告警结束通知(无认证)
@@ -102,7 +118,7 @@ async def edge_alarm_resolve(
@router.post("/device/heartbeat")
async def edge_device_heartbeat(request: Request):
async def edge_device_heartbeat(request: Request, _auth=Depends(_verify_edge_token)):
"""
Edge 设备心跳上报(无认证)

View File

@@ -167,6 +167,9 @@ async def process_alarm_notification(alarm_data: Dict):
wo_client = get_work_order_client()
if wo_client.enabled:
wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int
# 降级:从 WVP API 查询摄像头绑定的 area_id带缓存
if not wo_area_id:
wo_area_id = _get_area_id_from_wvp(device_id)
if wo_area_id:
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
wo_title = f"{type_name}告警"
@@ -398,6 +401,48 @@ def _get_trigger_source(alarm_id: str) -> str:
db.close()
# WVP 摄像头 area_id 缓存: {camera_code: (area_id, expire_time)}
_wvp_area_cache: dict = {}
_WVP_AREA_CACHE_TTL = 300 # 5 分钟
def _get_area_id_from_wvp(device_id: str) -> int:
"""从 WVP API 查询摄像头的 area_id带 5 分钟缓存)"""
import time
now = time.time()
# 检查缓存
if device_id in _wvp_area_cache:
cached_val, expire = _wvp_area_cache[device_id]
if now < expire:
return cached_val
try:
import requests
wvp_base = settings.camera_name.wvp_api_base.rstrip("/")
resp = requests.get(
f"{wvp_base}/api/ai/camera/list",
timeout=settings.camera_name.query_timeout,
)
if resp.status_code == 200:
data = resp.json()
cameras = data if isinstance(data, list) else data.get("data", [])
for cam in cameras:
cam_code = cam.get("cameraCode") or cam.get("camera_code", "")
cam_area = cam.get("areaId") or cam.get("area_id")
if cam_area:
_wvp_area_cache[cam_code] = (int(cam_area), now + _WVP_AREA_CACHE_TTL)
# 查找目标摄像头
cached = _wvp_area_cache.get(device_id)
if cached:
return cached[0]
# 未找到时缓存 0避免频繁查询
_wvp_area_cache[device_id] = (0, now + _WVP_AREA_CACHE_TTL)
except Exception as e:
logger.warning(f"WVP 查询 area_id 失败: device={device_id}, error={e}")
return 0
def _get_permanent_url(snapshot_url: str) -> str:
"""将 COS object key 转为永久访问 URL"""
if not snapshot_url: