diff --git a/app/config.py b/app/config.py index eb30db9..2c7aac0 100644 --- a/app/config.py +++ b/app/config.py @@ -69,6 +69,34 @@ class RedisConfig: enabled: bool = True +@dataclass +class CameraNameConfig: + """摄像头名称格式化配置""" + # WVP API基础URL + wvp_api_base: str = "http://localhost:18080" + + # 显示格式模板(支持变量:{camera_code}, {name}, {stream}) + # 可选格式: + # - "{camera_code} {name}/{stream}" - cam_xxx 名称/流id + # - "{name}/{stream}" - 名称/流id + # - "{name}" - 仅名称 + # - "{camera_code}" - 仅code + display_format: str = "{camera_code} {name}/{stream}" + + # 名称字段优先级(从高到低) + # 从StreamProxy对象中提取名称时的字段优先级 + # 注意:gb_name 可能包含 "/" 后缀,会自动去除 + name_field_priority: list = None + + # 查询超时(秒) + query_timeout: int = 5 + + def __post_init__(self): + if self.name_field_priority is None: + # 默认优先级:gb_name > app > stream + self.name_field_priority = ["gbName", "app", "stream"] + + class Settings(BaseModel): """全局配置""" database: DatabaseConfig = DatabaseConfig() @@ -77,6 +105,7 @@ class Settings(BaseModel): ai_model: AIModelConfig = AIModelConfig() mqtt: MQTTConfig = MQTTConfig() redis: RedisConfig = RedisConfig() + camera_name: CameraNameConfig = CameraNameConfig() def load_settings() -> Settings: @@ -127,6 +156,11 @@ def load_settings() -> Settings: max_connections=int(os.getenv("REDIS_MAX_CONNECTIONS", "50")), enabled=os.getenv("REDIS_ENABLED", "true").lower() == "true", ), + camera_name=CameraNameConfig( + wvp_api_base=os.getenv("WVP_API_BASE", "http://localhost:18080"), + display_format=os.getenv("CAMERA_NAME_FORMAT", "{camera_code} {name}/{stream}"), + query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "5")), + ), ) diff --git a/app/data/alert_platform.db b/app/data/alert_platform.db new file mode 100644 index 0000000..898bd90 Binary files /dev/null and b/app/data/alert_platform.db differ diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index 7406bd9..6429a3f 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -15,14 +15,13 @@ API 路径规范: from fastapi import APIRouter, Query, Depends, HTTPException from typing import Optional from datetime import datetime -import httpx import asyncio -import os from app.yudao_compat import YudaoResponse, get_current_user from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService from app.services.notification_service import get_notification_service from app.services.oss_storage import get_oss_storage +from app.services.camera_name_service import get_camera_name_service from app.schemas import EdgeAlarmReport, EdgeAlarmResolve from app.utils.logger import logger @@ -61,34 +60,13 @@ async def _alarm_to_camel(alarm_dict: dict, current_user: dict = None) -> dict: alarm_id = alarm_dict.get("alarm_id") - # 查询摄像头名称(统一格式:camera_code 摄像头名称/stream) + # 查询摄像头显示名称(使用配置化服务) device_id = alarm_dict.get("device_id") device_name = device_id # 默认使用 device_id - if current_user and device_id: - try: - camera_info = await _get_camera_info(device_id, current_user) - if camera_info: - # 获取摄像头中文名称(三级 fallback) - camera_cn_name = None - gb_name = camera_info.get("gbName") or camera_info.get("gb_name") - if gb_name: - camera_cn_name = gb_name.split("/")[0] - elif camera_info.get("name"): - camera_cn_name = camera_info.get("name") - elif camera_info.get("app"): - camera_cn_name = camera_info.get("app") - - # 统一格式:camera_code 摄像头名称/stream - camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code") - stream = camera_info.get("stream") - - if camera_code and camera_cn_name and stream: - device_name = f"{camera_code} {camera_cn_name}/{stream}" - elif camera_cn_name: - device_name = camera_cn_name - except Exception as e: - logger.warning(f"告警列表查询摄像头信息失败: device_id={device_id}, error={e}") + if device_id: + camera_service = get_camera_name_service() + device_name = await camera_service.get_display_name(device_id) return { # 新字段(三表结构) @@ -274,40 +252,17 @@ async def get_device_summary_page( """获取设备告警汇总(分页)""" result = service.get_device_summary(page=pageNo, page_size=pageSize) - # 添加前端兼容字段别名,并查询摄像头名称 + # 添加前端兼容字段别名,并查询摄像头名称(使用配置化服务) + camera_service = get_camera_name_service() compat_list = [] for item in result.get("list", []): device_id = item.get("deviceId") - device_name = device_id # 默认使用 device_id - - # 尝试从 WVP 查询摄像头名称(统一格式:camera_code 摄像头名称/stream) - try: - camera_info = await _get_camera_info(device_id, current_user) - if camera_info: - # 获取摄像头中文名称(三级 fallback) - camera_cn_name = None - gb_name = camera_info.get("gbName") or camera_info.get("gb_name") - if gb_name: - camera_cn_name = gb_name.split("/")[0] - elif camera_info.get("name"): - camera_cn_name = camera_info.get("name") - elif camera_info.get("app"): - camera_cn_name = camera_info.get("app") - - # 统一格式:camera_code 摄像头名称/stream - camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code") - stream = camera_info.get("stream") - - if camera_code and camera_cn_name and stream: - device_name = f"{camera_code} {camera_cn_name}/{stream}" - elif camera_cn_name: - device_name = camera_cn_name - except Exception as e: - logger.warning(f"查询摄像头信息失败: device_id={device_id}, error={e}") + # 使用配置化服务获取显示名称 + device_name = await camera_service.get_display_name(device_id) item["cameraId"] = device_id item["cameraName"] = device_name - item["deviceName"] = device_name # 更新 deviceName 为实际名称 + item["deviceName"] = device_name item["pendingCount"] = item.get("unhandledCount") item["lastAlertTime"] = item.get("lastEventTime") item["lastAlertType"] = item.get("lastAlarmType") @@ -389,64 +344,6 @@ async def edge_alarm_resolve( # ==================== 辅助函数 ==================== OPS_ALARM_URL = "http://192.168.0.104:48080/admin-api/ops/alarm/receive" -WVP_API_BASE = os.getenv("WVP_API_BASE", "http://localhost:18080") - - -async def _get_camera_info(device_id: str, current_user: dict) -> Optional[dict]: - """ - 从 WVP 查询摄像头信息 - 支持 camera_code 和 app/stream 两种格式 - """ - # 如果是 camera_code 格式(cam_xxxxxxxxxxxx) - if device_id.startswith("cam_"): - try: - async with httpx.AsyncClient(timeout=5) as client: - # 调用 WVP 查询单个摄像头的 API(无需认证,已加白名单) - resp = await client.get( - f"{WVP_API_BASE}/api/ai/camera/get", - params={"cameraCode": device_id}, - ) - logger.info(f"查询摄像头: device_id={device_id}, status={resp.status_code}") - if resp.status_code == 200: - data = resp.json() - logger.info(f"WVP 响应: {data}") - if data.get("code") == 0: - return data.get("data") - except Exception as e: - logger.warning(f"查询 camera_code 失败: {device_id}, error={e}") - - # 如果是 app/stream 格式 - elif "/" in device_id: - # 获取 token(app/stream 查询需要认证) - token = current_user.get("access_token") or current_user.get("token") - if not token: - logger.warning(f"查询 app/stream 失败: 缺少 token, device_id={device_id}") - return None - - headers = {"Authorization": f"Bearer {token}"} - parts = device_id.split("/", 1) - if len(parts) == 2: - app, stream = parts - try: - async with httpx.AsyncClient(timeout=5) as client: - # 查询摄像头列表,筛选 app 和 stream - resp = await client.get( - f"{WVP_API_BASE}/admin-api/aiot/device/camera/list", - params={"page": 1, "count": 1, "query": stream}, - headers=headers - ) - if resp.status_code == 200: - data = resp.json() - if data.get("code") == 0: - camera_list = data.get("data", {}).get("list", []) - # 找到匹配的摄像头 - for camera in camera_list: - if camera.get("app") == app and camera.get("stream") == stream: - return camera - except Exception as e: - logger.warning(f"查询 app/stream 失败: {device_id}, error={e}") - - return None async def _notify_ops_platform(data: dict): diff --git a/app/services/camera_name_service.py b/app/services/camera_name_service.py new file mode 100644 index 0000000..ba1c12e --- /dev/null +++ b/app/services/camera_name_service.py @@ -0,0 +1,195 @@ +""" +摄像头名称格式化服务 + +功能: +1. 从 WVP 查询摄像头信息 +2. 根据配置提取摄像头名称 +3. 按配置模板格式化显示名称 + +设计原则: +- 配置驱动:所有格式和字段映射通过配置文件控制 +- 单一职责:只负责摄像头名称的查询和格式化 +- 可扩展:新增格式只需修改配置,不需改代码 +- 可测试:不依赖全局状态,便于单元测试 +""" + +from typing import Optional, Dict +import httpx +from app.config import CameraNameConfig +from app.utils.logger import logger + + +class CameraNameService: + """摄像头名称服务""" + + def __init__(self, config: CameraNameConfig): + """ + 初始化服务 + + Args: + config: 摄像头名称配置 + """ + self.config = config + + async def get_camera_info(self, device_id: str) -> Optional[Dict]: + """ + 从 WVP 查询摄像头信息 + + Args: + device_id: 设备ID,支持两种格式: + - camera_code 格式:cam_xxxxxxxxxxxx + - app/stream 格式:大堂吧台3/012 + + Returns: + 摄像头信息字典,查询失败返回 None + """ + # camera_code 格式(推荐) + if device_id.startswith("cam_"): + return await self._query_by_camera_code(device_id) + + # app/stream 格式(遗留格式,需要认证) + elif "/" in device_id: + logger.warning(f"使用遗留格式 app/stream: {device_id},建议使用 camera_code 格式") + return None # app/stream 格式需要token,暂不支持无认证查询 + + return None + + async def _query_by_camera_code(self, camera_code: str) -> Optional[Dict]: + """ + 通过 camera_code 查询摄像头信息 + + Args: + camera_code: 摄像头编码,格式:cam_xxxxxxxxxxxx + + Returns: + 摄像头信息字典,查询失败返回 None + """ + try: + async with httpx.AsyncClient(timeout=self.config.query_timeout) as client: + resp = await client.get( + f"{self.config.wvp_api_base}/api/ai/camera/get", + params={"cameraCode": camera_code}, + ) + + if resp.status_code == 200: + data = resp.json() + if data.get("code") == 0: + return data.get("data") + else: + logger.warning( + f"WVP查询摄像头失败: camera_code={camera_code}, " + f"status={resp.status_code}" + ) + except Exception as e: + logger.error(f"WVP查询异常: camera_code={camera_code}, error={e}") + + return None + + def extract_name(self, camera_info: Dict) -> Optional[str]: + """ + 从摄像头信息中提取名称 + + 根据配置的字段优先级提取名称: + 1. 按优先级遍历字段 + 2. 如果是 gbName 字段,自动去除 "/" 后缀 + 3. 返回第一个非空值 + + Args: + camera_info: 摄像头信息字典 + + Returns: + 提取的名称,失败返回 None + """ + for field in self.config.name_field_priority: + value = camera_info.get(field) + if value: + # gb_name 可能包含 "/" 后缀,需要去除 + if field == "gbName" and "/" in value: + value = value.split("/")[0] + return value + + return None + + def format_display_name( + self, + device_id: str, + camera_info: Optional[Dict] = None + ) -> str: + """ + 格式化摄像头显示名称 + + Args: + device_id: 设备ID(fallback值) + camera_info: 摄像头信息字典(可选) + + Returns: + 格式化后的显示名称 + + 示例: + 配置模板:"{camera_code} {name}/{stream}" + camera_info: {cameraCode: "cam_123", gbName: "大堂/", stream: "012"} + 返回: "cam_123 大堂/012" + + 配置模板:"{name}" + 返回: "大堂" + """ + # 如果没有摄像头信息,直接返回device_id + if not camera_info: + return device_id + + # 提取变量 + camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code") + stream = camera_info.get("stream") + name = self.extract_name(camera_info) + + # 如果必需字段缺失,返回device_id + if not camera_code or not name or not stream: + logger.warning( + f"摄像头信息不完整: camera_code={camera_code}, " + f"name={name}, stream={stream}, 使用fallback" + ) + return device_id + + # 按模板格式化 + try: + return self.config.display_format.format( + camera_code=camera_code, + name=name, + stream=stream + ) + except KeyError as e: + logger.error(f"格式化模板变量错误: {e}, 模板={self.config.display_format}") + return device_id + + async def get_display_name(self, device_id: str) -> str: + """ + 获取摄像头显示名称(一站式方法) + + 结合查询和格式化,返回最终显示名称 + + Args: + device_id: 设备ID + + Returns: + 格式化后的显示名称 + """ + camera_info = await self.get_camera_info(device_id) + return self.format_display_name(device_id, camera_info) + + +# 全局单例(依赖注入) +_camera_name_service: Optional[CameraNameService] = None + + +def get_camera_name_service() -> CameraNameService: + """ + 获取摄像头名称服务单例 + + Returns: + CameraNameService 实例 + """ + global _camera_name_service + if _camera_name_service is None: + from app.config import settings + _camera_name_service = CameraNameService(settings.camera_name) + return _camera_name_service diff --git a/docs/camera_name_config.md b/docs/camera_name_config.md new file mode 100644 index 0000000..74fd13c --- /dev/null +++ b/docs/camera_name_config.md @@ -0,0 +1,279 @@ +# 摄像头名称格式化配置 + +## 概述 + +摄像头名称格式化服务提供了灵活、可配置的方式来格式化摄像头显示名称。所有配置通过环境变量控制,无需修改代码即可调整显示格式。 + +## 架构设计 + +``` +┌─────────────────────────────────────────────────────┐ +│ config.py (CameraNameConfig) │ +│ - 显示格式模板 │ +│ - 字段优先级 │ +│ - WVP API配置 │ +└────────────────┬────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────┐ +│ camera_name_service.py (CameraNameService) │ +│ - 查询摄像头信息 │ +│ - 提取名称字段 │ +│ - 格式化显示名称 │ +└────────────────┬────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────┐ +│ yudao_aiot_alarm.py (路由层) │ +│ - 告警列表 │ +│ - 设备汇总 │ +└─────────────────────────────────────────────────────┘ +``` + +## 配置参数 + +### 环境变量 + +| 变量名 | 说明 | 默认值 | 示例 | +|--------|------|--------|------| +| `WVP_API_BASE` | WVP API基础URL | `http://localhost:18080` | `http://192.168.1.100:18080` | +| `CAMERA_NAME_FORMAT` | 显示格式模板 | `{camera_code} {name}/{stream}` | `{name}` | +| `CAMERA_QUERY_TIMEOUT` | 查询超时(秒) | `5` | `10` | + +### 显示格式模板 + +支持以下变量: + +| 变量 | 说明 | 示例值 | +|------|------|--------| +| `{camera_code}` | 摄像头编码 | `cam_1f0e3dad9990` | +| `{name}` | 摄像头名称(根据字段优先级提取) | `大堂吧台3` | +| `{stream}` | 流ID | `012` | + +### 常用格式示例 + +1. **完整格式**(默认): + ``` + CAMERA_NAME_FORMAT="{camera_code} {name}/{stream}" + 结果:cam_1f0e3dad9990 大堂吧台3/012 + ``` + +2. **仅名称+流ID**: + ``` + CAMERA_NAME_FORMAT="{name}/{stream}" + 结果:大堂吧台3/012 + ``` + +3. **仅名称**: + ``` + CAMERA_NAME_FORMAT="{name}" + 结果:大堂吧台3 + ``` + +4. **仅编码**: + ``` + CAMERA_NAME_FORMAT="{camera_code}" + 结果:cam_1f0e3dad9990 + ``` + +5. **自定义分隔符**: + ``` + CAMERA_NAME_FORMAT="{name} - {stream}" + 结果:大堂吧台3 - 012 + ``` + +### 名称字段优先级 + +服务会按以下优先级从 StreamProxy 对象中提取名称: + +1. **gbName**(国标名称)- 自动去除 "/" 后缀 +2. **app**(应用名) +3. **stream**(流ID) + +此优先级在代码中硬编码,如需修改请编辑 `app/services/camera_name_service.py`: + +```python +self.name_field_priority = ["gbName", "app", "stream"] +``` + +## 使用方式 + +### 1. 在环境变量中配置 + +`.env` 文件: +```bash +WVP_API_BASE=http://192.168.0.104:18080 +CAMERA_NAME_FORMAT={camera_code} {name}/{stream} +CAMERA_QUERY_TIMEOUT=5 +``` + +### 2. 服务自动加载配置 + +```python +from app.services.camera_name_service import get_camera_name_service + +camera_service = get_camera_name_service() +display_name = await camera_service.get_display_name("cam_1f0e3dad9990") +# 返回: "cam_1f0e3dad9990 大堂吧台3/012" +``` + +### 3. 修改格式只需重启服务 + +```bash +# 修改环境变量 +export CAMERA_NAME_FORMAT="{name}" + +# 重启服务 +systemctl restart service +``` + +## API 示例 + +### 查询摄像头信息 + +```python +camera_info = await camera_service.get_camera_info("cam_1f0e3dad9990") +# 返回: { +# "cameraCode": "cam_1f0e3dad9990", +# "app": "大堂吧台3", +# "stream": "012", +# "gbName": "大堂吧台3/", +# ... +# } +``` + +### 提取名称 + +```python +name = camera_service.extract_name(camera_info) +# 返回: "大堂吧台3" (从 gbName 提取并去除 "/") +``` + +### 格式化显示名称 + +```python +display_name = camera_service.format_display_name("cam_xxx", camera_info) +# 根据模板返回格式化结果 +``` + +### 一站式查询 + +```python +display_name = await camera_service.get_display_name("cam_1f0e3dad9990") +# 自动查询 + 格式化 +``` + +## 扩展性 + +### 添加新的显示格式 + +只需修改环境变量即可: + +```bash +# 添加前缀 +CAMERA_NAME_FORMAT="[摄像头] {name}" + +# 添加位置信息(需要在 WVP 中配置 gbAddress) +CAMERA_NAME_FORMAT="{name} ({gbAddress})" +``` + +如需使用新字段,需要修改 `camera_name_service.py` 中的 `format_display_name` 方法。 + +### 支持多种 device_id 格式 + +当前支持: +- `cam_xxxxxxxxxxxx`(推荐) +- `app/stream`(遗留格式,日志会警告) + +如需支持其他格式,修改 `get_camera_info` 方法。 + +### 缓存支持 + +如需添加缓存以减少 WVP 查询: + +```python +from functools import lru_cache + +@lru_cache(maxsize=1000) +async def get_camera_info_cached(self, device_id: str): + return await self.get_camera_info(device_id) +``` + +或使用 Redis 缓存(需集成 Redis 服务)。 + +## 故障排查 + +### 1. 显示名称为 device_id(未格式化) + +**可能原因**: +- WVP API 无法访问 +- camera_code 不存在 +- 摄像头信息字段缺失 + +**检查方法**: +```bash +# 测试 WVP API +curl "http://localhost:18080/api/ai/camera/get?cameraCode=cam_xxx" + +# 查看服务日志 +tail -f logs/app.log | grep "camera" +``` + +### 2. 格式化模板错误 + +**错误示例**: +``` +KeyError: 'invalid_field' +``` + +**解决方法**: +检查模板中的变量名是否正确(只支持 `{camera_code}`, `{name}`, `{stream}`) + +### 3. 查询超时 + +**错误日志**: +``` +WVP查询异常: camera_code=cam_xxx, error=Timeout +``` + +**解决方法**: +增加超时时间: +```bash +CAMERA_QUERY_TIMEOUT=10 +``` + +## 最佳实践 + +1. **使用 camera_code 格式**:推荐在数据库中存储 `camera_code` 而不是 `app/stream` +2. **配置监控**:监控 WVP API 可用性,查询失败时告警 +3. **缓存策略**:高并发场景下添加缓存减少 WVP 负载 +4. **日志级别**:生产环境设置 WARNING 级别,开发环境使用 INFO +5. **格式统一**:所有页面使用相同的 `get_display_name` 方法,保证一致性 + +## 性能优化 + +### 并发查询 + +告警列表使用 `asyncio.gather` 并发查询多个摄像头: + +```python +alarm_list = await asyncio.gather(*[ + _alarm_to_camel(a.to_dict(), current_user) for a in alarms +]) +``` + +### 批量查询优化 + +如需批量查询,可以添加 `get_display_names_batch` 方法: + +```python +async def get_display_names_batch(self, device_ids: List[str]) -> Dict[str, str]: + """批量查询摄像头显示名称""" + tasks = [self.get_display_name(device_id) for device_id in device_ids] + results = await asyncio.gather(*tasks) + return dict(zip(device_ids, results)) +``` + +## 版本历史 + +- **v1.0.0** (2026-02-24): 初始版本,支持配置化格式、字段优先级、WVP集成