修复:摄像头名称显示全链路修复

- config.py: 简化 CameraNameConfig,删除 display_format/name_field_priority
- camera_name_service.py: 重写名称解析,固定优先级 cameraName→gbName→device_id,
  删除废弃的 app/stream 格式解析和 extract_name 方法
- yudao_aiot_alarm.py: 删除 stream→cameraId 的错误映射,cameraId 直接用 device_id
- agent_dispatcher.py: query_camera 删除技术字段返回,list_my_orders 添加摄像头名称解析
This commit is contained in:
2026-03-24 13:38:45 +08:00
parent fbdd340a8e
commit 0f27caf0b5
4 changed files with 49 additions and 264 deletions

View File

@@ -109,31 +109,10 @@ class EdgeAuthConfig:
@dataclass
class CameraNameConfig:
"""摄像头名称格式化配置"""
# WVP API基础URL
"""摄像头名称配置"""
wvp_api_base: str = "http://localhost:18080"
# 显示格式模板(支持变量:{camera_code}, {name}, {stream}
# 可选格式:
# - "{name}" - 仅名称(推荐,告警列表使用)
# - "{camera_code} {name}/{stream}" - cam_xxx 名称/流id完整格式
# - "{name}/{stream}" - 名称/流id
# - "{camera_code}" - 仅code
display_format: str = "{name}"
# 名称字段优先级(从高到低)
# 从StreamProxy对象中提取名称时的字段优先级
# 注意gb_name 可能包含 "/" 后缀,会自动去除
name_field_priority: list = None
# 查询超时(秒)
query_timeout: int = 15
def __post_init__(self):
if self.name_field_priority is None:
# 默认优先级gb_name > app > stream
self.name_field_priority = ["gbName", "app", "stream"]
class Settings(BaseModel):
"""全局配置"""
@@ -222,7 +201,6 @@ def load_settings() -> Settings:
),
camera_name=CameraNameConfig(
wvp_api_base=os.getenv("WVP_API_BASE", "http://localhost:18080"),
display_format=os.getenv("CAMERA_NAME_FORMAT", "{name}"),
query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "15")),
),
edge_auth=EdgeAuthConfig(

View File

@@ -79,18 +79,6 @@ async def _alarm_to_camel(alarm_dict: dict, camera_info_map: dict = None, camera
if camera_service and camera_info:
device_name = camera_service.format_display_name(device_id, camera_info)
# 提取摄像头ID统一使用stream作为编号
camera_id = device_id # 默认值
if camera_info:
stream = camera_info.get("stream")
if stream:
camera_id = stream
elif device_id and "/" in device_id:
# app/stream 格式,直接解析
parts = device_id.split("/", 1)
if len(parts) == 2:
camera_id = parts[1]
return {
# 新字段(三表结构)
"alarmId": alarm_id,
@@ -122,7 +110,7 @@ async def _alarm_to_camel(alarm_dict: dict, camera_info_map: dict = None, camera
"id": alarm_id,
"alertNo": alarm_id,
"deviceId": device_id, # 原始ID用于查询
"cameraId": camera_id, # 摄像头IDstream编号
"cameraId": device_id, # 摄像头ID
"cameraName": device_name, # 摄像头名称
"alertType": alarm_dict.get("alarm_type"),
"alertTypeName": _get_alarm_type_name(alarm_dict.get("alarm_type")),
@@ -403,20 +391,8 @@ async def get_device_summary_page(
# 提取摄像头名称
device_name = camera_service.format_display_name(device_id, camera_info)
# 提取摄像头ID统一使用stream作为编号
camera_id = device_id # 默认值
if camera_info:
stream = camera_info.get("stream")
if stream:
camera_id = stream
elif "/" in device_id:
# app/stream 格式,直接解析
parts = device_id.split("/", 1)
if len(parts) == 2:
camera_id = parts[1]
item["deviceId"] = device_id # 原始ID用于查询
item["cameraId"] = camera_id # 摄像头IDstream编号
item["cameraId"] = device_id # 摄像头ID
item["cameraName"] = device_name # 摄像头名称
item["deviceName"] = device_name # 摄像头名称(兼容)
item["pendingCount"] = item.get("unhandledCount")

View File

@@ -630,7 +630,9 @@ class AgentDispatcher:
async def _tool_list_my_orders(self, args: dict, user_id: str) -> dict:
"""查询我的待处理工单"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.camera_name_service import get_camera_name_service
svc = get_alarm_event_service()
camera_service = get_camera_name_service()
# 查询 handler=user_id & handle_status=HANDLING 的告警
alarms, total = svc.get_alarms(
@@ -647,6 +649,12 @@ class AgentDispatcher:
items = []
for a in my_alarms:
cam_name = a.device_id
try:
cam_info = await camera_service.get_camera_info(a.device_id)
cam_name = camera_service.format_display_name(a.device_id, cam_info)
except Exception:
pass
event_time = ""
if a.event_time:
try:
@@ -656,7 +664,7 @@ class AgentDispatcher:
items.append({
"alarm_id": a.alarm_id,
"type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type),
"device_id": a.device_id,
"camera": cam_name,
"time": event_time,
})
@@ -729,9 +737,6 @@ class AgentDispatcher:
return {
"camera_id": camera_id,
"name": camera_service.format_display_name(camera_id, cam_info),
"info": {k: v for k, v in cam_info.items() if k in (
"app", "stream", "gbName", "gbId", "mediaServerId", "originType",
)},
}
return {"error": f"未找到摄像头: {camera_id}"}
except Exception as e:

View File

@@ -1,21 +1,14 @@
"""
摄像头名称格式化服务
摄像头名称服务
功能:
1. 从 WVP 查询摄像头信息
2. 根据配置提取摄像头名称
3. 按配置模板格式化显示名称
4. 批量查询和缓存优化
设计原则:
- 配置驱动:所有格式和字段映射通过配置文件控制
- 单一职责:只负责摄像头名称的查询和格式化
- 可扩展:新增格式只需修改配置,不需改代码
- 可测试:不依赖全局状态,便于单元测试
- 性能优化:批量查询、去重、请求内缓存
1. 从 WVP 查询摄像头信息(按 camera_code
2. 固定优先级提取名称cameraName → gbName → device_id
3. 批量查询和缓存优化
"""
from typing import Optional, Dict, List
import asyncio
import time
import httpx
from app.config import CameraNameConfig
@@ -34,56 +27,21 @@ class CameraNameService:
async def get_camera_info(self, device_id: str) -> Optional[Dict]:
"""从 WVP 查询摄像头信息(带缓存)"""
# 检查缓存
if not device_id:
return None
cached = self._cache.get(device_id)
if cached and cached[1] > time.time():
return cached[0]
info = None
# camera_code 格式cam 前缀cam_xxx 或 camXXX 均支持)
if device_id.lower().startswith("cam"):
info = await self._query_by_camera_code(device_id)
# app/stream 格式(遗留格式,直接解析)
elif "/" in device_id:
info = self._parse_app_stream_format(device_id)
info = await self._query_by_camera_code(device_id)
# 写入缓存(包括 None 结果,避免反复查询不存在的设备)
self._cache[device_id] = (info, time.time() + self.CACHE_TTL)
return info
def _parse_app_stream_format(self, device_id: str) -> Optional[Dict]:
"""
解析 app/stream 格式的 device_id
Args:
device_id: 格式如 "大堂吧台3/012"
Returns:
虚拟的摄像头信息字典
"""
parts = device_id.split("/", 1)
if len(parts) != 2:
return None
app, stream = parts
# 构造虚拟的摄像头信息(兼容统一格式化逻辑)
return {
"app": app, # 中文名称
"stream": stream, # 流ID
"gbName": app, # 兼容字段
"cameraCode": None, # app/stream 格式没有 camera_code
}
async def _query_by_camera_code(self, camera_code: str) -> Optional[Dict]:
"""
通过 camera_code 查询摄像头信息
Args:
camera_code: 摄像头编码格式cam_xxxxxxxxxxxx
Returns:
摄像头信息字典,查询失败返回 None
"""
"""通过 camera_code 查询摄像头信息"""
try:
async with httpx.AsyncClient(timeout=self.config.query_timeout) as client:
resp = await client.get(
@@ -105,176 +63,49 @@ class CameraNameService:
return None
async def get_camera_infos_batch(self, device_ids: List[str]) -> Dict[str, Optional[Dict]]:
"""
批量查询摄像头信息(并发+去重优化)
Args:
device_ids: 设备ID列表
Returns:
{device_id: camera_info} 字典
"""
# 去重
unique_ids = list(set(device_ids))
# 分类camera_code 和 app/stream 格式
camera_code_ids = [did for did in unique_ids if did.lower().startswith("cam")]
app_stream_ids = [did for did in unique_ids if "/" in did]
# 初始化结果映射
info_map = {}
# 并发查询 camera_code 格式
if camera_code_ids:
import asyncio
tasks = [self.get_camera_info(did) for did in camera_code_ids]
results = await asyncio.gather(*tasks)
info_map.update(dict(zip(camera_code_ids, results)))
# 直接解析 app/stream 格式(无需查询)
for did in app_stream_ids:
info_map[did] = self._parse_app_stream_format(did)
# 补充其他格式(未识别的)
for did in unique_ids:
if did not in info_map:
info_map[did] = None
return info_map
def extract_name(self, camera_info: Dict) -> Optional[str]:
"""
从摄像头信息中提取名称
根据配置的字段优先级提取名称:
1. 按优先级遍历字段
2. 如果是 gbName 字段,自动去除 "/" 后缀
3. 返回第一个非空值
Args:
camera_info: 摄像头信息字典
Returns:
提取的名称,失败返回 None
"""
for field in self.config.name_field_priority:
value = camera_info.get(field)
if value:
# gb_name 可能包含 "/" 后缀,需要去除
if field == "gbName" and "/" in value:
value = value.split("/")[0]
return value
return None
def format_display_name(
self,
device_id: str,
camera_info: Optional[Dict] = None
) -> str:
def format_display_name(self, device_id: str, camera_info: Optional[Dict] = None) -> str:
"""
格式化摄像头显示名称
Args:
device_id: 设备IDfallback值
camera_info: 摄像头信息字典(可选)
Returns:
格式化后的显示名称
示例:
配置模板:"{camera_code} {name}/{stream}"
camera_info: {cameraCode: "cam_123", gbName: "大堂/", stream: "012"}
返回: "cam_123 大堂/012"
配置模板:"{name}"
返回: "大堂"
app/stream格式
device_id: "大堂吧台3/012"
camera_info: {app: "大堂吧台3", stream: "012", cameraCode: None}
返回: "大堂吧台3"(忽略模板,直接返回名称)
固定优先级cameraName → gbName → device_id
"""
# 如果没有摄像头信息直接返回device_id
if not camera_info:
return device_id
# 优先使用 camera_name用户自定义名称,跳过无效值
camera_name = camera_info.get("cameraName") or camera_info.get("camera_name")
if camera_name and camera_name.lower() != "default":
return camera_name
# 降级:提取变量
camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code")
stream = camera_info.get("stream")
name = self.extract_name(camera_info)
# 如果没有提取到名称返回device_id
if not name:
logger.warning(f"无法提取摄像头名称: device_id={device_id}")
return device_id
# 对于 app/stream 格式(没有 camera_code直接返回名称
if not camera_code:
# 1. cameraName用户自定义名称
name = camera_info.get("cameraName") or camera_info.get("camera_name")
if name and name.lower() != "default":
return name
# 对于 camera_code 格式,检查模板需要的变量
# 如果模板只需要 {name},直接返回名称
if self.config.display_format == "{name}":
return name
# 2. gbName国标名称去 "/" 后缀)
name = camera_info.get("gbName") or camera_info.get("gb_name")
if name:
if "/" in name:
name = name.split("/")[0]
if name:
return name
# 完整格式需要所有字段
if not stream:
logger.warning(
f"摄像头信息不完整: camera_code={camera_code}, "
f"name={name}, stream={stream}, 使用fallback"
)
return name # 至少返回名称
# 3. 兜底
return device_id
# 按模板格式化
try:
return self.config.display_format.format(
camera_code=camera_code,
name=name,
stream=stream
)
except KeyError as e:
logger.error(f"格式化模板变量错误: {e}, 模板={self.config.display_format}")
return name # 出错时至少返回名称
async def get_camera_infos_batch(self, device_ids: List[str]) -> Dict[str, Optional[Dict]]:
"""批量查询摄像头信息(并发+去重优化)"""
unique_ids = list(set(did for did in device_ids if did))
if not unique_ids:
return {}
tasks = [self.get_camera_info(did) for did in unique_ids]
results = await asyncio.gather(*tasks)
return dict(zip(unique_ids, results))
async def get_display_name(self, device_id: str) -> str:
"""
获取摄像头显示名称(一站式方法)
结合查询和格式化,返回最终显示名称
Args:
device_id: 设备ID
Returns:
格式化后的显示名称
"""
"""获取摄像头显示名称(一站式方法)"""
camera_info = await self.get_camera_info(device_id)
return self.format_display_name(device_id, camera_info)
async def get_display_names_batch(
self,
device_ids: List[str]
) -> Dict[str, str]:
"""
批量获取摄像头显示名称(性能优化版本)
Args:
device_ids: 设备ID列表
Returns:
{device_id: display_name} 字典
"""
# 批量查询摄像头信息(去重+并发)
async def get_display_names_batch(self, device_ids: List[str]) -> Dict[str, str]:
"""批量获取摄像头显示名称"""
info_map = await self.get_camera_infos_batch(device_ids)
# 格式化所有名称
return {
did: self.format_display_name(did, info_map.get(did))
for did in device_ids
@@ -286,12 +117,7 @@ _camera_name_service: Optional[CameraNameService] = None
def get_camera_name_service() -> CameraNameService:
"""
获取摄像头名称服务单例
Returns:
CameraNameService 实例
"""
"""获取摄像头名称服务单例"""
global _camera_name_service
if _camera_name_service is None:
from app.config import settings