From 0f27caf0b52e54c3bb5d0e8bd648afe08fe3660c Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Tue, 24 Mar 2026 13:38:45 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=EF=BC=9A=E6=91=84=E5=83=8F?= =?UTF-8?q?=E5=A4=B4=E5=90=8D=E7=A7=B0=E6=98=BE=E7=A4=BA=E5=85=A8=E9=93=BE?= =?UTF-8?q?=E8=B7=AF=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - config.py: 简化 CameraNameConfig,删除 display_format/name_field_priority - camera_name_service.py: 重写名称解析,固定优先级 cameraName→gbName→device_id, 删除废弃的 app/stream 格式解析和 extract_name 方法 - yudao_aiot_alarm.py: 删除 stream→cameraId 的错误映射,cameraId 直接用 device_id - agent_dispatcher.py: query_camera 删除技术字段返回,list_my_orders 添加摄像头名称解析 --- app/config.py | 24 +-- app/routers/yudao_aiot_alarm.py | 28 +--- app/services/agent_dispatcher.py | 13 +- app/services/camera_name_service.py | 248 +++++----------------------- 4 files changed, 49 insertions(+), 264 deletions(-) diff --git a/app/config.py b/app/config.py index c8ffed6..795b479 100644 --- a/app/config.py +++ b/app/config.py @@ -109,31 +109,10 @@ class EdgeAuthConfig: @dataclass class CameraNameConfig: - """摄像头名称格式化配置""" - # WVP API基础URL + """摄像头名称配置""" wvp_api_base: str = "http://localhost:18080" - - # 显示格式模板(支持变量:{camera_code}, {name}, {stream}) - # 可选格式: - # - "{name}" - 仅名称(推荐,告警列表使用) - # - "{camera_code} {name}/{stream}" - cam_xxx 名称/流id(完整格式) - # - "{name}/{stream}" - 名称/流id - # - "{camera_code}" - 仅code - display_format: str = "{name}" - - # 名称字段优先级(从高到低) - # 从StreamProxy对象中提取名称时的字段优先级 - # 注意:gb_name 可能包含 "/" 后缀,会自动去除 - name_field_priority: list = None - - # 查询超时(秒) query_timeout: int = 15 - def __post_init__(self): - if self.name_field_priority is None: - # 默认优先级:gb_name > app > stream - self.name_field_priority = ["gbName", "app", "stream"] - class Settings(BaseModel): """全局配置""" @@ -222,7 +201,6 @@ def load_settings() -> Settings: ), camera_name=CameraNameConfig( wvp_api_base=os.getenv("WVP_API_BASE", "http://localhost:18080"), - display_format=os.getenv("CAMERA_NAME_FORMAT", "{name}"), query_timeout=int(os.getenv("CAMERA_QUERY_TIMEOUT", "15")), ), edge_auth=EdgeAuthConfig( diff --git a/app/routers/yudao_aiot_alarm.py b/app/routers/yudao_aiot_alarm.py index e5d8425..16b52ca 100644 --- a/app/routers/yudao_aiot_alarm.py +++ b/app/routers/yudao_aiot_alarm.py @@ -79,18 +79,6 @@ async def _alarm_to_camel(alarm_dict: dict, camera_info_map: dict = None, camera if camera_service and camera_info: device_name = camera_service.format_display_name(device_id, camera_info) - # 提取摄像头ID(统一使用stream作为编号) - camera_id = device_id # 默认值 - if camera_info: - stream = camera_info.get("stream") - if stream: - camera_id = stream - elif device_id and "/" in device_id: - # app/stream 格式,直接解析 - parts = device_id.split("/", 1) - if len(parts) == 2: - camera_id = parts[1] - return { # 新字段(三表结构) "alarmId": alarm_id, @@ -122,7 +110,7 @@ async def _alarm_to_camel(alarm_dict: dict, camera_info_map: dict = None, camera "id": alarm_id, "alertNo": alarm_id, "deviceId": device_id, # 原始ID(用于查询) - "cameraId": camera_id, # 摄像头ID(stream编号) + "cameraId": device_id, # 摄像头ID "cameraName": device_name, # 摄像头名称 "alertType": alarm_dict.get("alarm_type"), "alertTypeName": _get_alarm_type_name(alarm_dict.get("alarm_type")), @@ -403,20 +391,8 @@ async def get_device_summary_page( # 提取摄像头名称 device_name = camera_service.format_display_name(device_id, camera_info) - # 提取摄像头ID(统一使用stream作为编号) - camera_id = device_id # 默认值 - if camera_info: - stream = camera_info.get("stream") - if stream: - camera_id = stream - elif "/" in device_id: - # app/stream 格式,直接解析 - parts = device_id.split("/", 1) - if len(parts) == 2: - camera_id = parts[1] - item["deviceId"] = device_id # 原始ID(用于查询) - item["cameraId"] = camera_id # 摄像头ID(stream编号) + item["cameraId"] = device_id # 摄像头ID item["cameraName"] = device_name # 摄像头名称 item["deviceName"] = device_name # 摄像头名称(兼容) item["pendingCount"] = item.get("unhandledCount") diff --git a/app/services/agent_dispatcher.py b/app/services/agent_dispatcher.py index f819e65..473ad6c 100644 --- a/app/services/agent_dispatcher.py +++ b/app/services/agent_dispatcher.py @@ -630,7 +630,9 @@ class AgentDispatcher: async def _tool_list_my_orders(self, args: dict, user_id: str) -> dict: """查询我的待处理工单""" from app.services.alarm_event_service import get_alarm_event_service + from app.services.camera_name_service import get_camera_name_service svc = get_alarm_event_service() + camera_service = get_camera_name_service() # 查询 handler=user_id & handle_status=HANDLING 的告警 alarms, total = svc.get_alarms( @@ -647,6 +649,12 @@ class AgentDispatcher: items = [] for a in my_alarms: + cam_name = a.device_id + try: + cam_info = await camera_service.get_camera_info(a.device_id) + cam_name = camera_service.format_display_name(a.device_id, cam_info) + except Exception: + pass event_time = "" if a.event_time: try: @@ -656,7 +664,7 @@ class AgentDispatcher: items.append({ "alarm_id": a.alarm_id, "type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type), - "device_id": a.device_id, + "camera": cam_name, "time": event_time, }) @@ -729,9 +737,6 @@ class AgentDispatcher: return { "camera_id": camera_id, "name": camera_service.format_display_name(camera_id, cam_info), - "info": {k: v for k, v in cam_info.items() if k in ( - "app", "stream", "gbName", "gbId", "mediaServerId", "originType", - )}, } return {"error": f"未找到摄像头: {camera_id}"} except Exception as e: diff --git a/app/services/camera_name_service.py b/app/services/camera_name_service.py index c624938..437c786 100644 --- a/app/services/camera_name_service.py +++ b/app/services/camera_name_service.py @@ -1,21 +1,14 @@ """ -摄像头名称格式化服务 +摄像头名称服务 功能: -1. 从 WVP 查询摄像头信息 -2. 根据配置提取摄像头名称 -3. 按配置模板格式化显示名称 -4. 批量查询和缓存优化 - -设计原则: -- 配置驱动:所有格式和字段映射通过配置文件控制 -- 单一职责:只负责摄像头名称的查询和格式化 -- 可扩展:新增格式只需修改配置,不需改代码 -- 可测试:不依赖全局状态,便于单元测试 -- 性能优化:批量查询、去重、请求内缓存 +1. 从 WVP 查询摄像头信息(按 camera_code) +2. 固定优先级提取名称:cameraName → gbName → device_id +3. 批量查询和缓存优化 """ from typing import Optional, Dict, List +import asyncio import time import httpx from app.config import CameraNameConfig @@ -34,56 +27,21 @@ class CameraNameService: async def get_camera_info(self, device_id: str) -> Optional[Dict]: """从 WVP 查询摄像头信息(带缓存)""" - # 检查缓存 + if not device_id: + return None + cached = self._cache.get(device_id) if cached and cached[1] > time.time(): return cached[0] - info = None - # camera_code 格式:cam 前缀(cam_xxx 或 camXXX 均支持) - if device_id.lower().startswith("cam"): - info = await self._query_by_camera_code(device_id) - # app/stream 格式(遗留格式,直接解析) - elif "/" in device_id: - info = self._parse_app_stream_format(device_id) + info = await self._query_by_camera_code(device_id) # 写入缓存(包括 None 结果,避免反复查询不存在的设备) self._cache[device_id] = (info, time.time() + self.CACHE_TTL) return info - def _parse_app_stream_format(self, device_id: str) -> Optional[Dict]: - """ - 解析 app/stream 格式的 device_id - - Args: - device_id: 格式如 "大堂吧台3/012" - - Returns: - 虚拟的摄像头信息字典 - """ - parts = device_id.split("/", 1) - if len(parts) != 2: - return None - - app, stream = parts - # 构造虚拟的摄像头信息(兼容统一格式化逻辑) - return { - "app": app, # 中文名称 - "stream": stream, # 流ID - "gbName": app, # 兼容字段 - "cameraCode": None, # app/stream 格式没有 camera_code - } - async def _query_by_camera_code(self, camera_code: str) -> Optional[Dict]: - """ - 通过 camera_code 查询摄像头信息 - - Args: - camera_code: 摄像头编码,格式:cam_xxxxxxxxxxxx - - Returns: - 摄像头信息字典,查询失败返回 None - """ + """通过 camera_code 查询摄像头信息""" try: async with httpx.AsyncClient(timeout=self.config.query_timeout) as client: resp = await client.get( @@ -105,176 +63,49 @@ class CameraNameService: return None - async def get_camera_infos_batch(self, device_ids: List[str]) -> Dict[str, Optional[Dict]]: - """ - 批量查询摄像头信息(并发+去重优化) - - Args: - device_ids: 设备ID列表 - - Returns: - {device_id: camera_info} 字典 - """ - # 去重 - unique_ids = list(set(device_ids)) - - # 分类:camera_code 和 app/stream 格式 - camera_code_ids = [did for did in unique_ids if did.lower().startswith("cam")] - app_stream_ids = [did for did in unique_ids if "/" in did] - - # 初始化结果映射 - info_map = {} - - # 并发查询 camera_code 格式 - if camera_code_ids: - import asyncio - tasks = [self.get_camera_info(did) for did in camera_code_ids] - results = await asyncio.gather(*tasks) - info_map.update(dict(zip(camera_code_ids, results))) - - # 直接解析 app/stream 格式(无需查询) - for did in app_stream_ids: - info_map[did] = self._parse_app_stream_format(did) - - # 补充其他格式(未识别的) - for did in unique_ids: - if did not in info_map: - info_map[did] = None - - return info_map - - def extract_name(self, camera_info: Dict) -> Optional[str]: - """ - 从摄像头信息中提取名称 - - 根据配置的字段优先级提取名称: - 1. 按优先级遍历字段 - 2. 如果是 gbName 字段,自动去除 "/" 后缀 - 3. 返回第一个非空值 - - Args: - camera_info: 摄像头信息字典 - - Returns: - 提取的名称,失败返回 None - """ - for field in self.config.name_field_priority: - value = camera_info.get(field) - if value: - # gb_name 可能包含 "/" 后缀,需要去除 - if field == "gbName" and "/" in value: - value = value.split("/")[0] - return value - - return None - - def format_display_name( - self, - device_id: str, - camera_info: Optional[Dict] = None - ) -> str: + def format_display_name(self, device_id: str, camera_info: Optional[Dict] = None) -> str: """ 格式化摄像头显示名称 - Args: - device_id: 设备ID(fallback值) - camera_info: 摄像头信息字典(可选) - - Returns: - 格式化后的显示名称 - - 示例: - 配置模板:"{camera_code} {name}/{stream}" - camera_info: {cameraCode: "cam_123", gbName: "大堂/", stream: "012"} - 返回: "cam_123 大堂/012" - - 配置模板:"{name}" - 返回: "大堂" - - app/stream格式: - device_id: "大堂吧台3/012" - camera_info: {app: "大堂吧台3", stream: "012", cameraCode: None} - 返回: "大堂吧台3"(忽略模板,直接返回名称) + 固定优先级:cameraName → gbName → device_id """ - # 如果没有摄像头信息,直接返回device_id if not camera_info: return device_id - # 优先使用 camera_name(用户自定义名称),跳过无效值 - camera_name = camera_info.get("cameraName") or camera_info.get("camera_name") - if camera_name and camera_name.lower() != "default": - return camera_name - - # 降级:提取变量 - camera_code = camera_info.get("cameraCode") or camera_info.get("camera_code") - stream = camera_info.get("stream") - name = self.extract_name(camera_info) - - # 如果没有提取到名称,返回device_id - if not name: - logger.warning(f"无法提取摄像头名称: device_id={device_id}") - return device_id - - # 对于 app/stream 格式(没有 camera_code),直接返回名称 - if not camera_code: + # 1. cameraName(用户自定义名称) + name = camera_info.get("cameraName") or camera_info.get("camera_name") + if name and name.lower() != "default": return name - # 对于 camera_code 格式,检查模板需要的变量 - # 如果模板只需要 {name},直接返回名称 - if self.config.display_format == "{name}": - return name + # 2. gbName(国标名称,去 "/" 后缀) + name = camera_info.get("gbName") or camera_info.get("gb_name") + if name: + if "/" in name: + name = name.split("/")[0] + if name: + return name - # 完整格式需要所有字段 - if not stream: - logger.warning( - f"摄像头信息不完整: camera_code={camera_code}, " - f"name={name}, stream={stream}, 使用fallback" - ) - return name # 至少返回名称 + # 3. 兜底 + return device_id - # 按模板格式化 - try: - return self.config.display_format.format( - camera_code=camera_code, - name=name, - stream=stream - ) - except KeyError as e: - logger.error(f"格式化模板变量错误: {e}, 模板={self.config.display_format}") - return name # 出错时至少返回名称 + async def get_camera_infos_batch(self, device_ids: List[str]) -> Dict[str, Optional[Dict]]: + """批量查询摄像头信息(并发+去重优化)""" + unique_ids = list(set(did for did in device_ids if did)) + if not unique_ids: + return {} + + tasks = [self.get_camera_info(did) for did in unique_ids] + results = await asyncio.gather(*tasks) + return dict(zip(unique_ids, results)) async def get_display_name(self, device_id: str) -> str: - """ - 获取摄像头显示名称(一站式方法) - - 结合查询和格式化,返回最终显示名称 - - Args: - device_id: 设备ID - - Returns: - 格式化后的显示名称 - """ + """获取摄像头显示名称(一站式方法)""" camera_info = await self.get_camera_info(device_id) return self.format_display_name(device_id, camera_info) - async def get_display_names_batch( - self, - device_ids: List[str] - ) -> Dict[str, str]: - """ - 批量获取摄像头显示名称(性能优化版本) - - Args: - device_ids: 设备ID列表 - - Returns: - {device_id: display_name} 字典 - """ - # 批量查询摄像头信息(去重+并发) + async def get_display_names_batch(self, device_ids: List[str]) -> Dict[str, str]: + """批量获取摄像头显示名称""" info_map = await self.get_camera_infos_batch(device_ids) - - # 格式化所有名称 return { did: self.format_display_name(did, info_map.get(did)) for did in device_ids @@ -286,12 +117,7 @@ _camera_name_service: Optional[CameraNameService] = None def get_camera_name_service() -> CameraNameService: - """ - 获取摄像头名称服务单例 - - Returns: - CameraNameService 实例 - """ + """获取摄像头名称服务单例""" global _camera_name_service if _camera_name_service is None: from app.config import settings