diff --git a/app/config.py b/app/config.py index 323f248..eda3f9b 100644 --- a/app/config.py +++ b/app/config.py @@ -105,12 +105,6 @@ class EdgeAuthConfig: enabled: bool = False # 是否启用 token 校验 -@dataclass -class IotDbConfig: - """IoT 平台数据库配置(只读查询区域等信息)""" - url: str = "" # 如 mysql+pymysql://user:pass@host/dbname - - @dataclass class CameraNameConfig: """摄像头名称格式化配置""" @@ -152,7 +146,6 @@ class Settings(BaseModel): redis: RedisConfig = RedisConfig() camera_name: CameraNameConfig = CameraNameConfig() edge_auth: EdgeAuthConfig = EdgeAuthConfig() - iot_db: IotDbConfig = IotDbConfig() def load_settings() -> Settings: @@ -232,9 +225,6 @@ def load_settings() -> Settings: token=os.getenv("EDGE_AUTH_TOKEN", ""), enabled=os.getenv("EDGE_AUTH_ENABLED", "false").lower() == "true", ), - iot_db=IotDbConfig( - url=os.getenv("IOT_DATABASE_URL", ""), - ), ) diff --git a/app/main.py b/app/main.py index 2a0cd8b..5d7ba42 100644 --- a/app/main.py +++ b/app/main.py @@ -32,7 +32,6 @@ from app.routers.wechat_callback import router as wechat_callback_router from app.routers.notify_manage import router as notify_manage_router from app.routers.wechat_notify_api import router as wechat_notify_router from app.routers.work_order_api import router as work_order_router -from app.routers.area_api import router as area_router from app.yudao_compat import yudao_exception_handler import json @@ -117,7 +116,6 @@ app.include_router(wechat_callback_router) app.include_router(wechat_notify_router) app.include_router(work_order_router) app.include_router(notify_manage_router) -app.include_router(area_router) # 注册芋道格式异常处理器 app.add_exception_handler(HTTPException, yudao_exception_handler) diff --git a/app/routers/area_api.py b/app/routers/area_api.py deleted file mode 100644 index 6133907..0000000 --- a/app/routers/area_api.py +++ /dev/null @@ -1,78 +0,0 @@ -""" -区域列表 API - -直接查询 IoT 平台 MySQL 数据库的区域表,供前端摄像头页面选择区域使用。 -带 5 分钟内存缓存,减少跨库查询。 -""" - -import time -from typing import List, Dict, Any - -from fastapi import APIRouter -from sqlalchemy import create_engine, text - -from app.config import settings -from app.yudao_compat import YudaoResponse -from app.utils.logger import logger - -router = APIRouter(prefix="/api/area", tags=["区域管理"]) - -# 区域列表缓存 -_area_list_cache: Dict[str, Any] = {"data": [], "expire": 0} -_CACHE_TTL = 300 # 5 分钟 - -# IoT 数据库连接(懒初始化) -_iot_engine = None - - -def _get_iot_engine(): - global _iot_engine - if _iot_engine is None: - url = settings.iot_db.url - if not url: - return None - _iot_engine = create_engine(url, pool_size=2, pool_recycle=3600) - return _iot_engine - - -def _fetch_area_list_from_db() -> List[Dict]: - """直接查询 IoT MySQL 区域表""" - engine = _get_iot_engine() - if not engine: - logger.warning("未配置 IOT_DATABASE_URL,无法查询区域列表") - return [] - - with engine.connect() as conn: - # ops_area 是芋道 IoT 平台的区域表 - result = conn.execute(text( - "SELECT id, area_name, parent_id FROM ops_area " - "WHERE deleted = 0 ORDER BY sort, id" - )) - return [ - {"id": row[0], "areaName": row[1], "parentId": row[2]} - for row in result - ] - - -@router.get("/list") -async def get_area_list(): - """ - 获取区域列表(直接查 IoT 数据库,带 5 分钟缓存) - - 返回: [{id, areaName, parentId}] - """ - now = time.time() - if _area_list_cache["data"] and now < _area_list_cache["expire"]: - return YudaoResponse.success(_area_list_cache["data"]) - - try: - areas = _fetch_area_list_from_db() - if areas: - _area_list_cache["data"] = areas - _area_list_cache["expire"] = now + _CACHE_TTL - return YudaoResponse.success(areas) - except Exception as e: - logger.error(f"获取区域列表失败: {e}", exc_info=True) - if _area_list_cache["data"]: - return YudaoResponse.success(_area_list_cache["data"]) - return YudaoResponse.error(500, f"获取区域列表失败: {e}") diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index f7cf983..a1c6ba1 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -167,9 +167,6 @@ async def process_alarm_notification(alarm_data: Dict): wo_client = get_work_order_client() if wo_client.enabled: wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int - # 降级:从 WVP API 查询摄像头绑定的 area_id(带缓存) - if not wo_area_id: - wo_area_id = _get_area_id_from_wvp(device_id) if wo_area_id: type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) wo_title = f"{type_name}告警" @@ -401,48 +398,6 @@ def _get_trigger_source(alarm_id: str) -> str: db.close() -# WVP 摄像头 area_id 缓存: {camera_code: (area_id, expire_time)} -_wvp_area_cache: dict = {} -_WVP_AREA_CACHE_TTL = 300 # 5 分钟 - - -def _get_area_id_from_wvp(device_id: str) -> int: - """从 WVP API 查询摄像头的 area_id(带 5 分钟缓存)""" - import time - now = time.time() - - # 检查缓存 - if device_id in _wvp_area_cache: - cached_val, expire = _wvp_area_cache[device_id] - if now < expire: - return cached_val - - try: - import requests - wvp_base = settings.camera_name.wvp_api_base.rstrip("/") - resp = requests.get( - f"{wvp_base}/api/ai/camera/list", - timeout=settings.camera_name.query_timeout, - ) - if resp.status_code == 200: - data = resp.json() - cameras = data if isinstance(data, list) else data.get("data", []) - for cam in cameras: - cam_code = cam.get("cameraCode") or cam.get("camera_code", "") - cam_area = cam.get("areaId") or cam.get("area_id") - if cam_area: - _wvp_area_cache[cam_code] = (int(cam_area), now + _WVP_AREA_CACHE_TTL) - # 查找目标摄像头 - cached = _wvp_area_cache.get(device_id) - if cached: - return cached[0] - # 未找到时缓存 0,避免频繁查询 - _wvp_area_cache[device_id] = (0, now + _WVP_AREA_CACHE_TTL) - except Exception as e: - logger.warning(f"WVP 查询 area_id 失败: device={device_id}, error={e}") - return 0 - - def _get_permanent_url(snapshot_url: str) -> str: """将 COS object key 转为永久访问 URL""" if not snapshot_url: