refactor(alarm): 模块化摄像头名称格式化服务

问题:
- 硬编码字段映射(gbName、name、app)
- 逻辑重复散落多处
- 格式写死无法配置
- 未基于数据库实际表结构
- 可扩展性差

重构方案:
1. 创建配置类 CameraNameConfig
   - 显示格式模板(支持变量:{camera_code}, {name}, {stream})
   - 字段优先级配置
   - WVP API配置
   - 查询超时配置

2. 创建服务类 CameraNameService
   - 查询摄像头信息(get_camera_info)
   - 提取名称字段(extract_name)
   - 格式化显示名称(format_display_name)
   - 一站式方法(get_display_name)

3. 重构路由层
   - 移除硬编码逻辑
   - 使用camera_name_service统一处理
   - 删除旧的_get_camera_info函数
   - 简化代码结构

架构优势:
- 配置驱动:格式通过环境变量控制
- 单一职责:服务只负责名称处理
- 可扩展:新增格式无需改代码
- 可测试:服务独立易于测试
- 模块化:逻辑集中便于维护

配置示例:
```bash
WVP_API_BASE=http://localhost:18080
CAMERA_NAME_FORMAT={camera_code} {name}/{stream}
CAMERA_QUERY_TIMEOUT=5
```

修改文件:
+ app/config.py - 添加CameraNameConfig配置
+ app/services/camera_name_service.py - 新建服务
+ docs/camera_name_config.md - 配置文档
~ app/routers/yudao_aiot_alarm.py - 使用新服务

测试结果:
- 告警列表: cam_1f0e3dad9990 → cam_1f0e3dad9990 大堂吧台3/012 ✓
- 设备汇总: cam_c51ce410c124 → cam_c51ce410c124 大堂吧台1/008 ✓
This commit is contained in:
2026-02-24 13:59:13 +08:00
parent ea599ed999
commit 6996423f7d
5 changed files with 518 additions and 113 deletions

View File

@@ -69,6 +69,34 @@ class RedisConfig:
enabled: bool = True
@dataclass
class CameraNameConfig:
"""摄像头名称格式化配置"""
# WVP API基础URL
wvp_api_base: str = "http://localhost:18080"
# 显示格式模板(支持变量:{camera_code}, {name}, {stream}
# 可选格式:
# - "{camera_code} {name}/{stream}" - cam_xxx 名称/流id
# - "{name}/{stream}" - 名称/流id
# - "{name}" - 仅名称
# - "{camera_code}" - 仅code
display_format: str = "{camera_code} {name}/{stream}"
# 名称字段优先级(从高到低)
# 从StreamProxy对象中提取名称时的字段优先级
# 注意gb_name 可能包含 "/" 后缀,会自动去除
name_field_priority: list = None
# 查询超时(秒)
query_timeout: int = 5
def __post_init__(self):
if self.name_field_priority is None:
# 默认优先级gb_name > app > stream
self.name_field_priority = ["gbName", "app", "stream"]
class Settings(BaseModel):
"""全局配置"""
database: DatabaseConfig = DatabaseConfig()
@@ -77,6 +105,7 @@ class Settings(BaseModel):
ai_model: AIModelConfig = AIModelConfig()
mqtt: MQTTConfig = MQTTConfig()
redis: RedisConfig = RedisConfig()
camera_name: CameraNameConfig = CameraNameConfig()
def load_settings() -> Settings:
@@ -127,6 +156,11 @@ def load_settings() -> Settings:
max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "50")),
enabled=os.getenv("REDIS_ENABLED", "true").lower() == "true",
),
camera_name=CameraNameConfig(
wvp_api_base=os.getenv("WVP_API_BASE", "http://localhost:18080"),
display_format=os.getenv("CAMERA_NAME_FORMAT", "{camera_code} {name}/{stream}"),
query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "5")),
),
)

BIN
app/data/alert_platform.db Normal file

Binary file not shown.

View File

