"""
Notification dispatch service.

Asynchronous pipeline that runs after an alarm has been created:
  1. VLM review of the snapshot -> written to alarm_llm_analysis
  2. Look up the people to notify (camera -> area -> person)
  3. Push a WeCom (enterprise WeChat) interactive card

The pipeline runs as a coroutine so that it does not hold up the main
alarm-intake flow. When the VLM or WeCom is unavailable the pipeline
degrades instead of failing, so the rest of the system keeps running.
"""

from datetime import datetime
from typing import Dict, List, Tuple

from app.models import (
    get_session,
    AlarmEvent,
    AlarmLlmAnalysis,
    CameraAreaBinding,
    AreaPersonBinding,
    NotifyArea,
)
from app.config import settings
from app.services.vlm_service import get_vlm_service
from app.services.wechat_service import get_wechat_service
from app.utils.logger import logger

# Area name reported when the camera has no binding, the area row is
# missing/disabled, or the lookup fails.
_UNKNOWN_AREA = "未知区域"

# Values recorded with every VLM review row.
# NOTE(review): the confidence is a fixed placeholder; nothing in this module
# reads a score from the VLM result -- confirm whether the service exposes one.
_VLM_MODEL_NAME = "qwen3-vl-flash"
_VLM_FIXED_CONFIDENCE = 0.9


def _vlm_fallback_result() -> Dict:
    """Return the result used when the VLM review cannot be obtained.

    The alarm is treated as confirmed (so it is still pushed) and flagged as
    skipped (so the stored analysis row records that no review took place).
    """
    return {"confirmed": True, "description": "", "skipped": True}


async def process_alarm_notification(alarm_data: Dict):
    """
    Run the alarm notification pipeline (async, fire-and-forget).

    Steps: VLM review -> persist the review -> stop if it is a false alarm ->
    resolve recipients -> push the WeCom card. Every failure is logged and
    swallowed; nothing is raised to the caller.

    Args:
        alarm_data: Alarm dictionary. The keys read here are alarm_id,
            alarm_type, device_id, snapshot_url, alarm_level, event_time,
            camera_name and scene_id; all of them are optional.
    """
    alarm_id = alarm_data.get("alarm_id", "")
    alarm_type = alarm_data.get("alarm_type", "")
    device_id = alarm_data.get("device_id", "")
    snapshot_url = alarm_data.get("snapshot_url", "")
    alarm_level = alarm_data.get("alarm_level", 2)
    event_time = alarm_data.get("event_time", "")

    logger.info(f"开始处理告警通知: {alarm_id}")

    # NOTE(review): the DB helpers below are plain synchronous calls, so they
    # run inline inside this coroutine.
    try:
        # ========== 1. VLM review ==========
        camera_name = alarm_data.get("camera_name", device_id)
        roi_name = alarm_data.get("scene_id", "")

        # A failing VLM must not suppress the notification: fall back to a
        # "skipped" result and keep going.
        try:
            vlm_result = await get_vlm_service().verify_alarm(
                snapshot_url=snapshot_url,
                alarm_type=alarm_type,
                camera_name=camera_name,
                roi_name=roi_name,
            )
        except Exception as e:
            logger.error(
                f"VLM 复核异常,降级为直接通知: {alarm_id}, error={e}",
                exc_info=True,
            )
            vlm_result = None
        if vlm_result is None:
            vlm_result = _vlm_fallback_result()

        # Persist the review in alarm_llm_analysis (reuses the existing table).
        _save_vlm_result(alarm_id, vlm_result)

        # Judged a false alarm -> update the alarm status and do not notify.
        if not vlm_result.get("confirmed", True):
            _mark_false_alarm(alarm_id)
            logger.info(f"VLM 判定误报,跳过通知: {alarm_id}")
            return

        # ========== 2. Resolve the people to notify ==========
        description = vlm_result.get("description", "")
        area_name, persons = _get_notify_persons(device_id, alarm_level)

        # Demo mode: with nobody configured in the database, use the
        # comma-separated test user ids from the settings.
        if not persons and settings.wechat.test_uids:
            test_uids = [
                uid.strip()
                for uid in settings.wechat.test_uids.split(",")
                if uid.strip()
            ]
            if test_uids:
                persons = [
                    {"person_name": "测试用户", "wechat_uid": uid, "role": "TEST"}
                    for uid in test_uids
                ]
                area_name = "演示区域"
                logger.info(f"演示模式: 使用测试用户 {test_uids}")

        if not persons:
            logger.warning(f"未找到通知人员: camera={device_id}, 跳过企微推送")
            return

        # ========== 3. Push the WeCom notification ==========
        wechat_service = get_wechat_service()
        if not wechat_service.enabled:
            logger.info("企微未启用,跳过推送")
            return

        # Drop empty ids and duplicates while keeping the original order, so
        # the card is sent once per real recipient.
        user_ids = list(
            dict.fromkeys(
                p["wechat_uid"] for p in persons if p.get("wechat_uid")
            )
        )
        if not user_ids:
            logger.warning(
                f"通知人员均未配置企微账号: camera={device_id}, 跳过企微推送"
            )
            return

        if isinstance(event_time, datetime):
            event_time_str = event_time.strftime("%Y-%m-%d %H:%M:%S")
        else:
            event_time_str = str(event_time or "")

        await wechat_service.send_alarm_card(
            user_ids=user_ids,
            alarm_id=alarm_id,
            alarm_type=alarm_type,
            area_name=area_name,
            camera_name=camera_name,
            description=description,
            snapshot_url=snapshot_url,
            event_time=event_time_str,
            alarm_level=alarm_level,
            # Fall back to this app's own host/port when no public base URL
            # is configured.
            service_base_url=settings.wechat.service_base_url
            or f"http://{settings.app.host}:{settings.app.port}",
        )

        logger.info(f"告警通知完成: {alarm_id} → {len(user_ids)} 人")

    except Exception as e:
        # Top-level boundary of a fire-and-forget task: log and swallow.
        logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True)


