""" 摄像头名称服务 功能: 1. 从 WVP 查询摄像头信息(按 camera_code) 2. 固定优先级提取名称:cameraName → gbName → device_id 3. 批量查询和缓存优化 """ from typing import Optional, Dict, List import asyncio import time import httpx from app.config import CameraNameConfig from app.utils.logger import logger class CameraNameService: """摄像头名称服务(带内存缓存)""" # 缓存 TTL(秒) CACHE_TTL = 300 # 5 分钟 def __init__(self, config: CameraNameConfig): self.config = config self._cache: Dict[str, tuple] = {} # {device_id: (info, expire_time)} async def get_camera_info(self, device_id: str) -> Optional[Dict]: """从 WVP 查询摄像头信息(带缓存)""" if not device_id: return None cached = self._cache.get(device_id) if cached and cached[1] > time.time(): return cached[0] info = await self._query_by_camera_code(device_id) # 写入缓存(包括 None 结果,避免反复查询不存在的设备) self._cache[device_id] = (info, time.time() + self.CACHE_TTL) return info async def _query_by_camera_code(self, camera_code: str) -> Optional[Dict]: """通过 camera_code 查询摄像头信息""" try: async with httpx.AsyncClient(timeout=self.config.query_timeout) as client: resp = await client.get( f"{self.config.wvp_api_base}/api/ai/camera/get", params={"cameraCode": camera_code}, ) if resp.status_code == 200: data = resp.json() if data.get("code") == 0: return data.get("data") else: logger.warning( f"WVP查询摄像头失败: camera_code={camera_code}, " f"status={resp.status_code}" ) except Exception as e: logger.error(f"WVP查询异常: camera_code={camera_code}, error={e}") return None def format_display_name(self, device_id: str, camera_info: Optional[Dict] = None) -> str: """ 格式化摄像头显示名称 固定优先级:cameraName → gbName → device_id """ if not camera_info: return device_id # 1. cameraName(用户自定义名称) name = camera_info.get("cameraName") or camera_info.get("camera_name") if name and name.lower() != "default": return name # 2. gbName(国标名称,去 "/" 后缀) name = camera_info.get("gbName") or camera_info.get("gb_name") if name: if "/" in name: name = name.split("/")[0] if name: return name # 3. 兜底 return device_id async def get_camera_infos_batch(self, device_ids: List[str]) -> Dict[str, Optional[Dict]]: """批量查询摄像头信息(并发+去重优化)""" unique_ids = list(set(did for did in device_ids if did)) if not unique_ids: return {} tasks = [self.get_camera_info(did) for did in unique_ids] results = await asyncio.gather(*tasks) return dict(zip(unique_ids, results)) async def get_display_name(self, device_id: str) -> str: """获取摄像头显示名称(一站式方法)""" camera_info = await self.get_camera_info(device_id) return self.format_display_name(device_id, camera_info) async def get_display_names_batch(self, device_ids: List[str]) -> Dict[str, str]: """批量获取摄像头显示名称""" info_map = await self.get_camera_infos_batch(device_ids) return { did: self.format_display_name(did, info_map.get(did)) for did in device_ids } # 全局单例(依赖注入) _camera_name_service: Optional[CameraNameService] = None def get_camera_name_service() -> CameraNameService: """获取摄像头名称服务单例""" global _camera_name_service if _camera_name_service is None: from app.config import settings _camera_name_service = CameraNameService(settings.camera_name) return _camera_name_service