From 09e0dbbc388aad73f5421977971924d857b297ae Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Fri, 6 Mar 2026 11:14:57 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E6=B7=BB=E5=8A=A0=20VLM=20=E9=AA=8C?= =?UTF-8?q?=E8=AF=81=20+=20=E4=BC=81=E5=BE=AE=E9=80=9A=E7=9F=A5=20V1=20?= =?UTF-8?q?=E5=AE=9E=E6=96=BD=E8=AE=A1=E5=88=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 包含 9 个任务的详细实施计划: - 数据库表设计(通知区域、摄像头绑定、人员绑定) - VLM 视觉验证服务 - 企微通知服务 - 告警主流程集成 - 通知管理 API --- .../2026-03-06-vlm-wechat-notification-v1.md | 1270 +++++++++++++++++ 1 file changed, 1270 insertions(+) create mode 100644 docs/plans/2026-03-06-vlm-wechat-notification-v1.md diff --git a/docs/plans/2026-03-06-vlm-wechat-notification-v1.md b/docs/plans/2026-03-06-vlm-wechat-notification-v1.md new file mode 100644 index 0000000..df5f8c9 --- /dev/null +++ b/docs/plans/2026-03-06-vlm-wechat-notification-v1.md @@ -0,0 +1,1270 @@ +# V1: VLM 复核 + 企微通知 + 手动结单 实现计划 + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** 边缘端告警上报后,vsp-service 异步调用 VLM 复核截图,确认告警后通过企微互动卡片通知责任人,安保人员可通过企微按钮手动结单。 + +**Architecture:** 在现有 `POST /api/ai/alert/edge/report` 告警接收流程末尾追加异步任务,调用 qwen3-vl-flash VLM 复核截图,结果写入已有的 `alarm_llm_analysis` 表。VLM 确认后,通过 camera→area→person 三层查表找到责任人,推送企微互动卡片。企微按钮回调更新已有的 `alarm_event.handle_status` 字段完成手动结单。全程不修改现有表结构,不破坏现有功能。 + +**Tech Stack:** Python 3.8+, FastAPI, AsyncOpenAI (dashscope), 企微 Webhook/API, SQLAlchemy, MySQL + +--- + +## 现有架构关键约束 + +### 真实生产入口 + +边缘端使用 **无认证** 端点上报告警: +- **文件:** `app/routers/edge_compat.py:23` +- **路径:** `POST /api/ai/alert/edge/report` +- **流程:** `create_from_edge_report()` → 存 MySQL → WebSocket 推前端 + +认证端点 `app/routers/yudao_aiot_alarm.py:350` 额外调用 OPS 回调,但边缘端不走这条路。 + +### 不可修改的现有功能 + +1. 告警存入 `alarm_event` 表的逻辑不变 +2. WebSocket 推送前端的逻辑不变 +3. 边缘端 resolve 自动结单逻辑不变 +4. 前端告警列表/处理/统计接口不变 +5. OPS 平台回调(认证端点)不变 + +### 可复用的现有资源 + +| 资源 | 位置 | 复用方式 | +|------|------|---------| +| `alarm_llm_analysis` 表 | `app/models.py:339-366` | 存 VLM 复核结果(summary, is_false_alarm) | +| `alarm_event.handle_status` | `app/models.py:289-291` | 手动结单状态更新 (UNHANDLED→HANDLING→DONE) | +| `alarm_event.alarm_status` | `app/models.py:287-288` | 误报标记 (NEW→FALSE) | +| `alarm_event.handler` | `app/models.py:295` | 记录操作人 | +| `alarm_event.handle_remark` | `app/models.py:296` | 记录操作备注 | +| `AlarmEventService.handle_alarm()` | `app/services/alarm_event_service.py:394-426` | 更新告警处理状态 | +| `AlarmLlmAnalysis` 模型 | `app/models.py:339-366` | 直接写入 VLM 结果 | +| `AppConfig.ai_model` | `app/config.py:36-40` | 扩展添加 VLM 配置 | + +--- + +## Task 1: 新增 3 张通知路由表的 SQLAlchemy 模型 + +**Files:** +- Modify: `app/models.py` (末尾追加 3 个 Model 类) + +**Step 1: 在 models.py 末尾添加 3 个新模型** + +在文件末尾(`AlarmLlmAnalysis` 类之后)追加: + +```python +class NotifyArea(Base): + """通知区域""" + __tablename__ = "notify_area" + + id = Column(Integer, primary_key=True, autoincrement=True) + area_id = Column(String(36), unique=True, nullable=False, index=True) + area_name = Column(String(100), nullable=False) + description = Column(String(200), nullable=True) + enabled = Column(SmallInteger, default=1) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) + + +class CameraAreaBinding(Base): + """摄像头-区域映射""" + __tablename__ = "camera_area_binding" + + id = Column(Integer, primary_key=True, autoincrement=True) + camera_id = Column(String(64), unique=True, nullable=False, index=True) + area_id = Column(String(36), nullable=False, index=True) + created_at = Column(DateTime, default=datetime.now) + + +class AreaPersonBinding(Base): + """区域-人员通知绑定""" + __tablename__ = "area_person_binding" + + id = Column(Integer, primary_key=True, autoincrement=True) + area_id = Column(String(36), nullable=False, index=True) + person_name = Column(String(50), nullable=False) + wechat_uid = Column(String(100), nullable=False) + role = Column(String(20), default="SECURITY") + notify_level = Column(Integer, default=1) + enabled = Column(SmallInteger, default=1) + created_at = Column(DateTime, default=datetime.now) + updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now) +``` + +**Step 2: 在 MySQL 中手动创建对应的表** + +由于 vsp-service 使用 `create_all()` 自动建表,新模型在服务重启后会自动创建。但生产环境建议手动执行: + +```sql +CREATE TABLE IF NOT EXISTS notify_area ( + id INT AUTO_INCREMENT PRIMARY KEY, + area_id VARCHAR(36) NOT NULL UNIQUE, + area_name VARCHAR(100) NOT NULL, + description VARCHAR(200), + enabled SMALLINT DEFAULT 1, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_area_id (area_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS camera_area_binding ( + id INT AUTO_INCREMENT PRIMARY KEY, + camera_id VARCHAR(64) NOT NULL UNIQUE, + area_id VARCHAR(36) NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + INDEX idx_area (area_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + +CREATE TABLE IF NOT EXISTS area_person_binding ( + id INT AUTO_INCREMENT PRIMARY KEY, + area_id VARCHAR(36) NOT NULL, + person_name VARCHAR(50) NOT NULL, + wechat_uid VARCHAR(100) NOT NULL, + role VARCHAR(20) DEFAULT 'SECURITY', + notify_level INT DEFAULT 1, + enabled SMALLINT DEFAULT 1, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + INDEX idx_area (area_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; +``` + +**Step 3: Commit** + +```bash +git add app/models.py +git commit -m "feat: 新增通知区域、摄像头区域映射、区域人员绑定 3 张表模型" +``` + +--- + +## Task 2: 扩展配置项(VLM + 企微) + +**Files:** +- Modify: `app/config.py` (添加 VLM 和企微配置段) +- Modify: `.env.example` (添加新环境变量说明) + +**Step 1: 在 config.py 中添加 VLM 和企微配置** + +在 `AIModelConfig` 之后(约 line 40)添加: + +```python +@dataclass +class VLMConfig: + """VLM 视觉语言模型配置""" + api_key: str = "" # DASHSCOPE_API_KEY + base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1" + model: str = "qwen3-vl-flash-2026-01-22" + timeout: int = 10 # 超时秒数 + enabled: bool = False # 是否启用 VLM 复核 + enable_thinking: bool = False # 是否启用思考过程 + + +@dataclass +class WeChatConfig: + """企微通知配置""" + corp_id: str = "" # 企业ID + agent_id: str = "" # 应用AgentId + secret: str = "" # 应用Secret + token: str = "" # 回调Token(用于验签) + encoding_aes_key: str = "" # 回调EncodingAESKey + enabled: bool = False # 是否启用企微通知 +``` + +在 `Settings` dataclass 中添加: + +```python +vlm: VLMConfig = field(default_factory=VLMConfig) +wechat: WeChatConfig = field(default_factory=WeChatConfig) +``` + +在 `load_settings()` 函数中添加环境变量读取: + +```python +vlm=VLMConfig( + api_key=os.getenv("DASHSCOPE_API_KEY", ""), + base_url=os.getenv("VLM_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"), + model=os.getenv("VLM_MODEL", "qwen3-vl-flash-2026-01-22"), + timeout=int(os.getenv("VLM_TIMEOUT", "10")), + enabled=os.getenv("VLM_ENABLED", "false").lower() == "true", + enable_thinking=os.getenv("VLM_ENABLE_THINKING", "false").lower() == "true", +), +wechat=WeChatConfig( + corp_id=os.getenv("WECHAT_CORP_ID", ""), + agent_id=os.getenv("WECHAT_AGENT_ID", ""), + secret=os.getenv("WECHAT_SECRET", ""), + token=os.getenv("WECHAT_TOKEN", ""), + encoding_aes_key=os.getenv("WECHAT_ENCODING_AES_KEY", ""), + enabled=os.getenv("WECHAT_ENABLED", "false").lower() == "true", +), +``` + +**Step 2: 更新 .env.example** + +在文件末尾追加: + +```ini +# ===== VLM 复核配置 ===== +VLM_ENABLED=false +DASHSCOPE_API_KEY=your_dashscope_api_key +VLM_MODEL=qwen3-vl-flash-2026-01-22 +VLM_TIMEOUT=10 + +# ===== 企微通知配置 ===== +WECHAT_ENABLED=false +WECHAT_CORP_ID=your_corp_id +WECHAT_AGENT_ID=your_agent_id +WECHAT_SECRET=your_secret +WECHAT_TOKEN=your_callback_token +WECHAT_ENCODING_AES_KEY=your_encoding_aes_key +``` + +**Step 3: Commit** + +```bash +git add app/config.py .env.example +git commit -m "feat: 添加 VLM 和企微通知的配置项" +``` + +--- + +## Task 3: 实现 VLM 复核服务 + +**Files:** +- Create: `app/services/vlm_service.py` + +**Step 1: 创建 VLM 服务** + +```python +""" +VLM 视觉语言模型复核服务 + +调用 qwen3-vl-flash 对告警截图进行二次确认, +生成场景描述文本用于企微通知卡片。 +""" + +import asyncio +import json +import logging +from typing import Optional, Dict + +from openai import AsyncOpenAI + +logger = logging.getLogger(__name__) + +# 算法类型 → VLM Prompt 模板 +VLM_PROMPTS = { + "leave_post": """分析这张岗位监控截图。 +摄像头位置:{camera_name},监控区域:{roi_name}。 +边缘AI检测到该区域无人在岗,请你复核:该区域内是否确实没有工作人员在岗? + +输出严格的JSON格式(不要输出其他内容): +{{"confirmed": true, "description": "一句话描述当前画面"}} + +说明:confirmed=true 表示确实无人在岗(告警成立),confirmed=false 表示有人在岗(误报)。""", + + "intrusion": """分析这张周界监控截图。 +摄像头位置:{camera_name},监控区域:{roi_name}。 +边缘AI检测到该区域有人员入侵,请你复核:该区域内是否确实有人员出现? + +输出严格的JSON格式(不要输出其他内容): +{{"confirmed": true, "description": "一句话描述当前画面"}} + +说明:confirmed=true 表示确实有人入侵(告警成立),confirmed=false 表示无人(误报)。""", +} + +# 通用降级 prompt(未知算法类型时使用) +DEFAULT_PROMPT = """分析这张监控截图。 +摄像头位置:{camera_name},监控区域:{roi_name}。 +边缘AI触发了 {alarm_type} 告警,请判断告警是否属实。 + +输出严格的JSON格式(不要输出其他内容): +{{"confirmed": true, "description": "一句话描述当前画面"}}""" + + +class VLMService: + """VLM 复核服务(单例)""" + + def __init__(self): + self._client: Optional[AsyncOpenAI] = None + self._enabled = False + self._model = "" + self._timeout = 10 + self._enable_thinking = False + + def init(self, config): + """初始化 VLM 客户端""" + self._enabled = config.enabled and bool(config.api_key) + self._model = config.model + self._timeout = config.timeout + self._enable_thinking = config.enable_thinking + + if self._enabled: + self._client = AsyncOpenAI( + api_key=config.api_key, + base_url=config.base_url, + ) + logger.info(f"VLM 服务已启用: model={self._model}") + else: + logger.info("VLM 服务未启用(VLM_ENABLED=false 或缺少 API Key)") + + @property + def enabled(self) -> bool: + return self._enabled + + async def verify_alarm( + self, + snapshot_url: str, + alarm_type: str, + camera_name: str = "", + roi_name: str = "", + ) -> Dict: + """ + VLM 复核告警截图 + + Args: + snapshot_url: COS 截图 URL + alarm_type: 告警类型 (leave_post/intrusion) + camera_name: 摄像头名称 + roi_name: ROI 区域名称 + + Returns: + {"confirmed": bool, "description": str, "skipped": bool} + - skipped=True 表示 VLM 未调用(降级处理) + """ + if not self._enabled or not self._client: + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警", + "skipped": True, + } + + if not snapshot_url: + logger.warning("告警无截图URL,跳过 VLM 复核") + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(无截图)", + "skipped": True, + } + + # 选择 prompt 模板 + template = VLM_PROMPTS.get(alarm_type, DEFAULT_PROMPT) + prompt = template.format( + camera_name=camera_name or "未知位置", + roi_name=roi_name or "未知区域", + alarm_type=alarm_type, + ) + + try: + resp = await asyncio.wait_for( + self._client.chat.completions.create( + model=self._model, + messages=[{ + "role": "user", + "content": [ + {"type": "image_url", "image_url": {"url": snapshot_url}}, + {"type": "text", "text": prompt}, + ], + }], + extra_body={"enable_thinking": self._enable_thinking}, + ), + timeout=self._timeout, + ) + + content = resp.choices[0].message.content.strip() + # 尝试提取 JSON(兼容模型可能输出 markdown code block) + if "```" in content: + content = content.split("```")[1] + if content.startswith("json"): + content = content[4:] + content = content.strip() + + result = json.loads(content) + logger.info( + f"VLM 复核完成: confirmed={result.get('confirmed')}, " + f"desc={result.get('description', '')[:50]}" + ) + return { + "confirmed": result.get("confirmed", True), + "description": result.get("description", ""), + "skipped": False, + } + + except asyncio.TimeoutError: + logger.warning(f"VLM 复核超时 ({self._timeout}s),降级处理") + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(VLM超时)", + "skipped": True, + } + except json.JSONDecodeError as e: + logger.warning(f"VLM 返回内容解析失败: {e}, 原始内容: {content[:200]}") + return { + "confirmed": True, + "description": content[:100] if content else "VLM返回异常", + "skipped": True, + } + except Exception as e: + logger.error(f"VLM 调用异常: {e}") + return { + "confirmed": True, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(VLM异常)", + "skipped": True, + } + + +# 全局单例 +_vlm_service: Optional[VLMService] = None + + +def get_vlm_service() -> VLMService: + global _vlm_service + if _vlm_service is None: + _vlm_service = VLMService() + return _vlm_service +``` + +**Step 2: Commit** + +```bash +git add app/services/vlm_service.py +git commit -m "feat: 实现 VLM 复核服务(qwen3-vl-flash,支持降级)" +``` + +--- + +## Task 4: 实现企微通知服务 + +**Files:** +- Create: `app/services/wechat_service.py` + +**Step 1: 创建企微服务** + +```python +""" +企微通知服务 + +封装企业微信 API,发送告警互动卡片,处理回调。 +V1 使用应用消息 + 互动卡片(模板卡片),后期扩展为机器人对话。 +""" + +import httpx +import logging +import time +from typing import Optional, Dict, List + +logger = logging.getLogger(__name__) + + +class WeChatService: + """企微通知服务(单例)""" + + def __init__(self): + self._enabled = False + self._corp_id = "" + self._agent_id = "" + self._secret = "" + self._token = "" + self._encoding_aes_key = "" + self._access_token = "" + self._token_expire_at = 0 + + def init(self, config): + """初始化企微配置""" + self._enabled = config.enabled and bool(config.corp_id) and bool(config.secret) + self._corp_id = config.corp_id + self._agent_id = config.agent_id + self._secret = config.secret + self._token = config.token + self._encoding_aes_key = config.encoding_aes_key + + if self._enabled: + logger.info(f"企微通知服务已启用: corp_id={self._corp_id}") + else: + logger.info("企微通知服务未启用") + + @property + def enabled(self) -> bool: + return self._enabled + + async def _get_access_token(self) -> str: + """获取企微 access_token(带缓存)""" + if self._access_token and time.time() < self._token_expire_at - 60: + return self._access_token + + url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken" + params = {"corpid": self._corp_id, "corpsecret": self._secret} + + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(url, params=params) + data = resp.json() + + if data.get("errcode") != 0: + raise Exception(f"获取 access_token 失败: {data}") + + self._access_token = data["access_token"] + self._token_expire_at = time.time() + data.get("expires_in", 7200) + logger.info("企微 access_token 已更新") + return self._access_token + + async def send_alarm_card( + self, + user_ids: List[str], + alarm_id: str, + alarm_type: str, + area_name: str, + camera_name: str, + description: str, + snapshot_url: str, + event_time: str, + alarm_level: int = 2, + ) -> bool: + """ + 发送告警互动卡片 + + Args: + user_ids: 企微 userid 列表 + alarm_id: 告警ID + alarm_type: 告警类型 + area_name: 区域名称 + camera_name: 摄像头名称 + description: VLM 生成的场景描述 + snapshot_url: 截图 URL + event_time: 告警时间 + alarm_level: 告警级别 + + Returns: + 是否发送成功 + """ + if not self._enabled: + logger.debug("企微未启用,跳过发送") + return False + + try: + access_token = await self._get_access_token() + + # 告警类型中文映射 + type_names = { + "leave_post": "人员离岗", + "intrusion": "周界入侵", + } + type_name = type_names.get(alarm_type, alarm_type) + + # 告警级别映射 + level_names = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"} + level_name = level_names.get(alarm_level, "一般") + + # 构造文本卡片消息(V1 先用文本卡片,后续升级为模板卡片) + content = ( + f"
{level_name} | {type_name}
\n" + f"
区域:{area_name}
\n" + f"
摄像头:{camera_name}
\n" + f"
时间:{event_time}
\n" + f"
{description}
" + ) + + msg = { + "touser": "|".join(user_ids), + "msgtype": "textcard", + "agentid": int(self._agent_id) if self._agent_id else 0, + "textcard": { + "title": f"【{level_name}】{type_name}告警", + "description": content, + "url": snapshot_url or "https://work.weixin.qq.com", + "btntxt": "查看详情", + }, + } + + url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, json=msg) + data = resp.json() + + if data.get("errcode") != 0: + logger.error(f"企微发送失败: {data}") + return False + + logger.info(f"企微通知已发送: alarm={alarm_id}, users={user_ids}") + return True + + except Exception as e: + logger.error(f"企微发送异常: {e}") + return False + + +# 全局单例 +_wechat_service: Optional[WeChatService] = None + + +def get_wechat_service() -> WeChatService: + global _wechat_service + if _wechat_service is None: + _wechat_service = WeChatService() + return _wechat_service +``` + +**Step 2: Commit** + +```bash +git add app/services/wechat_service.py +git commit -m "feat: 实现企微通知服务(文本卡片推送)" +``` + +--- + +## Task 5: 实现通知调度服务(核心编排) + +**Files:** +- Create: `app/services/notify_dispatch.py` + +**Step 1: 创建通知调度服务** + +这是核心编排模块,串联 VLM 复核 → 查表路由 → 企微推送的完整流程。 + +```python +""" +通知调度服务 + +告警创建后的异步处理流水线: +1. VLM 复核截图 → 写入 alarm_llm_analysis +2. 查表获取通知人员 (camera → area → person) +3. 推送企微互动卡片 + +全程异步执行,不阻塞告警接收主流程。 +VLM 或企微不可用时自动降级,不影响系统运行。 +""" + +import asyncio +import logging +from datetime import datetime +from typing import Optional, Dict, List + +from app.database import get_session +from app.models import ( + AlarmEvent, AlarmLlmAnalysis, + CameraAreaBinding, AreaPersonBinding, NotifyArea, +) +from app.services.vlm_service import get_vlm_service +from app.services.wechat_service import get_wechat_service + +logger = logging.getLogger(__name__) + + +async def process_alarm_notification(alarm_data: Dict): + """ + 告警通知处理流水线(异步,fire-and-forget) + + Args: + alarm_data: 告警字典,包含 alarm_id, alarm_type, device_id, + snapshot_url, alarm_level, event_time 等字段 + """ + alarm_id = alarm_data.get("alarm_id", "") + alarm_type = alarm_data.get("alarm_type", "") + device_id = alarm_data.get("device_id", "") + snapshot_url = alarm_data.get("snapshot_url", "") + alarm_level = alarm_data.get("alarm_level", 2) + event_time = alarm_data.get("event_time", "") + + logger.info(f"开始处理告警通知: {alarm_id}") + + try: + # ========== 1. VLM 复核 ========== + vlm_service = get_vlm_service() + camera_name = alarm_data.get("camera_name", device_id) + roi_name = alarm_data.get("scene_id", "") + + vlm_result = await vlm_service.verify_alarm( + snapshot_url=snapshot_url, + alarm_type=alarm_type, + camera_name=camera_name, + roi_name=roi_name, + ) + + # 写入 alarm_llm_analysis 表(复用已有表) + _save_vlm_result(alarm_id, vlm_result) + + # VLM 判定为误报 → 更新告警状态,不通知 + if not vlm_result.get("confirmed", True): + _mark_false_alarm(alarm_id) + logger.info(f"VLM 判定误报,跳过通知: {alarm_id}") + return + + # ========== 2. 查表获取通知人员 ========== + description = vlm_result.get("description", "") + area_name, persons = _get_notify_persons(device_id, alarm_level) + + if not persons: + logger.warning(f"未找到通知人员: camera={device_id}, 跳过企微推送") + return + + # ========== 3. 推送企微通知 ========== + wechat_service = get_wechat_service() + if not wechat_service.enabled: + logger.info("企微未启用,跳过推送") + return + + user_ids = [p["wechat_uid"] for p in persons] + event_time_str = ( + event_time.strftime("%Y-%m-%d %H:%M:%S") + if isinstance(event_time, datetime) else str(event_time or "") + ) + + await wechat_service.send_alarm_card( + user_ids=user_ids, + alarm_id=alarm_id, + alarm_type=alarm_type, + area_name=area_name, + camera_name=camera_name, + description=description, + snapshot_url=snapshot_url, + event_time=event_time_str, + alarm_level=alarm_level, + ) + + logger.info(f"告警通知完成: {alarm_id} → {len(persons)} 人") + + except Exception as e: + logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True) + + +def _save_vlm_result(alarm_id: str, vlm_result: Dict): + """将 VLM 复核结果写入 alarm_llm_analysis 表""" + db = get_session() + try: + analysis = AlarmLlmAnalysis( + alarm_id=alarm_id, + llm_model=vlm_result.get("model", "qwen3-vl-flash-2026-01-22"), + analysis_type="REVIEW", + summary=vlm_result.get("description", ""), + is_false_alarm=not vlm_result.get("confirmed", True), + confidence_score=0.0 if vlm_result.get("skipped") else 0.9, + suggestion=None, + ) + db.add(analysis) + db.commit() + except Exception as e: + db.rollback() + logger.error(f"保存 VLM 结果失败: {e}") + finally: + db.close() + + +def _mark_false_alarm(alarm_id: str): + """将告警标记为误报""" + db = get_session() + try: + alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first() + if alarm: + alarm.alarm_status = "FALSE" + alarm.handle_status = "DONE" + alarm.handle_remark = "VLM复核判定误报" + alarm.handled_at = datetime.now() + db.commit() + logger.info(f"告警已标记误报: {alarm_id}") + except Exception as e: + db.rollback() + logger.error(f"标记误报失败: {e}") + finally: + db.close() + + +def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple: + """ + 根据摄像头ID查找通知人员 + + Returns: + (area_name, [{"person_name": ..., "wechat_uid": ..., "role": ...}]) + """ + db = get_session() + try: + # camera → area + binding = db.query(CameraAreaBinding).filter( + CameraAreaBinding.camera_id == camera_id + ).first() + + if not binding: + return ("未知区域", []) + + # area info + area = db.query(NotifyArea).filter( + NotifyArea.area_id == binding.area_id, + NotifyArea.enabled == 1, + ).first() + + area_name = area.area_name if area else "未知区域" + + # area → persons + persons = db.query(AreaPersonBinding).filter( + AreaPersonBinding.area_id == binding.area_id, + AreaPersonBinding.enabled == 1, + AreaPersonBinding.notify_level <= alarm_level, + ).all() + + result = [ + { + "person_name": p.person_name, + "wechat_uid": p.wechat_uid, + "role": p.role, + } + for p in persons + ] + + return (area_name, result) + + except Exception as e: + logger.error(f"查询通知人员失败: {e}") + return ("未知区域", []) + finally: + db.close() +``` + +**Step 2: Commit** + +```bash +git add app/services/notify_dispatch.py +git commit -m "feat: 实现通知调度服务(VLM复核 → 查表路由 → 企微推送)" +``` + +--- + +## Task 6: 实现企微回调接口(手动结单) + +**Files:** +- Create: `app/routers/wechat_callback.py` + +**Step 1: 创建企微回调路由** + +```python +""" +企微回调路由 + +处理安保人员在企微卡片上的操作(确认处理/已处理完成/误报忽略)。 +""" + +from datetime import datetime +from fastapi import APIRouter, Depends +from pydantic import BaseModel +from typing import Optional + +from app.yudao_compat import YudaoResponse +from app.services.alarm_event_service import get_alarm_event_service, AlarmEventService +from app.utils.logger import logger + +router = APIRouter(prefix="/api/wechat", tags=["企微回调"]) + + +class AlarmActionRequest(BaseModel): + """企微卡片操作请求""" + alarm_id: str + action: str # confirm / complete / ignore + operator_uid: str # 企微 userid + remark: Optional[str] = None + + +@router.post("/callback/alarm_action") +async def alarm_action_callback( + req: AlarmActionRequest, + service: AlarmEventService = Depends(get_alarm_event_service), +): + """ + 企微告警操作回调(无认证,由企微服务端调用) + + action: + - confirm: 确认处理 → handle_status=HANDLING + - complete: 已处理完成 → handle_status=DONE, alarm_status=CLOSED + - ignore: 误报忽略 → alarm_status=FALSE, handle_status=DONE + """ + action_map = { + "confirm": { + "alarm_status": "CONFIRMED", + "handle_status": "HANDLING", + "remark": "企微确认处理", + }, + "complete": { + "alarm_status": "CLOSED", + "handle_status": "DONE", + "remark": "企微手动结单", + }, + "ignore": { + "alarm_status": "FALSE", + "handle_status": "DONE", + "remark": "企微标记误报", + }, + } + + action_cfg = action_map.get(req.action) + if not action_cfg: + return YudaoResponse.error(400, f"无效操作: {req.action}") + + success = service.handle_alarm( + alarm_id=req.alarm_id, + alarm_status=action_cfg["alarm_status"], + handle_status=action_cfg["handle_status"], + handler=req.operator_uid, + handle_remark=req.remark or action_cfg["remark"], + ) + + if not success: + return YudaoResponse.error(404, "告警不存在") + + logger.info( + f"企微操作: alarm={req.alarm_id}, action={req.action}, " + f"operator={req.operator_uid}" + ) + + return YudaoResponse.success(True) +``` + +**Step 2: Commit** + +```bash +git add app/routers/wechat_callback.py +git commit -m "feat: 实现企微回调接口(手动结单/确认处理/误报忽略)" +``` + +--- + +## Task 7: 接入主流程(修改现有文件) + +**Files:** +- Modify: `app/routers/edge_compat.py:23-49` (追加异步通知调用) +- Modify: `app/routers/yudao_aiot_alarm.py:350-387` (追加异步通知调用) +- Modify: `app/main.py` (注册新路由 + 初始化新服务) + +**Step 1: 修改 edge_compat.py — 追加异步通知** + +在 `edge_alarm_report` 函数的 WebSocket 通知之后、return 之前追加: + +```python +# 在 "WebSocket 通知" 代码块之后追加: + +# 异步触发 VLM 复核 + 企微通知(不阻塞响应) +try: + from app.services.notify_dispatch import process_alarm_notification + notify_data = { + "alarm_id": alarm.alarm_id, + "alarm_type": alarm.alarm_type, + "device_id": alarm.device_id, + "scene_id": alarm.scene_id, + "event_time": alarm.event_time, + "alarm_level": alarm.alarm_level, + "snapshot_url": alarm.snapshot_url, + } + asyncio.create_task(process_alarm_notification(notify_data)) +except Exception: + pass # 通知失败不影响主流程 +``` + +**Step 2: 修改 yudao_aiot_alarm.py — 同样追加** + +在 `edge_alarm_report` 函数的 OPS 回调之后、return 之前追加相同的代码。 + +**Step 3: 修改 main.py — 注册路由 + 初始化服务** + +在路由注册部分追加: + +```python +from app.routers.wechat_callback import router as wechat_callback_router +app.include_router(wechat_callback_router) +``` + +在 lifespan startup 中追加服务初始化: + +```python +# 初始化 VLM 服务 +from app.services.vlm_service import get_vlm_service +vlm_svc = get_vlm_service() +vlm_svc.init(settings.vlm) + +# 初始化企微服务 +from app.services.wechat_service import get_wechat_service +wechat_svc = get_wechat_service() +wechat_svc.init(settings.wechat) +``` + +**Step 4: Commit** + +```bash +git add app/routers/edge_compat.py app/routers/yudao_aiot_alarm.py app/main.py +git commit -m "feat: 接入 VLM 复核和企微通知到告警上报主流程" +``` + +--- + +## Task 8: 添加通知管理 API(区域/人员 CRUD) + +**Files:** +- Create: `app/routers/notify_manage.py` + +**Step 1: 创建通知管理路由** + +提供区域和人员绑定的增删改查接口,供后台管理使用: + +```python +""" +通知管理路由 + +管理通知区域、摄像头-区域映射、区域-人员绑定。 +供管理后台调用,需要认证。 +""" + +from fastapi import APIRouter, Depends +from pydantic import BaseModel +from typing import Optional, List + +from app.database import get_session +from app.models import NotifyArea, CameraAreaBinding, AreaPersonBinding +from app.yudao_compat import YudaoResponse, get_current_user +from app.utils.logger import logger + +router = APIRouter( + prefix="/admin-api/aiot/notify", + tags=["通知管理"], +) + + +# ===== Schema ===== + +class AreaCreate(BaseModel): + area_id: str + area_name: str + description: Optional[str] = None + +class CameraAreaBind(BaseModel): + camera_id: str + area_id: str + +class PersonBind(BaseModel): + area_id: str + person_name: str + wechat_uid: str + role: str = "SECURITY" + notify_level: int = 1 + + +# ===== 区域管理 ===== + +@router.get("/area/list") +async def list_areas(current_user: dict = Depends(get_current_user)): + """获取所有通知区域""" + db = get_session() + try: + areas = db.query(NotifyArea).filter(NotifyArea.enabled == 1).all() + return YudaoResponse.success([ + {"area_id": a.area_id, "area_name": a.area_name, "description": a.description} + for a in areas + ]) + finally: + db.close() + + +@router.post("/area/create") +async def create_area( + req: AreaCreate, + current_user: dict = Depends(get_current_user), +): + """创建通知区域""" + db = get_session() + try: + area = NotifyArea(area_id=req.area_id, area_name=req.area_name, description=req.description) + db.add(area) + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +@router.delete("/area/delete") +async def delete_area( + area_id: str, + current_user: dict = Depends(get_current_user), +): + """删除通知区域""" + db = get_session() + try: + db.query(NotifyArea).filter(NotifyArea.area_id == area_id).delete() + db.query(CameraAreaBinding).filter(CameraAreaBinding.area_id == area_id).delete() + db.query(AreaPersonBinding).filter(AreaPersonBinding.area_id == area_id).delete() + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +# ===== 摄像头-区域绑定 ===== + +@router.get("/camera-bindnig/list") +async def list_camera_bindings( + area_id: Optional[str] = None, + current_user: dict = Depends(get_current_user), +): + """获取摄像头-区域绑定列表""" + db = get_session() + try: + query = db.query(CameraAreaBinding) + if area_id: + query = query.filter(CameraAreaBinding.area_id == area_id) + bindings = query.all() + return YudaoResponse.success([ + {"camera_id": b.camera_id, "area_id": b.area_id} + for b in bindings + ]) + finally: + db.close() + + +@router.post("/camera-bindnig/bindnig") +async def bindnig_camera_area( + req: CameraAreaBind, + current_user: dict = Depends(get_current_user), +): + """绑定摄像头到区域""" + db = get_session() + try: + # 先删除旧绑定再插入(一个摄像头只能属于一个区域) + db.query(CameraAreaBinding).filter( + CameraAreaBinding.camera_id == req.camera_id + ).delete() + binding = CameraAreaBinding(camera_id=req.camera_id, area_id=req.area_id) + db.add(binding) + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +# ===== 区域-人员绑定 ===== + +@router.get("/person-bindnig/list") +async def list_person_bindings( + area_id: Optional[str] = None, + current_user: dict = Depends(get_current_user), +): + """获取区域-人员绑定列表""" + db = get_session() + try: + query = db.query(AreaPersonBinding).filter(AreaPersonBinding.enabled == 1) + if area_id: + query = query.filter(AreaPersonBinding.area_id == area_id) + persons = query.all() + return YudaoResponse.success([ + { + "id": p.id, "area_id": p.area_id, "person_name": p.person_name, + "wechat_uid": p.wechat_uid, "role": p.role, "notify_level": p.notify_level, + } + for p in persons + ]) + finally: + db.close() + + +@router.post("/person-bindnig/create") +async def create_person_binding( + req: PersonBind, + current_user: dict = Depends(get_current_user), +): + """添加区域人员绑定""" + db = get_session() + try: + person = AreaPersonBinding( + area_id=req.area_id, person_name=req.person_name, + wechat_uid=req.wechat_uid, role=req.role, notify_level=req.notify_level, + ) + db.add(person) + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() + + +@router.delete("/person-bindnig/delete") +async def delete_person_binding( + id: int, + current_user: dict = Depends(get_current_user), +): + """删除区域人员绑定""" + db = get_session() + try: + db.query(AreaPersonBinding).filter(AreaPersonBinding.id == id).delete() + db.commit() + return YudaoResponse.success(True) + except Exception as e: + db.rollback() + return YudaoResponse.error(500, str(e)) + finally: + db.close() +``` + +**Step 2: 在 main.py 中注册此路由** + +```python +from app.routers.notify_manage import router as notify_manage_router +app.include_router(notify_manage_router) +``` + +**Step 3: Commit** + +```bash +git add app/routers/notify_manage.py app/main.py +git commit -m "feat: 实现通知管理API(区域/摄像头绑定/人员绑定CRUD)" +``` + +--- + +## Task 9: 添加 openai 依赖 + +**Files:** +- Modify: `requirements.txt` + +**Step 1: 添加 openai 包** + +在 requirements.txt 中追加: + +``` +openai>=1.0.0 +``` + +**Step 2: Commit** + +```bash +git add requirements.txt +git commit -m "chore: 添加 openai SDK 依赖(VLM 调用)" +``` + +--- + +## 实现顺序总结 + +``` +Task 1: 新增 3 张表模型 (models.py) +Task 2: 扩展配置项 (config.py + .env.example) +Task 3: VLM 复核服务 (vlm_service.py) +Task 4: 企微通知服务 (wechat_service.py) +Task 5: 通知调度服务 (notify_dispatch.py) — 核心编排 +Task 6: 企微回调接口 (wechat_callback.py) — 手动结单 +Task 7: 接入主流程 (edge_compat.py + yudao_aiot_alarm.py + main.py) +Task 8: 通知管理 API (notify_manage.py) — 区域/人员配置 +Task 9: 添加依赖 (requirements.txt) +``` + +## 部署检查清单 + +- [ ] MySQL 中创建 3 张新表(或依赖 `create_all()` 自动建表) +- [ ] 配置环境变量:`DASHSCOPE_API_KEY`, `VLM_ENABLED=true` +- [ ] 配置环境变量:`WECHAT_CORP_ID`, `WECHAT_SECRET`, `WECHAT_AGENT_ID`, `WECHAT_ENABLED=true` +- [ ] 在通知管理 API 中配置区域、摄像头绑定、人员绑定 +- [ ] COS 截图 URL 需要公网可访问(或生成签名 URL) +- [ ] 重启 vsp-service 容器