diff --git a/app/routers/edge_compat.py b/app/routers/edge_compat.py index e7fd5a7..2b3e7db 100644 --- a/app/routers/edge_compat.py +++ b/app/routers/edge_compat.py @@ -56,6 +56,7 @@ async def edge_alarm_report( "event_time": alarm.event_time, "alarm_level": alarm.alarm_level, "snapshot_url": alarm.snapshot_url, + "area_id": alarm.area_id, } asyncio.create_task(process_alarm_notification(notify_data)) except Exception as e: diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index 6490147..7e18dea 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -23,6 +23,7 @@ from app.models import ( from app.config import settings from app.services.vlm_service import get_vlm_service from app.services.wechat_service import get_wechat_service, ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES +from app.services.camera_name_service import get_camera_name_service from app.services.work_order_client import get_work_order_client from app.utils.logger import logger from app.utils.timezone import beijing_now @@ -42,6 +43,7 @@ async def process_alarm_notification(alarm_data: Dict): snapshot_url = alarm_data.get("snapshot_url", "") alarm_level = alarm_data.get("alarm_level", 2) event_time = alarm_data.get("event_time", "") + area_id = alarm_data.get("area_id") logger.info(f"开始处理告警通知: {alarm_id}") @@ -51,8 +53,10 @@ async def process_alarm_notification(alarm_data: Dict): camera_name = alarm_data.get("camera_name", device_id) scene_id = alarm_data.get("scene_id", "") - # 查找区域名称用于 VLM prompt(比 UUID 更有意义) - area_name_for_vlm, _, _ = _get_notify_persons(device_id, alarm_level) + # 查找区域名称:优先从 IoT 平台查 area_id,降级到通知三表 + area_name_for_vlm = await _get_area_name_from_iot(area_id) if area_id else "" + if not area_name_for_vlm: + area_name_for_vlm, _, _ = _get_notify_persons(device_id, alarm_level) roi_name = area_name_for_vlm if area_name_for_vlm != "未知区域" else scene_id # snapshot_url 可能是 COS object key,需转为可访问的预签名URL @@ -89,7 +93,8 @@ async def process_alarm_notification(alarm_data: Dict): test_uids = [uid.strip() for uid in settings.wechat.test_uids.split(",") if uid.strip()] if test_uids: persons = [{"person_name": "测试用户", "wechat_uid": uid, "role": "TEST"} for uid in test_uids] - area_name = "演示区域" + if not area_name or area_name == "未知区域": + area_name = "演示区域" logger.info(f"演示模式: 使用测试用户 {test_uids}") if not persons: @@ -284,6 +289,59 @@ def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple: db.close() +# 区域名称缓存(area_id → area_name),避免重复查询 +_area_name_cache: Dict[int, str] = {} +_iot_token_cache: Dict[str, str] = {"token": "", "expire": 0} + + +async def _get_area_name_from_iot(area_id: int) -> str: + """从 IoT 平台查询区域名称(带缓存)""" + if not area_id: + return "" + if area_id in _area_name_cache: + return _area_name_cache[area_id] + try: + import httpx + import time + base_url = settings.work_order.base_url + if not base_url: + return "" + + # Token 缓存(有效期内复用) + now = time.time() + if not _iot_token_cache["token"] or now > _iot_token_cache["expire"]: + async with httpx.AsyncClient(timeout=5) as client: + login_resp = await client.post( + f"{base_url}/admin-api/system/auth/login", + json={"username": "admin", "password": "admin123", "tenantName": "默认"}, + headers={"tenant-id": "1"}, + ) + login_data = login_resp.json().get("data", {}) + _iot_token_cache["token"] = login_data.get("accessToken", "") + # token 有效期约 30 分钟,这里缓存 20 分钟 + _iot_token_cache["expire"] = now + 1200 + + token = _iot_token_cache["token"] + if not token: + return "" + + async with httpx.AsyncClient(timeout=5) as client: + resp = await client.get( + f"{base_url}/admin-api/ops/area/get", + params={"id": area_id}, + headers={"tenant-id": "1", "Authorization": f"Bearer {token}"}, + ) + data = resp.json() + if data.get("code") == 0 and data.get("data"): + name = data["data"].get("areaName", "") + if name: + _area_name_cache[area_id] = name + return name + except Exception as e: + logger.warning(f"查询IoT平台区域名失败: area_id={area_id}, error={e}") + return "" + + def _get_alarm_area_id(alarm_id: str) -> int: """从 alarm_event 表获取 area_id""" db = get_session()