Files
iot-device-management-service/app/routers/area_api.py
16337 5b83e38f25 重构:区域列表改为直接查 IoT 数据库
去掉 httpx 调 IoT HTTP 接口的方式,改为通过 IOT_DATABASE_URL
直接查询 IoT MySQL 的 ops_area 表,更简单可靠。
2026-03-23 16:53:45 +08:00

79 lines
2.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
区域列表 API
直接查询 IoT 平台 MySQL 数据库的区域表,供前端摄像头页面选择区域使用。
带 5 分钟内存缓存,减少跨库查询。
"""
import time
from typing import List, Dict, Any
from fastapi import APIRouter
from sqlalchemy import create_engine, text
from app.config import settings
from app.yudao_compat import YudaoResponse
from app.utils.logger import logger
router = APIRouter(prefix="/api/area", tags=["区域管理"])
# 区域列表缓存
_area_list_cache: Dict[str, Any] = {"data": [], "expire": 0}
_CACHE_TTL = 300 # 5 分钟
# IoT 数据库连接(懒初始化)
_iot_engine = None
def _get_iot_engine():
global _iot_engine
if _iot_engine is None:
url = settings.iot_db.url
if not url:
return None
_iot_engine = create_engine(url, pool_size=2, pool_recycle=3600)
return _iot_engine
def _fetch_area_list_from_db() -> List[Dict]:
"""直接查询 IoT MySQL 区域表"""
engine = _get_iot_engine()
if not engine:
logger.warning("未配置 IOT_DATABASE_URL无法查询区域列表")
return []
with engine.connect() as conn:
# ops_area 是芋道 IoT 平台的区域表
result = conn.execute(text(
"SELECT id, area_name, parent_id FROM ops_area "
"WHERE deleted = 0 ORDER BY sort, id"
))
return [
{"id": row[0], "areaName": row[1], "parentId": row[2]}
for row in result
]
@router.get("/list")
async def get_area_list():
"""
获取区域列表(直接查 IoT 数据库,带 5 分钟缓存)
返回: [{id, areaName, parentId}]
"""
now = time.time()
if _area_list_cache["data"] and now < _area_list_cache["expire"]:
return YudaoResponse.success(_area_list_cache["data"])
try:
areas = _fetch_area_list_from_db()
if areas:
_area_list_cache["data"] = areas
_area_list_cache["expire"] = now + _CACHE_TTL
return YudaoResponse.success(areas)
except Exception as e:
logger.error(f"获取区域列表失败: {e}", exc_info=True)
if _area_list_cache["data"]:
return YudaoResponse.success(_area_list_cache["data"])
return YudaoResponse.error(500, f"获取区域列表失败: {e}")