""" 通知调度服务 告警创建后的异步处理流水线: 1. VLM 复核截图 → 写入 alarm_llm_analysis 2. 查表获取通知人员 (camera → area → person) 3. 推送企微通知: - 群聊:image + news + @text 组合消息 - 个人:button_interaction 模板卡片 全程异步执行,不阻塞告警接收主流程。 VLM 或企微不可用时自动降级,不影响系统运行。 """ import os from datetime import datetime from typing import Dict, List from app.models import ( get_session, AlarmEvent, AlarmLlmAnalysis, AlarmEventExt, CameraAreaBinding, AreaPersonBinding, NotifyArea, ) from app.config import settings from app.services.vlm_service import get_vlm_service from app.services.wechat_service import get_wechat_service, ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES from app.services.camera_name_service import get_camera_name_service from app.services.work_order_client import get_work_order_client from app.utils.logger import logger from app.utils.timezone import beijing_now async def process_alarm_notification(alarm_data: Dict): """ 告警通知处理流水线(异步,fire-and-forget) Args: alarm_data: 告警字典,包含 alarm_id, alarm_type, device_id, snapshot_url, alarm_level, event_time 等字段 """ alarm_id = alarm_data.get("alarm_id", "") alarm_type = alarm_data.get("alarm_type", "") device_id = alarm_data.get("device_id", "") snapshot_url = alarm_data.get("snapshot_url", "") alarm_level = alarm_data.get("alarm_level", 2) event_time = alarm_data.get("event_time", "") area_id = alarm_data.get("area_id") logger.info(f"开始处理告警通知: {alarm_id}") try: # ========== 1. VLM 复核 ========== vlm_service = get_vlm_service() camera_name = alarm_data.get("camera_name", device_id) scene_id = alarm_data.get("scene_id", "") # 查找区域名称:优先从 IoT 平台查 area_id,降级到通知三表 area_name_for_vlm = await _get_area_name_from_iot(area_id) if area_id else "" if not area_name_for_vlm: area_name_for_vlm, _, _ = _get_notify_persons(device_id, alarm_level) roi_name = area_name_for_vlm if area_name_for_vlm != "未知区域" else scene_id # snapshot_url 可能是 COS object key,需转为可访问的预签名URL presigned_snapshot_url = _get_presigned_url(snapshot_url) vlm_result = await vlm_service.verify_alarm( snapshot_url=presigned_snapshot_url, alarm_type=alarm_type, camera_name=camera_name, roi_name=roi_name, ) # 写入 alarm_llm_analysis 表 _save_vlm_result(alarm_id, vlm_result) # VLM 明确判定为误报(非降级跳过)→ 更新告警状态,不通知 if not vlm_result.get("confirmed", True) and not vlm_result.get("skipped"): _mark_false_alarm(alarm_id) logger.info(f"VLM 判定误报,跳过通知: {alarm_id}") return # VLM 跳过(降级)且 confirmed=False → 不改告警状态,仅跳过通知 if not vlm_result.get("confirmed", True) and vlm_result.get("skipped"): logger.info(f"VLM 降级跳过,不推送但保留告警状态: {alarm_id}") return # ========== 2. 查表获取通知人员 ========== description = vlm_result.get("description", "") area_name = area_name_for_vlm _, area_id_int, persons = _get_notify_persons(device_id, alarm_level) # 演示模式:数据库无人员时,使用配置的测试 userid if not persons and settings.wechat.test_uids: test_uids = [uid.strip() for uid in settings.wechat.test_uids.split(",") if uid.strip()] if test_uids: persons = [{"person_name": "测试用户", "wechat_uid": uid, "role": "TEST"} for uid in test_uids] if not area_name or area_name == "未知区域": area_name = "演示区域" logger.info(f"演示模式: 使用测试用户 {test_uids}") if not persons: logger.warning(f"未找到通知人员: camera={device_id}, 跳过企微推送") return # ========== 3. 推送企微通知 ========== wechat_service = get_wechat_service() if not wechat_service.enabled: logger.info(f"企微未启用,跳过通知: {alarm_id}") return # 通知显示时间:只显示 MM-DD HH:MM(不含年份和秒) if isinstance(event_time, datetime): event_time_str = event_time.strftime("%m-%d %H:%M") else: # 字符串格式 "2026-03-19 10:54:24" → "03-19 10:54" s = str(event_time or "") event_time_str = s[5:16] if len(s) >= 16 else s user_ids = [p["wechat_uid"] for p in persons] group_chat_id = settings.wechat.group_chat_id # ---- 3a. 群聊组合消息 ---- if group_chat_id: await wechat_service.send_group_alarm_combo( chat_id=group_chat_id, alarm_id=alarm_id, alarm_type=alarm_type, area_name=area_name, camera_name=camera_name, description=description or f"{area_name} 检测到告警", event_time=event_time_str, alarm_level=alarm_level, snapshot_url=presigned_snapshot_url, mention_user_ids=user_ids, ) else: logger.debug("未配置群聊ID,跳过群聊推送") # ---- 3b. 个人按钮交互卡片 ---- sent = await wechat_service.send_alarm_card( user_ids=user_ids, alarm_id=alarm_id, alarm_type=alarm_type, area_name=area_name, camera_name=camera_name, description=description, event_time=event_time_str, alarm_level=alarm_level, ) if sent: logger.info(f"告警通知完成: {alarm_id}") else: logger.warning(f"个人卡片发送失败: {alarm_id}") # ---- 4. 创建安保工单(暂未上线,待本地测试通过后启用) ---- # wo_client = get_work_order_client() # if wo_client.enabled: # wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int # if wo_area_id: # type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type) # wo_title = f"{type_name}告警" # trigger_source = _get_trigger_source(alarm_id) # permanent_url = _get_permanent_url(snapshot_url) # order_id = await wo_client.create_order( # title=wo_title, # area_id=wo_area_id, # alarm_id=alarm_id, # alarm_type=type_name, # description=description, # priority=alarm_level, # trigger_source=trigger_source, # camera_id=device_id, # image_url=permanent_url, # ) # if order_id: # _save_order_id(alarm_id, order_id) # else: # logger.warning(f"告警无 area_id,跳过工单创建: {alarm_id}") except Exception as e: logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True) def _get_presigned_url(snapshot_url: str) -> str: """将 COS object key 转为预签名 URL""" if not snapshot_url: return "" if snapshot_url.startswith("http"): return snapshot_url try: from app.services.oss_storage import get_oss_storage return get_oss_storage().get_presigned_url(snapshot_url) except Exception as e: logger.warning(f"生成预签名URL失败: {e}") return snapshot_url def _save_vlm_result(alarm_id: str, vlm_result: Dict): """将 VLM 复核结果写入 alarm_llm_analysis 表""" db = get_session() try: analysis = AlarmLlmAnalysis( alarm_id=alarm_id, llm_model="qwen3-vl-flash", analysis_type="REVIEW", summary=vlm_result.get("description", ""), is_false_alarm=not vlm_result.get("confirmed", True), confidence_score=None if vlm_result.get("skipped") else 0.9, suggestion="VLM跳过" if vlm_result.get("skipped") else None, ) db.add(analysis) db.commit() except Exception as e: db.rollback() logger.error(f"保存 VLM 结果失败: {e}") finally: db.close() def _mark_false_alarm(alarm_id: str): """将告警标记为误报""" db = get_session() try: alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() if alarm: now = beijing_now() alarm.alarm_status = "FALSE" alarm.handle_status = "IGNORED" alarm.handle_remark = "VLM复核判定误报" alarm.handled_at = now if alarm.duration_ms is None and alarm.event_time: try: delta = now - alarm.event_time alarm.duration_ms = int(delta.total_seconds() * 1000) except Exception: pass if alarm.last_frame_time is None: alarm.last_frame_time = now db.commit() logger.info(f"告警已标记误报: {alarm_id}") except Exception as e: db.rollback() logger.error(f"标记误报失败: {e}") finally: db.close() def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple: """ 根据摄像头ID查找通知人员 Returns: (area_name, area_id, [{"person_name": ..., "wechat_uid": ..., "role": ...}]) """ db = get_session() try: binding = db.query(CameraAreaBinding).filter( CameraAreaBinding.camera_id == camera_id ).first() if not binding: return ("未知区域", 0, []) area = db.query(NotifyArea).filter( NotifyArea.area_id == binding.area_id, NotifyArea.enabled == 1, ).first() area_name = area.area_name if area else "未知区域" area_id = binding.area_id or 0 persons = db.query(AreaPersonBinding).filter( AreaPersonBinding.area_id == binding.area_id, AreaPersonBinding.enabled == 1, AreaPersonBinding.notify_level >= alarm_level, ).all() result = [ { "person_name": p.person_name, "wechat_uid": p.wechat_uid, "role": p.role, } for p in persons ] return (area_name, area_id, result) except Exception as e: logger.error(f"查询通知人员失败: {e}") return ("未知区域", 0, []) finally: db.close() # 区域名称缓存(area_id → (area_name, expire_time)),5 分钟 TTL _area_name_cache: Dict[int, tuple] = {} _iot_token_cache: Dict[str, str] = {"token": "", "expire": 0} _AREA_CACHE_TTL = 300 # 5 分钟 async def _get_area_name_from_iot(area_id: int) -> str: """从 IoT 平台查询区域名称(带 TTL 缓存)""" if not area_id: return "" cached = _area_name_cache.get(area_id) if cached and cached[1] > __import__('time').time(): return cached[0] try: import httpx import time # IoT 平台地址(区域查询),优先级:IOT_PLATFORM_URL > WORK_ORDER_BASE_URL base_url = ( os.getenv("IOT_PLATFORM_URL", "") or settings.work_order.base_url ) if not base_url: return "" # Token 缓存(有效期内复用) now = time.time() if not _iot_token_cache["token"] or now > _iot_token_cache["expire"]: async with httpx.AsyncClient(timeout=5) as client: login_resp = await client.post( f"{base_url}/admin-api/system/auth/login", json={"username": "admin", "password": "admin123", "tenantName": "默认"}, headers={"tenant-id": "1"}, ) login_data = login_resp.json().get("data", {}) _iot_token_cache["token"] = login_data.get("accessToken", "") # token 有效期约 30 分钟,这里缓存 20 分钟 _iot_token_cache["expire"] = now + 1200 token = _iot_token_cache["token"] if not token: return "" async with httpx.AsyncClient(timeout=5) as client: resp = await client.get( f"{base_url}/admin-api/ops/area/get", params={"id": area_id}, headers={"tenant-id": "1", "Authorization": f"Bearer {token}"}, ) data = resp.json() if data.get("code") == 0 and data.get("data"): name = data["data"].get("areaName", "") if name: _area_name_cache[area_id] = (name, __import__('time').time() + _AREA_CACHE_TTL) return name except Exception as e: logger.warning(f"查询IoT平台区域名失败: area_id={area_id}, error={e}") return "" def _get_alarm_area_id(alarm_id: str) -> int: """从 alarm_event 表获取 area_id""" db = get_session() try: alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() return alarm.area_id if alarm and alarm.area_id else 0 except Exception: return 0 finally: db.close() def _get_trigger_source(alarm_id: str) -> str: """从 alarm_event_ext 的 ext_type 判断告警来源""" db = get_session() try: ext = db.query(AlarmEventExt).filter( AlarmEventExt.alarm_id == alarm_id, AlarmEventExt.ext_type.in_(["EDGE_HTTP", "EDGE", "POST", "MANUAL"]), ).first() if ext: return { "EDGE_HTTP": "自动上报", "EDGE": "自动上报", "POST": "人工上报", "MANUAL": "人工上报", }.get(ext.ext_type, "自动上报") return "自动上报" except Exception: return "自动上报" finally: db.close() def _get_permanent_url(snapshot_url: str) -> str: """将 COS object key 转为永久访问 URL""" if not snapshot_url: return "" if snapshot_url.startswith("http"): return snapshot_url try: from app.services.oss_storage import get_oss_storage return get_oss_storage().get_permanent_url(snapshot_url) except Exception as e: logger.warning(f"生成永久URL失败: {e}") return "" def _save_order_id(alarm_id: str, order_id: str): """将工单ID保存到 alarm_event_ext(ext_type=WORK_ORDER)""" db = get_session() try: ext = AlarmEventExt( alarm_id=alarm_id, ext_type="WORK_ORDER", ext_data={"order_id": order_id}, ) db.add(ext) db.commit() logger.info(f"工单ID已关联: alarm={alarm_id}, order={order_id}") except Exception as e: db.rollback() logger.error(f"保存工单ID失败: {e}") finally: db.close()