Files
iot-device-management-service/app/services/camera_name_service.py
16337 0f27caf0b5 修复:摄像头名称显示全链路修复
- config.py: 简化 CameraNameConfig,删除 display_format/name_field_priority
- camera_name_service.py: 重写名称解析,固定优先级 cameraName→gbName→device_id,
  删除废弃的 app/stream 格式解析和 extract_name 方法
- yudao_aiot_alarm.py: 删除 stream→cameraId 的错误映射,cameraId 直接用 device_id
- agent_dispatcher.py: query_camera 删除技术字段返回,list_my_orders 添加摄像头名称解析
2026-03-24 13:38:45 +08:00

126 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
摄像头名称服务
功能:
1. 从 WVP 查询摄像头信息(按 camera_code
2. 固定优先级提取名称cameraName → gbName → device_id
3. 批量查询和缓存优化
"""
from typing import Optional, Dict, List
import asyncio
import time
import httpx
from app.config import CameraNameConfig
from app.utils.logger import logger
class CameraNameService:
    """Resolve human-readable camera names from WVP, with an in-memory cache.

    Camera records are fetched over HTTP from the WVP API and cached per
    ``device_id`` for ``CACHE_TTL`` seconds. A display name is derived from a
    record with a fixed priority: ``cameraName`` → ``gbName`` → ``device_id``.
    """

    # Cache time-to-live in seconds (5 minutes).
    CACHE_TTL = 300

    def __init__(self, config: "CameraNameConfig"):
        """Store the configuration and start with an empty cache.

        Args:
            config: Settings object; this class reads ``wvp_api_base`` and
                ``query_timeout`` from it.
        """
        self.config = config
        # {device_id: (info, expire_time)}; expire_time is a time.time() value.
        self._cache: Dict[str, tuple] = {}

    async def get_camera_info(self, device_id: str) -> Optional[Dict]:
        """Return the WVP record for ``device_id``, using the cache when fresh.

        Args:
            device_id: Camera identifier, sent to WVP as ``cameraCode``.

        Returns:
            The camera record, or ``None`` when ``device_id`` is empty or the
            lookup produced no record.
        """
        if not device_id:
            return None

        cached = self._cache.get(device_id)
        if cached is not None and cached[1] > time.time():
            return cached[0]

        info = await self._query_by_camera_code(device_id)
        # Cache None results as well so that unknown devices are not queried
        # over and over. NOTE: failed requests also yield None and are
        # therefore cached for the same TTL.
        self._cache[device_id] = (info, time.time() + self.CACHE_TTL)
        return info

    async def _query_by_camera_code(self, camera_code: str) -> Optional[Dict]:
        """Fetch one camera record from WVP by ``camera_code``.

        Never raises for request/parse failures: every failure is logged and
        reported to the caller as ``None``.

        Returns:
            The ``data`` field of the response when the HTTP status is 200
            and the response ``code`` is 0; otherwise ``None``.
        """
        try:
            async with httpx.AsyncClient(timeout=self.config.query_timeout) as client:
                resp = await client.get(
                    f"{self.config.wvp_api_base}/api/ai/camera/get",
                    params={"cameraCode": camera_code},
                )
                if resp.status_code != 200:
                    logger.warning(
                        f"WVP查询摄像头失败: camera_code={camera_code}, "
                        f"status={resp.status_code}"
                    )
                    return None

                data = resp.json()
                code = data.get("code")
                if code == 0:
                    return data.get("data")
                # HTTP succeeded but the API reported a failure; log it so the
                # miss is not completely silent.
                logger.warning(
                    f"WVP查询摄像头失败: camera_code={camera_code}, "
                    f"code={code}"
                )
        except Exception as e:
            # Best-effort lookup: name resolution must not break the caller.
            logger.error(f"WVP查询异常: camera_code={camera_code}, error={e}")
        return None

    @staticmethod
    def _first_text(info: Dict, *keys: str) -> Optional[str]:
        """Return the first non-blank string found under ``keys``, stripped.

        Non-string and whitespace-only values are skipped.
        """
        for key in keys:
            value = info.get(key)
            if isinstance(value, str):
                value = value.strip()
                if value:
                    return value
        return None

    def format_display_name(self, device_id: str, camera_info: Optional[Dict] = None) -> str:
        """Build the display name for a camera.

        Fixed priority: ``cameraName`` → ``gbName`` → ``device_id``.

        Args:
            device_id: Fallback returned when no usable name is found.
            camera_info: Camera record; both camelCase and snake_case keys
                are accepted. Anything that is not a non-empty dict is
                treated as "no record".

        Returns:
            The resolved name with surrounding whitespace removed, or
            ``device_id`` unchanged.
        """
        if not camera_info or not isinstance(camera_info, dict):
            return device_id

        # 1. cameraName: user-defined name; the placeholder "default" is skipped.
        custom = self._first_text(camera_info, "cameraName", "camera_name")
        if custom and custom.lower() != "default":
            return custom

        # 2. gbName: national-standard (GB) name; keep only the part before
        #    the first "/".
        gb_name = self._first_text(camera_info, "gbName", "gb_name")
        if gb_name:
            gb_name = gb_name.split("/")[0].strip()
            if gb_name:
                return gb_name

        # 3. Fallback.
        return device_id

    async def get_camera_infos_batch(self, device_ids: List[str]) -> Dict[str, Optional[Dict]]:
        """Look up several cameras concurrently.

        Empty ids are dropped and duplicates are queried once; the result
        keeps the first-seen order of the remaining ids.

        Returns:
            Mapping of ``device_id`` to its record (or ``None``).
        """
        # dict.fromkeys de-duplicates while preserving order (set() does not).
        unique_ids = list(dict.fromkeys(did for did in device_ids if did))
        if not unique_ids:
            return {}

        results = await asyncio.gather(
            *(self.get_camera_info(did) for did in unique_ids)
        )
        return dict(zip(unique_ids, results))

    async def get_display_name(self, device_id: str) -> str:
        """Look up ``device_id`` and return its display name in one call."""
        camera_info = await self.get_camera_info(device_id)
        return self.format_display_name(device_id, camera_info)

    async def get_display_names_batch(self, device_ids: List[str]) -> Dict[str, str]:
        """Return a ``{device_id: display_name}`` mapping for ``device_ids``.

        Ids without a record map to themselves.
        """
        info_map = await self.get_camera_infos_batch(device_ids)
        return {
            did: self.format_display_name(did, info_map.get(did))
            for did in device_ids
        }
# Module-level singleton (used for dependency injection); created lazily.
_camera_name_service: Optional[CameraNameService] = None


def get_camera_name_service() -> CameraNameService:
    """Return the shared CameraNameService, building it on first use."""
    global _camera_name_service
    if _camera_name_service is not None:
        return _camera_name_service

    # Imported here rather than at module top, as in the original code.
    from app.config import settings

    _camera_name_service = CameraNameService(settings.camera_name)
    return _camera_name_service