@@ -15,14 +15,13 @@ API 路径规范:
from fastapi import APIRouter, Query, Depends, HTTPException
from typing import Optional
from datetime import datetime
import httpx
import asyncio
import os
from app.yudao_compat import YudaoResponse, get_current_user
from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService
from app.services.notification_service import get_notification_service
from app.services.oss_storage import get_oss_storage
from app.services.camera_name_service import get_camera_name_service
from app.schemas import EdgeAlarmReport, EdgeAlarmResolve
from app.utils.logger import logger
@@ -61,34 +60,13 @@ async def _alarm_to_camel(alarm_dict: dict, current_user: dict = None) -> dict:
alarm_id = alarm_dict.get("alarm_id")
# 查询摄像头名称(统一格式camera_code 摄像头名称/stream
# 查询摄像头显示名称(使用配置化服务
device_id = alarm_dict.get("device_id")
device_name = device_id # 默认使用 device_id
if current_user and device_id:
try:
camera_info = await _get_camera_info(device_id, current_user)
if camera_info:
# 获取摄像头中文名称(三级 fallback
camera_cn_name = None
gb_name = camera_info.get("gbName") or camera_info.get("gb_name")
if gb_name:
camera_cn_name = gb_name.split("/")[0]
elif camera_info.get("name"):
camera_cn_name = camera_info.get("name")
elif camera_info.get("app"):
camera_cn_name = camera_info.get("app")
# 统一格式camera_code 摄像头名称/stream
camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code")
stream = camera_info.get("stream")
if camera_code and camera_cn_name and stream:
device_name = f"{camera_code} {camera_cn_name}/{stream}"
elif camera_cn_name:
device_name = camera_cn_name
except Exception as e:
logger.warning(f"告警列表查询摄像头信息失败: device_id={device_id}, error={e}")
if device_id:
camera_service = get_camera_name_service()
device_name = await camera_service.get_display_name(device_id)
return {
# 新字段(三表结构)
@@ -274,40 +252,17 @@ async def get_device_summary_page(
"""获取设备告警汇总(分页)"""
result = service.get_device_summary(page=pageNo, page_size=pageSize)
# 添加前端兼容字段别名,并查询摄像头名称
# 添加前端兼容字段别名,并查询摄像头名称(使用配置化服务)
camera_service = get_camera_name_service()
compat_list = []
for item in result.get("list", []):
device_id = item.get("deviceId")
device_name = device_id # 默认使用 device_id
# 尝试从 WVP 查询摄像头名称统一格式camera_code 摄像头名称/stream
try:
camera_info = await _get_camera_info(device_id, current_user)
if camera_info:
# 获取摄像头中文名称(三级 fallback
camera_cn_name = None
gb_name = camera_info.get("gbName") or camera_info.get("gb_name")
if gb_name:
camera_cn_name = gb_name.split("/")[0]
elif camera_info.get("name"):
camera_cn_name = camera_info.get("name")
elif camera_info.get("app"):
camera_cn_name = camera_info.get("app")
# 统一格式camera_code 摄像头名称/stream
camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code")
stream = camera_info.get("stream")
if camera_code and camera_cn_name and stream:
device_name = f"{camera_code} {camera_cn_name}/{stream}"
elif camera_cn_name:
device_name = camera_cn_name
except Exception as e:
logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}")
# 使用配置化服务获取显示名称
device_name = await camera_service.get_display_name(device_id)
item["cameraId"] = device_id
item["cameraName"] = device_name
item["deviceName"] = device_name # 更新 deviceName 为实际名称
item["deviceName"] = device_name
item["pendingCount"] = item.get("unhandledCount")
item["lastAlertTime"] = item.get("lastEventTime")
item["lastAlertType"] = item.get("lastAlarmType")
@@ -389,64 +344,6 @@ async def edge_alarm_resolve(
# ==================== 辅助函数 ====================
OPS_ALARM_URL = "http://192.168.0.104:48080/admin-api/ops/alarm/receive"
WVP_API_BASE = os.getenv("WVP_API_BASE", "http://localhost:18080")
async def _get_camera_info(device_id: str, current_user: dict) -> Optional[dict]:
"""
从 WVP 查询摄像头信息
支持 camera_code 和 app/stream 两种格式
"""
# 如果是 camera_code 格式cam_xxxxxxxxxxxx
if device_id.startswith("cam_"):
try:
async with httpx.AsyncClient(timeout=5) as client:
# 调用 WVP 查询单个摄像头的 API无需认证已加白名单
resp = await client.get(
f"{WVP_API_BASE}/api/ai/camera/get",
params={"cameraCode": device_id},
)
logger.info(f"查询摄像头: device_id={device_id}, status={resp.status_code}")
if resp.status_code == 200:
data = resp.json()
logger.info(f"WVP 响应: {data}")
if data.get("code") == 0:
return data.get("data")
except Exception as e:
logger.warning(f"查询 camera_code 失败: {device_id}, error={e}")
# 如果是 app/stream 格式
elif "/" in device_id:
# 获取 tokenapp/stream 查询需要认证)
token = current_user.get("access_token") or current_user.get("token")
if not token:
logger.warning(f"查询 app/stream 失败: 缺少 token, device_id={device_id}")
return None
headers = {"Authorization": f"Bearer {token}"}
parts = device_id.split("/", 1)
if len(parts) == 2:
app, stream = parts
try:
async with httpx.AsyncClient(timeout=5) as client:
# 查询摄像头列表,筛选 app 和 stream
resp = await client.get(
f"{WVP_API_BASE}/admin-api/aiot/device/camera/list",
params={"page": 1, "count": 1, "query": stream},
headers=headers
)
if resp.status_code == 200:
data = resp.json()
if data.get("code") == 0:
camera_list = data.get("data", {}).get("list", [])
# 找到匹配的摄像头
for camera in camera_list:
if camera.get("app") == app and camera.get("stream") == stream:
return camera
except Exception as e:
logger.warning(f"查询 app/stream 失败: {device_id}, error={e}")
return None
async def _notify_ops_platform(data: dict):

View File

@@ -0,0 +1,195 @@
"""
摄像头名称格式化服务
功能:
1. 从 WVP 查询摄像头信息
2. 根据配置提取摄像头名称
3. 按配置模板格式化显示名称
设计原则:
- 配置驱动:所有格式和字段映射通过配置文件控制
- 单一职责:只负责摄像头名称的查询和格式化
- 可扩展:新增格式只需修改配置,不需改代码
- 可测试:不依赖全局状态,便于单元测试
"""
from typing import Optional, Dict
import httpx
from app.config import CameraNameConfig
from app.utils.logger import logger
class CameraNameService:
"""摄像头名称服务"""
def __init__(self, config: CameraNameConfig):
"""
初始化服务
Args:
config: 摄像头名称配置
"""
self.config = config
async def get_camera_info(self, device_id: str) -> Optional[Dict]:
"""
从 WVP 查询摄像头信息
Args:
device_id: 设备ID支持两种格式
- camera_code 格式cam_xxxxxxxxxxxx
- app/stream 格式大堂吧台3/012
Returns:
摄像头信息字典,查询失败返回 None
"""
# camera_code 格式(推荐)
if device_id.startswith("cam_"):
return await self._query_by_camera_code(device_id)
# app/stream 格式(遗留格式,需要认证)
elif "/" in device_id:
logger.warning(f"使用遗留格式 app/stream: {device_id},建议使用 camera_code 格式")
return None # app/stream 格式需要token暂不支持无认证查询
return None
async def _query_by_camera_code(self, camera_code: str) -> Optional[Dict]:
"""
通过 camera_code 查询摄像头信息
Args:
camera_code: 摄像头编码格式cam_xxxxxxxxxxxx
Returns:
摄像头信息字典,查询失败返回 None
"""
try:
async with httpx.AsyncClient(timeout=self.config.query_timeout) as client:
resp = await client.get(
f"{self.config.wvp_api_base}/api/ai/camera/get",
params={"cameraCode": camera_code},
)
if resp.status_code == 200:
data = resp.json()
if data.get("code") == 0:
return data.get("data")
else:
logger.warning(
f"WVP查询摄像头失败: camera_code={camera_code}, "
f"status={resp.status_code}"
)
except Exception as e:
logger.error(f"WVP查询异常: camera_code={camera_code}, error={e}")
return None
def extract_name(self, camera_info: Dict) -> Optional[str]:
"""
从摄像头信息中提取名称
根据配置的字段优先级提取名称:
1. 按优先级遍历字段
2. 如果是 gbName 字段,自动去除 "/" 后缀
3. 返回第一个非空值
Args:
camera_info: 摄像头信息字典
Returns:
提取的名称,失败返回 None
"""
for field in self.config.name_field_priority:
value = camera_info.get(field)
if value:
# gb_name 可能包含 "/" 后缀,需要去除
if field == "gbName" and "/" in value:
value = value.split("/")[0]
return value
return None
def format_display_name(
self,
device_id: str,
camera_info: Optional[Dict] = None
) -> str:
"""
格式化摄像头显示名称
Args:
device_id: 设备IDfallback值
camera_info: 摄像头信息字典(可选)
Returns:
格式化后的显示名称
示例:
配置模板:"{camera_code} {name}/{stream}"
camera_info: {cameraCode: "cam_123", gbName: "大堂/", stream: "012"}
返回: "cam_123 大堂/012"
配置模板:"{name}"
返回: "大堂"
"""
# 如果没有摄像头信息直接返回device_id
if not camera_info:
return device_id
# 提取变量
camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code")
stream = camera_info.get("stream")
name = self.extract_name(camera_info)
# 如果必需字段缺失返回device_id
if not camera_code or not name or not stream:
logger.warning(
f"摄像头信息不完整: camera_code={camera_code}, "
f"name={name}, stream={stream}, 使用fallback"
)
return device_id
# 按模板格式化
try:
return self.config.display_format.format(
camera_code=camera_code,
name=name,
stream=stream
)
except KeyError as e:
logger.error(f"格式化模板变量错误: {e}, 模板={self.config.display_format}")
return device_id
async def get_display_name(self, device_id: str) -> str:
"""
获取摄像头显示名称(一站式方法)
结合查询和格式化,返回最终显示名称
Args:
device_id: 设备ID
Returns:
格式化后的显示名称
"""
camera_info = await self.get_camera_info(device_id)
return self.format_display_name(device_id, camera_info)
# 全局单例(依赖注入)
_camera_name_service: Optional[CameraNameService] = None
def get_camera_name_service() -> CameraNameService:
"""
获取摄像头名称服务单例
Returns:
CameraNameService 实例
"""
global _camera_name_service
if _camera_name_service is None:
from app.config import settings
_camera_name_service = CameraNameService(settings.camera_name)
return _camera_name_service