def _save_vlm_result(alarm_id: str, vlm_result: Dict):
    """Write the VLM review result to the alarm_llm_analysis table.

    Failures are rolled back and logged; nothing is raised.

    Args:
        alarm_id: Identifier of the reviewed alarm.
        vlm_result: Review result; the keys read are description, confirmed
            and skipped.
    """
    db = get_session()
    try:
        skipped = bool(vlm_result.get("skipped"))
        analysis = AlarmLlmAnalysis(
            alarm_id=alarm_id,
            llm_model=_VLM_MODEL_NAME,
            analysis_type="REVIEW",
            summary=vlm_result.get("description", ""),
            is_false_alarm=not vlm_result.get("confirmed", True),
            # A skipped review carries no confidence, only a marker.
            confidence_score=None if skipped else _VLM_FIXED_CONFIDENCE,
            suggestion="VLM跳过" if skipped else None,
        )
        db.add(analysis)
        db.commit()
    except Exception as e:
        db.rollback()
        logger.error(f"保存 VLM 结果失败: {e}", exc_info=True)
    finally:
        db.close()


def _mark_false_alarm(alarm_id: str):
    """Mark the alarm as a false alarm that has already been handled.

    Does nothing when no alarm with this id exists. Failures are rolled back
    and logged; nothing is raised.

    Args:
        alarm_id: Identifier of the alarm to update.
    """
    db = get_session()
    try:
        alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if alarm:
            alarm.alarm_status = "FALSE"
            alarm.handle_status = "DONE"
            alarm.handle_remark = "VLM复核判定误报"
            alarm.handled_at = datetime.now()
            db.commit()
            logger.info(f"告警已标记误报: {alarm_id}")
    except Exception as e:
        db.rollback()
        logger.error(f"标记误报失败: {e}", exc_info=True)
    finally:
        db.close()


def _get_notify_persons(camera_id: str, alarm_level: int) -> Tuple[str, List[Dict]]:
    """
    Look up the people to notify for a camera.

    Follows camera -> area -> persons. Only enabled person bindings whose
    notify_level is less than or equal to alarm_level are returned.

    Args:
        camera_id: Camera identifier used to find the area binding.
        alarm_level: Level of the alarm, compared against each binding's
            notify_level.

    Returns:
        (area_name, [{"person_name": ..., "wechat_uid": ..., "role": ...}]).
        The area name is the "unknown area" placeholder and the list is empty
        when the camera has no binding or the query fails.
    """
    db = get_session()
    try:
        # camera -> area
        binding = db.query(CameraAreaBinding).filter(
            CameraAreaBinding.camera_id == camera_id
        ).first()
        if not binding:
            return (_UNKNOWN_AREA, [])

        # Area info.
        # NOTE(review): a missing or disabled area only changes the display
        # name; the persons bound to that area are still returned -- confirm
        # this is intended.
        area = db.query(NotifyArea).filter(
            NotifyArea.area_id == binding.area_id,
            NotifyArea.enabled == 1,
        ).first()
        area_name = area.area_name if area else _UNKNOWN_AREA

        # area -> persons
        persons = db.query(AreaPersonBinding).filter(
            AreaPersonBinding.area_id == binding.area_id,
            AreaPersonBinding.enabled == 1,
            AreaPersonBinding.notify_level <= alarm_level,
        ).all()

        result = [
            {
                "person_name": p.person_name,
                "wechat_uid": p.wechat_uid,
                "role": p.role,
            }
            for p in persons
        ]
        return (area_name, result)
    except Exception as e:
        logger.error(f"查询通知人员失败: {e}", exc_info=True)
        return (_UNKNOWN_AREA, [])
    finally:
        db.close()