撤销:移除区域查询相关功能,测试阶段手动写死 area_id
移除 area_api.py、IotDbConfig、WVP 降级查询, 保留 Edge Token 认证功能。区域绑定等 WVP 迁移到 IoT 后再做。
This commit is contained in:
@@ -105,12 +105,6 @@ class EdgeAuthConfig:
|
||||
enabled: bool = False # 是否启用 token 校验
|
||||
|
||||
|
||||
@dataclass
|
||||
class IotDbConfig:
|
||||
"""IoT 平台数据库配置(只读查询区域等信息)"""
|
||||
url: str = "" # 如 mysql+pymysql://user:pass@host/dbname
|
||||
|
||||
|
||||
@dataclass
|
||||
class CameraNameConfig:
|
||||
"""摄像头名称格式化配置"""
|
||||
@@ -152,7 +146,6 @@ class Settings(BaseModel):
|
||||
redis: RedisConfig = RedisConfig()
|
||||
camera_name: CameraNameConfig = CameraNameConfig()
|
||||
edge_auth: EdgeAuthConfig = EdgeAuthConfig()
|
||||
iot_db: IotDbConfig = IotDbConfig()
|
||||
|
||||
|
||||
def load_settings() -> Settings:
|
||||
@@ -232,9 +225,6 @@ def load_settings() -> Settings:
|
||||
token=os.getenv("EDGE_AUTH_TOKEN", ""),
|
||||
enabled=os.getenv("EDGE_AUTH_ENABLED", "false").lower() == "true",
|
||||
),
|
||||
iot_db=IotDbConfig(
|
||||
url=os.getenv("IOT_DATABASE_URL", ""),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -32,7 +32,6 @@ from app.routers.wechat_callback import router as wechat_callback_router
|
||||
from app.routers.notify_manage import router as notify_manage_router
|
||||
from app.routers.wechat_notify_api import router as wechat_notify_router
|
||||
from app.routers.work_order_api import router as work_order_router
|
||||
from app.routers.area_api import router as area_router
|
||||
from app.yudao_compat import yudao_exception_handler
|
||||
import json
|
||||
|
||||
@@ -117,7 +116,6 @@ app.include_router(wechat_callback_router)
|
||||
app.include_router(wechat_notify_router)
|
||||
app.include_router(work_order_router)
|
||||
app.include_router(notify_manage_router)
|
||||
app.include_router(area_router)
|
||||
|
||||
# 注册芋道格式异常处理器
|
||||
app.add_exception_handler(HTTPException, yudao_exception_handler)
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
"""
|
||||
区域列表 API
|
||||
|
||||
直接查询 IoT 平台 MySQL 数据库的区域表,供前端摄像头页面选择区域使用。
|
||||
带 5 分钟内存缓存,减少跨库查询。
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from fastapi import APIRouter
|
||||
from sqlalchemy import create_engine, text
|
||||
|
||||
from app.config import settings
|
||||
from app.yudao_compat import YudaoResponse
|
||||
from app.utils.logger import logger
|
||||
|
||||
router = APIRouter(prefix="/api/area", tags=["区域管理"])
|
||||
|
||||
# 区域列表缓存
|
||||
_area_list_cache: Dict[str, Any] = {"data": [], "expire": 0}
|
||||
_CACHE_TTL = 300 # 5 分钟
|
||||
|
||||
# IoT 数据库连接(懒初始化)
|
||||
_iot_engine = None
|
||||
|
||||
|
||||
def _get_iot_engine():
|
||||
global _iot_engine
|
||||
if _iot_engine is None:
|
||||
url = settings.iot_db.url
|
||||
if not url:
|
||||
return None
|
||||
_iot_engine = create_engine(url, pool_size=2, pool_recycle=3600)
|
||||
return _iot_engine
|
||||
|
||||
|
||||
def _fetch_area_list_from_db() -> List[Dict]:
|
||||
"""直接查询 IoT MySQL 区域表"""
|
||||
engine = _get_iot_engine()
|
||||
if not engine:
|
||||
logger.warning("未配置 IOT_DATABASE_URL,无法查询区域列表")
|
||||
return []
|
||||
|
||||
with engine.connect() as conn:
|
||||
# ops_area 是芋道 IoT 平台的区域表
|
||||
result = conn.execute(text(
|
||||
"SELECT id, area_name, parent_id FROM ops_area "
|
||||
"WHERE deleted = 0 ORDER BY sort, id"
|
||||
))
|
||||
return [
|
||||
{"id": row[0], "areaName": row[1], "parentId": row[2]}
|
||||
for row in result
|
||||
]
|
||||
|
||||
|
||||
@router.get("/list")
|
||||
async def get_area_list():
|
||||
"""
|
||||
获取区域列表(直接查 IoT 数据库,带 5 分钟缓存)
|
||||
|
||||
返回: [{id, areaName, parentId}]
|
||||
"""
|
||||
now = time.time()
|
||||
if _area_list_cache["data"] and now < _area_list_cache["expire"]:
|
||||
return YudaoResponse.success(_area_list_cache["data"])
|
||||
|
||||
try:
|
||||
areas = _fetch_area_list_from_db()
|
||||
if areas:
|
||||
_area_list_cache["data"] = areas
|
||||
_area_list_cache["expire"] = now + _CACHE_TTL
|
||||
return YudaoResponse.success(areas)
|
||||
except Exception as e:
|
||||
logger.error(f"获取区域列表失败: {e}", exc_info=True)
|
||||
if _area_list_cache["data"]:
|
||||
return YudaoResponse.success(_area_list_cache["data"])
|
||||
return YudaoResponse.error(500, f"获取区域列表失败: {e}")
|
||||
@@ -167,9 +167,6 @@ async def process_alarm_notification(alarm_data: Dict):
|
||||
wo_client = get_work_order_client()
|
||||
if wo_client.enabled:
|
||||
wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int
|
||||
# 降级:从 WVP API 查询摄像头绑定的 area_id(带缓存)
|
||||
if not wo_area_id:
|
||||
wo_area_id = _get_area_id_from_wvp(device_id)
|
||||
if wo_area_id:
|
||||
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
|
||||
wo_title = f"{type_name}告警"
|
||||
@@ -401,48 +398,6 @@ def _get_trigger_source(alarm_id: str) -> str:
|
||||
db.close()
|
||||
|
||||
|
||||
# WVP 摄像头 area_id 缓存: {camera_code: (area_id, expire_time)}
|
||||
_wvp_area_cache: dict = {}
|
||||
_WVP_AREA_CACHE_TTL = 300 # 5 分钟
|
||||
|
||||
|
||||
def _get_area_id_from_wvp(device_id: str) -> int:
|
||||
"""从 WVP API 查询摄像头的 area_id(带 5 分钟缓存)"""
|
||||
import time
|
||||
now = time.time()
|
||||
|
||||
# 检查缓存
|
||||
if device_id in _wvp_area_cache:
|
||||
cached_val, expire = _wvp_area_cache[device_id]
|
||||
if now < expire:
|
||||
return cached_val
|
||||
|
||||
try:
|
||||
import requests
|
||||
wvp_base = settings.camera_name.wvp_api_base.rstrip("/")
|
||||
resp = requests.get(
|
||||
f"{wvp_base}/api/ai/camera/list",
|
||||
timeout=settings.camera_name.query_timeout,
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
cameras = data if isinstance(data, list) else data.get("data", [])
|
||||
for cam in cameras:
|
||||
cam_code = cam.get("cameraCode") or cam.get("camera_code", "")
|
||||
cam_area = cam.get("areaId") or cam.get("area_id")
|
||||
if cam_area:
|
||||
_wvp_area_cache[cam_code] = (int(cam_area), now + _WVP_AREA_CACHE_TTL)
|
||||
# 查找目标摄像头
|
||||
cached = _wvp_area_cache.get(device_id)
|
||||
if cached:
|
||||
return cached[0]
|
||||
# 未找到时缓存 0,避免频繁查询
|
||||
_wvp_area_cache[device_id] = (0, now + _WVP_AREA_CACHE_TTL)
|
||||
except Exception as e:
|
||||
logger.warning(f"WVP 查询 area_id 失败: device={device_id}, error={e}")
|
||||
return 0
|
||||
|
||||
|
||||
def _get_permanent_url(snapshot_url: str) -> str:
|
||||
"""将 COS object key 转为永久访问 URL"""
|
||||
if not snapshot_url:
|
||||
|
||||
Reference in New Issue
Block a user