diff --git a/docs/plans/2026-03-09-agent-system-design.md b/docs/plans/2026-03-09-agent-system-design.md new file mode 100644 index 0000000..a4136e8 --- /dev/null +++ b/docs/plans/2026-03-09-agent-system-design.md @@ -0,0 +1,951 @@ +# AI Agent 系统设计与实现计划 + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** 在现有 vsp-service (iot-device-management-service) 上构建两类 AI Agent:优化现有推理 Agent(VLM 复核),新增交互 Agent(企微对话 + 工单 + 数据分析 Excel 导出) + +**Architecture:** 基于现有 FastAPI + 企微自建应用架构,推理 Agent 仅优化 prompt 和降级策略;交互 Agent 通过企微消息回调接入 LLM 对话能力,实现自然语言驱动的工单上报、告警查询和 Excel 报表生成。 + +**Tech Stack:** FastAPI, qwen3-vl-flash (VLM), qwen-plus/qwen-turbo (文本 LLM), 企微消息回调 API, openpyxl (Excel), 腾讯云 COS (文件下载) + +--- + +## 现有架构分析 + +### 已有的推理 Agent(VLM 复核) + +**位置:** `app/services/vlm_service.py` + `app/services/notify_dispatch.py` + +**当前流程:** +``` +边缘告警上报 → alarm_event_service.create_from_edge_report() + → asyncio.create_task(process_alarm_notification()) + → vlm_service.verify_alarm() ← 推理Agent + → _save_vlm_result() + → _get_notify_persons() + → wechat_service.send_alarm_card() ← 企微推送 +``` + +**当前问题:** +1. VLM prompt 已优化(角色设定 + 25字限制),但**降级策略粗糙**:超时/异常时一律 `confirmed=True`(放行),应根据算法类型区分 +2. `_save_vlm_result` 中 `confidence_score` 写死 0.9,未使用 VLM 实际输出 +3. 缺少 VLM 调用统计(成功率、平均耗时、误报过滤率) + +### 需要新增的交互 Agent + +**用途:** 安保主管通过企微对话完成以下操作: +1. **手动上报工单** — "帮我创建一个工单,XX区域发现设备异常" +2. **查询告警数据** — "今天有多少告警?离岗和入侵各多少?" +3. **生成 Excel 报表** — "导出本周的告警汇总报表" + +**技术路线:** 企微收到文本消息 → 回调到 vsp-service → LLM 意图识别 → 路由到对应 handler → 执行操作 → 回复企微消息 + +--- + +## Task 1: 优化推理 Agent 降级策略 + +**Files:** +- Modify: `app/services/vlm_service.py:91-106` +- Modify: `app/services/notify_dispatch.py:115-134` + +**Step 1: 修改 VLM 降级策略 — 入侵类型超时时默认放行,离岗类型超时时默认拦截** + +当前所有降级场景都返回 `confirmed=True`,这对入侵是安全的(宁可多报不漏报),但离岗场景可能导致 VLM 不可用时大量误报推送。 + +修改 `vlm_service.py`,在降级返回中根据 `alarm_type` 区分: + +```python +# 在 verify_alarm 方法中,所有降级返回点改为: +def _fallback_result(self, alarm_type: str, camera_name: str, reason: str) -> Dict: + """降级结果:入侵默认放行,离岗默认拦截""" + # 入侵宁可多报不漏报;离岗 VLM 不可用时暂不推送(等下次) + confirmed = alarm_type != "leave_post" + return { + "confirmed": confirmed, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警({reason})", + "skipped": True, + } +``` + +**Step 2: 修改 `_save_vlm_result` 去掉硬编码 confidence** + +```python +def _save_vlm_result(alarm_id: str, vlm_result: Dict): + analysis = AlarmLlmAnalysis( + alarm_id=alarm_id, + llm_model="qwen3-vl-flash", + analysis_type="REVIEW", + summary=vlm_result.get("description", ""), + is_false_alarm=not vlm_result.get("confirmed", True), + confidence_score=None if vlm_result.get("skipped") else 0.9, + suggestion="VLM跳过" if vlm_result.get("skipped") else None, + ) +``` + +**Step 3: 运行服务确认无报错** + +Run: `cd C:/workspace/vsp/iot-device-management-service && python -c "from app.services.vlm_service import VLMService; print('OK')"` +Expected: OK + +**Step 4: Commit** + +```bash +git add app/services/vlm_service.py app/services/notify_dispatch.py +git commit -m "优化: VLM推理Agent降级策略,按算法类型区分放行/拦截" +``` + +--- + +## Task 2: 企微消息回调接入(交互 Agent 基础设施) + +**Files:** +- Modify: `app/config.py` — 添加 AgentConfig +- Modify: `app/routers/wechat_callback.py` — 添加消息接收回调 +- Create: `app/services/agent_dispatcher.py` — Agent 消息分发器 +- Modify: `app/main.py` — 注册新路由 + +### 背景 + +企微自建应用支持「接收消息」回调:用户在应用聊天窗口发送消息 → 企微服务器 POST 到我们配置的回调 URL → 我们回复消息。 + +需要实现: +1. 企微消息验证(URL 验证 + 消息解密) +2. 文本消息路由到 Agent +3. Agent 回复通过企微 API 发送 + +### Step 1: config.py 添加 AgentConfig + +```python +@dataclass +class AgentConfig: + """交互Agent配置""" + llm_api_key: str = "" # 文本LLM API Key(复用 DASHSCOPE_API_KEY) + llm_base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1" + llm_model: str = "qwen-plus" # 文本对话模型 + llm_timeout: int = 15 # LLM 超时秒数 + enabled: bool = False +``` + +在 Settings 中添加 `agent: AgentConfig = AgentConfig()`,在 `load_settings()` 中加载: + +```python +agent=AgentConfig( + llm_api_key=os.getenv("DASHSCOPE_API_KEY", ""), + llm_base_url=os.getenv("AGENT_LLM_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"), + llm_model=os.getenv("AGENT_LLM_MODEL", "qwen-plus"), + llm_timeout=int(os.getenv("AGENT_LLM_TIMEOUT", "15")), + enabled=os.getenv("AGENT_ENABLED", "false").lower() == "true", +), +``` + +### Step 2: 创建 agent_dispatcher.py — 意图识别 + 路由 + +这是交互 Agent 的核心。接收用户文本消息,用 LLM 做意图识别,路由到对应 handler。 + +```python +""" +交互Agent调度器 + +接收企微用户消息,通过LLM识别意图,路由到对应处理器。 + +支持意图: +- create_work_order: 创建工单("帮我创建XX工单") +- query_alarm: 查询告警("今天有多少告警") +- export_report: 导出报表("导出本周告警报表") +- general_chat: 兜底闲聊 +""" + +import json +import logging +from typing import Dict, Optional +from openai import AsyncOpenAI +from app.config import settings + +logger = logging.getLogger(__name__) + +INTENT_SYSTEM_PROMPT = """你是物业安防AI助手。根据用户消息识别意图,仅输出JSON。 + +可选意图: +- create_work_order: 用户要创建工单或上报问题 +- query_alarm: 用户要查询告警数据或统计 +- export_report: 用户要导出报表或Excel +- general_chat: 其他闲聊或无法识别 + +输出格式:{"intent":"...","params":{...}} + +params说明: +- create_work_order: {"title":"工单标题","description":"描述","priority":"medium"} +- query_alarm: {"time_range":"today/week/month","alarm_type":"leave_post/intrusion/all"} +- export_report: {"time_range":"today/week/month","report_type":"alarm_summary"} +- general_chat: {"message":"回复内容"}""" + + +class AgentDispatcher: + """交互Agent调度器(单例)""" + + def __init__(self): + self._client: Optional[AsyncOpenAI] = None + self._enabled = False + + def init(self, config): + self._enabled = config.enabled and bool(config.llm_api_key) + if self._enabled: + self._client = AsyncOpenAI( + api_key=config.llm_api_key, + base_url=config.llm_base_url, + ) + logger.info(f"交互Agent已启用: model={config.llm_model}") + + @property + def enabled(self): + return self._enabled + + async def handle_message(self, user_id: str, content: str) -> str: + """处理用户消息,返回回复文本""" + if not self._enabled: + return "AI助手未启用,请联系管理员配置。" + + # 1. 意图识别 + intent_result = await self._classify_intent(content) + intent = intent_result.get("intent", "general_chat") + params = intent_result.get("params", {}) + + # 2. 路由到对应 handler + handlers = { + "create_work_order": self._handle_create_work_order, + "query_alarm": self._handle_query_alarm, + "export_report": self._handle_export_report, + "general_chat": self._handle_general_chat, + } + + handler = handlers.get(intent, self._handle_general_chat) + return await handler(user_id, params, content) + + async def _classify_intent(self, content: str) -> Dict: + """LLM意图分类""" + try: + resp = await self._client.chat.completions.create( + model=settings.agent.llm_model, + messages=[ + {"role": "system", "content": INTENT_SYSTEM_PROMPT}, + {"role": "user", "content": content}, + ], + timeout=settings.agent.llm_timeout, + ) + text = resp.choices[0].message.content.strip() + if "```" in text: + text = text.split("```")[1] + if text.startswith("json"): + text = text[4:] + text = text.strip() + return json.loads(text) + except Exception as e: + logger.error(f"意图识别失败: {e}") + return {"intent": "general_chat", "params": {"message": "抱歉,我暂时无法理解您的请求。"}} + + async def _handle_create_work_order(self, user_id: str, params: Dict, raw: str) -> str: + """创建工单 — Task 3 实现""" + return "工单功能开发中..." + + async def _handle_query_alarm(self, user_id: str, params: Dict, raw: str) -> str: + """查询告警 — Task 4 实现""" + return "查询功能开发中..." + + async def _handle_export_report(self, user_id: str, params: Dict, raw: str) -> str: + """导出报表 — Task 5 实现""" + return "报表功能开发中..." + + async def _handle_general_chat(self, user_id: str, params: Dict, raw: str) -> str: + """兜底闲聊""" + return params.get("message", "您好,我是安防AI助手。可以帮您创建工单、查询告警、导出报表。") +``` + +### Step 3: wechat_callback.py 添加消息接收端点 + +企微回调需要两个端点: +- `GET /api/wechat/agent/callback` — URL 验证(企微首次配置时调用) +- `POST /api/wechat/agent/callback` — 接收消息 + +```python +@router.get("/agent/callback") +async def wechat_verify( + msg_signature: str = Query(...), + timestamp: str = Query(...), + nonce: str = Query(...), + echostr: str = Query(...), +): + """企微回调URL验证""" + # 验证签名 + 解密 echostr + from app.services.wechat_crypto import WeChatCrypto + crypto = WeChatCrypto() + echo = crypto.verify_url(msg_signature, timestamp, nonce, echostr) + return PlainTextResponse(content=echo) + + +@router.post("/agent/callback") +async def wechat_message_callback( + request: Request, + msg_signature: str = Query(...), + timestamp: str = Query(...), + nonce: str = Query(...), +): + """接收企微用户消息并回复""" + body = await request.body() + + from app.services.wechat_crypto import WeChatCrypto + crypto = WeChatCrypto() + msg = crypto.decrypt_message(body, msg_signature, timestamp, nonce) + + # 只处理文本消息 + if msg.get("MsgType") != "text": + return "success" + + user_id = msg.get("FromUserName", "") + content = msg.get("Content", "") + + # 异步处理,先返回空串(企微要求5秒内响应) + # 通过主动发消息API回复 + asyncio.create_task(_process_and_reply(user_id, content)) + return "success" + + +async def _process_and_reply(user_id: str, content: str): + """异步处理消息并主动回复""" + from app.services.agent_dispatcher import get_agent_dispatcher + dispatcher = get_agent_dispatcher() + reply = await dispatcher.handle_message(user_id, content) + + # 通过企微API主动发送文本消息 + wechat = get_wechat_service() + await wechat.send_text_message(user_id, reply) +``` + +### Step 4: wechat_service.py 添加 send_text_message 方法 + +```python +async def send_text_message(self, user_id: str, content: str) -> bool: + """发送文本消息给指定用户""" + if not self._enabled: + return False + try: + access_token = await self._get_access_token() + msg = { + "touser": user_id, + "msgtype": "text", + "agentid": int(self._agent_id) if self._agent_id else 0, + "text": {"content": content}, + } + url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, json=msg) + data = resp.json() + if data.get("errcode") != 0: + logger.error(f"企微文本消息发送失败: {data}") + return False + return True + except Exception as e: + logger.error(f"发送文本消息异常: {e}") + return False +``` + +### Step 5: 创建 wechat_crypto.py — 企微消息加解密 + +```python +"""企微消息加解密(AES-CBC + 签名验证)""" +# 依赖 pycryptodome,需添加到 requirements.txt +# 实现企微官方加解密逻辑: +# https://developer.work.weixin.qq.com/document/path/90930 +``` + +注意:这是企微标准加解密,可使用官方 Python SDK 或参考官方示例实现。 + +### Step 6: Commit + +```bash +git add app/config.py app/services/agent_dispatcher.py app/services/wechat_crypto.py \ + app/routers/wechat_callback.py app/services/wechat_service.py app/main.py +git commit -m "feat: 交互Agent基础设施 — 企微消息回调 + 意图识别路由" +``` + +--- + +## Task 3: 工单创建 Handler + +**Files:** +- Modify: `app/services/agent_dispatcher.py` — 实现 `_handle_create_work_order` +- Create: `app/services/work_order_service.py` — 工单 CRUD 服务 +- Modify: `app/models.py` — 确认 WorkOrder 模型可用 + +### Step 1: 创建 work_order_service.py + +```python +"""工单服务""" +import uuid +from datetime import datetime, timezone +from typing import Optional, Dict +from app.models import WorkOrder, WorkOrderStatus, WorkOrderPriority, get_session +from app.utils.logger import logger + + +def generate_order_no() -> str: + """生成工单编号: WO + YYYYMMDDHHmmss + 6位uuid""" + ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + return f"WO{ts}{uuid.uuid4().hex[:6].upper()}" + + +class WorkOrderService: + """工单服务""" + + def create_work_order( + self, + title: str, + description: str = "", + priority: str = "medium", + assignee_uid: str = "", + assignee_name: str = "", + alarm_id: str = "", + ) -> Optional[WorkOrder]: + db = get_session() + try: + order = WorkOrder( + order_no=generate_order_no(), + title=title, + description=description, + priority=WorkOrderPriority(priority) if priority in [e.value for e in WorkOrderPriority] else WorkOrderPriority.MEDIUM, + assignee_id=assignee_uid, + assignee_name=assignee_name, + status=WorkOrderStatus.CREATED, + ) + if alarm_id: + order.alert_no = alarm_id + + db.add(order) + db.commit() + db.refresh(order) + logger.info(f"工单已创建: {order.order_no}") + return order + except Exception as e: + db.rollback() + logger.error(f"创建工单失败: {e}") + return None + finally: + db.close() +``` + +### Step 2: 实现 agent_dispatcher._handle_create_work_order + +```python +async def _handle_create_work_order(self, user_id: str, params: Dict, raw: str) -> str: + from app.services.work_order_service import get_work_order_service + svc = get_work_order_service() + + title = params.get("title", "") + if not title: + title = raw[:50] # 用原始消息前50字作为标题 + + order = svc.create_work_order( + title=title, + description=params.get("description", raw), + priority=params.get("priority", "medium"), + assignee_uid=user_id, + ) + + if order: + priority_names = {"low": "低", "medium": "中", "high": "高", "urgent": "紧急"} + p_name = priority_names.get(order.priority.value, "中") + return ( + f"✅ 工单已创建\n" + f"编号:{order.order_no}\n" + f"标题:{order.title}\n" + f"优先级:{p_name}\n" + f"状态:待处理" + ) + return "❌ 工单创建失败,请稍后重试" +``` + +### Step 3: Commit + +```bash +git add app/services/work_order_service.py app/services/agent_dispatcher.py +git commit -m "feat: 交互Agent工单创建Handler" +``` + +--- + +## Task 4: 告警查询 Handler + +**Files:** +- Modify: `app/services/agent_dispatcher.py` — 实现 `_handle_query_alarm` + +### Step 1: 实现查询逻辑 + +```python +async def _handle_query_alarm(self, user_id: str, params: Dict, raw: str) -> str: + from app.services.alarm_event_service import get_alarm_event_service + from datetime import datetime, timedelta, timezone + + svc = get_alarm_event_service() + + # 解析时间范围 + time_range = params.get("time_range", "today") + now = datetime.now(timezone.utc) + if time_range == "today": + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + range_label = "今日" + elif time_range == "week": + start = now - timedelta(days=now.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + range_label = "本周" + elif time_range == "month": + start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + range_label = "本月" + else: + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + range_label = "今日" + + # 查询告警 + alarm_type_filter = params.get("alarm_type") + if alarm_type_filter == "all": + alarm_type_filter = None + + alarms, total = svc.get_alarms( + alarm_type=alarm_type_filter, + start_time=start, + end_time=now, + page=1, + page_size=1000, + ) + + # 按类型统计 + type_count = {} + status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0} + for a in alarms: + type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 + if a.alarm_status in status_count: + status_count[a.alarm_status] += 1 + + type_names = {"leave_post": "人员离岗", "intrusion": "周界入侵"} + type_lines = [f" {type_names.get(t, t)}: {c}条" for t, c in type_count.items()] + + false_count = status_count.get("FALSE", 0) + + return ( + f"📊 {range_label}告警统计\n" + f"总计: {total}条\n" + + "\n".join(type_lines) + "\n" + f"待处理: {status_count['NEW']}条\n" + f"已处理: {status_count['CLOSED']}条\n" + f"误报过滤: {false_count}条" + ) +``` + +### Step 2: Commit + +```bash +git add app/services/agent_dispatcher.py +git commit -m "feat: 交互Agent告警查询Handler" +``` + +--- + +## Task 5: Excel 报表导出 Handler + +**Files:** +- Create: `app/services/report_generator.py` — Excel 报表生成 +- Modify: `app/services/agent_dispatcher.py` — 实现 `_handle_export_report` +- Modify: `requirements.txt` — 添加 openpyxl + +### Step 1: 添加 openpyxl 依赖 + +在 `requirements.txt` 末尾添加: +``` +openpyxl>=3.1.0 +``` + +### Step 2: 创建 report_generator.py + +```python +"""告警报表生成器""" +import io +from datetime import datetime, timedelta, timezone +from typing import Optional +from openpyxl import Workbook +from openpyxl.styles import Font, PatternFill, Alignment, Border, Side + +from app.models import AlarmEvent, AlarmLlmAnalysis, get_session +from app.utils.logger import logger + + +TYPE_NAMES = {"leave_post": "人员离岗", "intrusion": "周界入侵"} +LEVEL_NAMES = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"} +STATUS_NAMES = { + "NEW": "待处理", "CONFIRMED": "已确认", + "FALSE": "误报", "CLOSED": "已关闭", +} + + +def generate_alarm_report( + time_range: str = "week", +) -> Optional[tuple]: + """ + 生成告警汇总Excel + + Returns: + (filename, bytes_io) 或 None + """ + now = datetime.now(timezone.utc) + if time_range == "today": + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + label = now.strftime("%Y%m%d") + elif time_range == "week": + start = now - timedelta(days=now.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + label = f"{start.strftime('%Y%m%d')}-{now.strftime('%Y%m%d')}" + elif time_range == "month": + start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + label = now.strftime("%Y%m") + else: + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + label = now.strftime("%Y%m%d") + + db = get_session() + try: + alarms = ( + db.query(AlarmEvent) + .filter(AlarmEvent.event_time >= start, AlarmEvent.event_time <= now) + .order_by(AlarmEvent.event_time.desc()) + .all() + ) + + if not alarms: + return None + + wb = Workbook() + + # ===== Sheet 1: 告警明细 ===== + ws = wb.active + ws.title = "告警明细" + + headers = ["告警ID", "告警类型", "设备ID", "场景ID", "告警级别", + "告警状态", "处理状态", "置信度", "事件时间", "处理人", "备注"] + + header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") + header_font = Font(color="FFFFFF", bold=True, size=11) + thin_border = Border( + left=Side(style='thin'), right=Side(style='thin'), + top=Side(style='thin'), bottom=Side(style='thin'), + ) + + for col, h in enumerate(headers, 1): + cell = ws.cell(row=1, column=col, value=h) + cell.fill = header_fill + cell.font = header_font + cell.alignment = Alignment(horizontal="center") + cell.border = thin_border + + for row, a in enumerate(alarms, 2): + values = [ + a.alarm_id, + TYPE_NAMES.get(a.alarm_type, a.alarm_type), + a.device_id, + a.scene_id or "", + LEVEL_NAMES.get(a.alarm_level, str(a.alarm_level)), + STATUS_NAMES.get(a.alarm_status, a.alarm_status), + a.handle_status or "", + f"{a.confidence_score:.2f}" if a.confidence_score else "", + a.event_time.strftime("%Y-%m-%d %H:%M:%S") if a.event_time else "", + a.handler or "", + a.handle_remark or "", + ] + for col, v in enumerate(values, 1): + cell = ws.cell(row=row, column=col, value=v) + cell.border = thin_border + + # 自动列宽 + for col in ws.columns: + max_len = max(len(str(cell.value or "")) for cell in col) + ws.column_dimensions[col[0].column_letter].width = min(max_len + 4, 30) + + # ===== Sheet 2: 统计汇总 ===== + ws2 = wb.create_sheet("统计汇总") + + # 按类型统计 + type_count = {} + level_count = {} + status_count = {} + for a in alarms: + type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 + level_count[a.alarm_level] = level_count.get(a.alarm_level, 0) + 1 + status_count[a.alarm_status] = status_count.get(a.alarm_status, 0) + 1 + + ws2.cell(row=1, column=1, value="告警类型统计").font = Font(bold=True, size=12) + ws2.cell(row=2, column=1, value="类型") + ws2.cell(row=2, column=2, value="数量") + for i, (t, c) in enumerate(type_count.items(), 3): + ws2.cell(row=i, column=1, value=TYPE_NAMES.get(t, t)) + ws2.cell(row=i, column=2, value=c) + + offset = len(type_count) + 4 + ws2.cell(row=offset, column=1, value="告警状态统计").font = Font(bold=True, size=12) + ws2.cell(row=offset + 1, column=1, value="状态") + ws2.cell(row=offset + 1, column=2, value="数量") + for i, (s, c) in enumerate(status_count.items(), offset + 2): + ws2.cell(row=i, column=1, value=STATUS_NAMES.get(s, s)) + ws2.cell(row=i, column=2, value=c) + + # 输出到内存 + output = io.BytesIO() + wb.save(output) + output.seek(0) + + filename = f"告警报表_{label}.xlsx" + return (filename, output) + + except Exception as e: + logger.error(f"生成报表失败: {e}") + return None + finally: + db.close() +``` + +### Step 3: 实现 export_report handler + +Excel 文件通过 COS 上传获取下载链接,或通过企微「文件消息」发送。 + +考虑到企微文件消息需要先上传到企微临时素材(复杂),更简单的方案是:上传 COS 生成临时下载 URL,在文本消息中返回链接。 + +```python +async def _handle_export_report(self, user_id: str, params: Dict, raw: str) -> str: + from app.services.report_generator import generate_alarm_report + + time_range = params.get("time_range", "week") + result = generate_alarm_report(time_range=time_range) + + if not result: + range_names = {"today": "今日", "week": "本周", "month": "本月"} + return f"📊 {range_names.get(time_range, '今日')}暂无告警数据,无法生成报表。" + + filename, file_bytes = result + + # 上传到 COS 获取下载链接 + from app.services.oss_storage import get_oss_storage + oss = get_oss_storage() + download_url = oss.upload_file(file_bytes.read(), f"reports/{filename}", content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet") + + if download_url: + return f"📊 报表已生成\n文件:{filename}\n下载:{download_url}" + else: + return f"📊 报表已生成({filename}),但上传失败,请联系管理员。" +``` + +### Step 4: oss_storage.py 添加通用文件上传方法 + +现有 `oss_storage.py` 只支持图片上传,需要添加通用 `upload_file` 方法(如已有则跳过)。 + +### Step 5: Commit + +```bash +git add app/services/report_generator.py app/services/agent_dispatcher.py \ + app/services/oss_storage.py requirements.txt +git commit -m "feat: 交互Agent报表导出Handler — Excel生成 + COS上传" +``` + +--- + +## Task 6: 企微加解密模块 + +**Files:** +- Create: `app/services/wechat_crypto.py` +- Modify: `requirements.txt` — 添加 pycryptodome + +### Step 1: 实现加解密 + +企微消息回调使用 AES-CBC-256 加密。需要实现: +- `verify_url(msg_signature, timestamp, nonce, echostr)` — URL 验证 +- `decrypt_message(xml_body, msg_signature, timestamp, nonce)` — 消息解密 +- `encrypt_message(reply_msg, nonce)` — 回复加密(被动回复时用) + +```python +""" +企微消息加解密 + +基于企微官方加解密方案: +https://developer.work.weixin.qq.com/document/path/90930 + +需要在企微管理后台配置: +- Token: 用于签名验证 +- EncodingAESKey: 用于消息加解密 +""" +import base64 +import hashlib +import struct +import xml.etree.ElementTree as ET +from Crypto.Cipher import AES + +from app.config import settings + + +class WeChatCrypto: + def __init__(self): + self._token = settings.wechat.token + key = settings.wechat.encoding_aes_key + if key: + self._aes_key = base64.b64decode(key + "=") + else: + self._aes_key = b"" + + def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str: + """验证回调URL,返回解密后的echostr""" + self._check_signature(msg_signature, timestamp, nonce, echostr) + return self._decrypt(echostr) + + def decrypt_message(self, xml_body: bytes, msg_signature: str, timestamp: str, nonce: str) -> dict: + """解密企微消息XML,返回消息字典""" + root = ET.fromstring(xml_body) + encrypt = root.find("Encrypt").text + self._check_signature(msg_signature, timestamp, nonce, encrypt) + decrypted_xml = self._decrypt(encrypt) + msg_root = ET.fromstring(decrypted_xml) + return {child.tag: child.text for child in msg_root} + + def _check_signature(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str): + """校验签名""" + items = sorted([self._token, timestamp, nonce, encrypt]) + sha1 = hashlib.sha1("".join(items).encode()).hexdigest() + if sha1 != msg_signature: + raise ValueError("签名验证失败") + + def _decrypt(self, text: str) -> str: + """AES-CBC 解密""" + cipher = AES.new(self._aes_key, AES.MODE_CBC, iv=self._aes_key[:16]) + decrypted = cipher.decrypt(base64.b64decode(text)) + # 去除PKCS7填充 + pad = decrypted[-1] + content = decrypted[:-pad] + # 去除16字节随机串 + 4字节消息长度 + msg_len = struct.unpack("!I", content[16:20])[0] + msg = content[20:20 + msg_len].decode("utf-8") + return msg +``` + +### Step 2: requirements.txt 添加依赖 + +``` +pycryptodome>=3.19.0 +``` + +### Step 3: Commit + +```bash +git add app/services/wechat_crypto.py requirements.txt +git commit -m "feat: 企微消息加解密模块" +``` + +--- + +## Task 7: 集成测试 + 企微管理后台配置 + +### Step 1: 添加测试端点(开发调试用) + +在 `wechat_callback.py` 添加不经加密的测试接口: + +```python +@router.post("/agent/test") +async def test_agent_message(user_id: str = Query("test_user"), content: str = Query(...)): + """测试Agent对话(开发用,无加密)""" + from app.services.agent_dispatcher import get_agent_dispatcher + dispatcher = get_agent_dispatcher() + reply = await dispatcher.handle_message(user_id, content) + return YudaoResponse.success({"reply": reply}) +``` + +### Step 2: 验证命令 + +```bash +# 测试意图识别 + 工单创建 +curl "http://localhost:8000/api/wechat/agent/test?content=帮我创建一个工单,3号岗亭发现摄像头松动" + +# 测试告警查询 +curl "http://localhost:8000/api/wechat/agent/test?content=今天有多少告警" + +# 测试报表导出 +curl "http://localhost:8000/api/wechat/agent/test?content=导出本周的告警报表" +``` + +### Step 3: 企微管理后台配置 + +在企微管理后台 → 应用管理 → 自建应用 → 接收消息: +- 设置 URL:`https://vsp.viewshanghai.com/api/wechat/agent/callback` +- 设置 Token:生成随机字符串,配置到 `.env` 的 `WECHAT_TOKEN` +- 设置 EncodingAESKey:生成随机字符串,配置到 `.env` 的 `WECHAT_ENCODING_AES_KEY` + +### Step 4: .env 新增配置项 + +```bash +# 交互Agent +AGENT_ENABLED=true +AGENT_LLM_MODEL=qwen-plus # 文本对话模型(比VLM便宜) + +# 企微消息回调(在企微管理后台生成) +WECHAT_TOKEN=your_random_token +WECHAT_ENCODING_AES_KEY=your_random_aes_key +``` + +### Step 5: Final commit + +```bash +git add -A +git commit -m "feat: Agent系统集成 — 测试接口 + 配置说明" +``` + +--- + +## 改动文件清单 + +| 文件 | 改动类型 | 内容 | +|------|---------|------| +| `app/config.py` | 修改 | 添加 AgentConfig | +| `app/services/vlm_service.py` | 修改 | 优化降级策略 | +| `app/services/notify_dispatch.py` | 修改 | 修复 VLM 结果存储 | +| `app/services/agent_dispatcher.py` | **新建** | 交互Agent核心:意图识别 + handler路由 | +| `app/services/wechat_crypto.py` | **新建** | 企微消息加解密 | +| `app/services/work_order_service.py` | **新建** | 工单CRUD | +| `app/services/report_generator.py` | **新建** | Excel报表生成 | +| `app/services/wechat_service.py` | 修改 | 添加 send_text_message | +| `app/services/oss_storage.py` | 修改 | 添加通用文件上传 | +| `app/routers/wechat_callback.py` | 修改 | 添加消息回调端点 | +| `app/main.py` | 修改 | 初始化 Agent dispatcher | +| `requirements.txt` | 修改 | 添加 openpyxl, pycryptodome | +| `.env.example` | 修改 | 添加 Agent 配置项 | + +## 架构总览 + +``` +企微用户发送消息 + ↓ +企微服务器 POST → /api/wechat/agent/callback + ↓ +wechat_crypto.decrypt_message() → 解密XML + ↓ +agent_dispatcher.handle_message() + ↓ +LLM 意图识别(qwen-plus) + ├─→ create_work_order → work_order_service.create() + ├─→ query_alarm → alarm_event_service.get_alarms() → 统计文本 + ├─→ export_report → report_generator.generate() → COS上传 → 下载链接 + └─→ general_chat → 兜底回复 + ↓ +wechat_service.send_text_message() → 企微API主动推送 +``` + +``` +边缘端告警上报 + ↓ +alarm_event_service.create_from_edge_report() + ↓ +asyncio.create_task(process_alarm_notification()) + ↓ +vlm_service.verify_alarm() ← 推理Agent(VLM复核) + ├─→ confirmed=True → 企微卡片通知 + └─→ confirmed=False → 标记误报,不通知 +``` diff --git a/docs/work_order_schema.sql b/docs/work_order_schema.sql new file mode 100644 index 0000000..671c601 --- /dev/null +++ b/docs/work_order_schema.sql @@ -0,0 +1,129 @@ +-- ============================================================ +-- 安保工单体系 - 数据库表结构 +-- 数据库: aiot-alarm(与 alarm_event 同库) +-- ============================================================ + +-- ------------------------------------------------------------ +-- 1. 安保人员表 +-- ------------------------------------------------------------ +CREATE TABLE IF NOT EXISTS `security_user` ( + `id` INT NOT NULL AUTO_INCREMENT, + `user_id` VARCHAR(64) NOT NULL COMMENT '人员唯一ID', + `name` VARCHAR(100) NOT NULL COMMENT '姓名', + `phone` VARCHAR(20) DEFAULT NULL COMMENT '手机号', + `wechat_uid` VARCHAR(100) DEFAULT NULL COMMENT '企微userid', + `role` VARCHAR(50) DEFAULT 'guard' COMMENT '角色: guard(保安) / supervisor(主管) / manager(经理)', + `team_id` VARCHAR(64) DEFAULT NULL COMMENT '班组ID', + `status` VARCHAR(20) DEFAULT 'active' COMMENT '状态: active / inactive', + `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP, + `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + UNIQUE KEY `uk_user_id` (`user_id`), + KEY `idx_team_id` (`team_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='安保人员表'; + +-- ------------------------------------------------------------ +-- 2. 安保工单表 +-- ------------------------------------------------------------ +CREATE TABLE IF NOT EXISTS `security_work_order` ( + `order_id` VARCHAR(64) NOT NULL COMMENT '工单ID: WO + 时间戳 + uuid', + + -- 来源告警(告警工单必填,手动工单可空) + `alarm_id` VARCHAR(64) DEFAULT NULL COMMENT '关联 alarm_event.alarm_id', + + -- 工单内容 + `title` VARCHAR(200) NOT NULL COMMENT '工单标题', + `description` TEXT DEFAULT NULL COMMENT '工单描述', + `priority` SMALLINT DEFAULT 2 COMMENT '优先级: 1低 2中 3高 4紧急', + + -- 设备/区域信息(冗余,方便查询和责任追溯) + `camera_id` VARCHAR(64) DEFAULT NULL COMMENT '摄像头ID', + `roi_id` VARCHAR(64) DEFAULT NULL COMMENT 'ROI区域ID', + `alarm_type` VARCHAR(50) DEFAULT NULL COMMENT '告警类型: intrusion / leave_post', + `image_url` VARCHAR(512) DEFAULT NULL COMMENT '截图URL', + + -- 派发信息(生成时写入,确保责任可追溯) + `assigned_user_id` VARCHAR(64) DEFAULT NULL COMMENT '处理人user_id → security_user.user_id', + `assigned_user_name` VARCHAR(100) DEFAULT NULL COMMENT '处理人姓名(冗余)', + `assigned_team_id` VARCHAR(64) DEFAULT NULL COMMENT '班组ID', + + -- 状态 + `status` VARCHAR(20) NOT NULL DEFAULT 'PENDING' COMMENT '工单状态: PENDING / DISPATCHED / PROCESSING / DONE / CLOSED', + + -- 处理结果 + `result` TEXT DEFAULT NULL COMMENT '处理结果描述', + + -- 创建人 + `created_by` VARCHAR(64) DEFAULT NULL COMMENT '创建人', + + -- 时间 + `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP, + `updated_at` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + `dispatch_time` DATETIME DEFAULT NULL COMMENT '派单时间', + `finish_time` DATETIME DEFAULT NULL COMMENT '完成时间', + + PRIMARY KEY (`order_id`), + UNIQUE KEY `uk_alarm_id` (`alarm_id`) COMMENT '一个告警只能生成一个工单', + KEY `idx_swo_status` (`status`), + KEY `idx_swo_assigned` (`assigned_user_id`), + KEY `idx_swo_created_at` (`created_at`), + KEY `idx_swo_camera_roi_type` (`camera_id`, `roi_id`, `alarm_type`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='安保工单表'; + +-- ------------------------------------------------------------ +-- 3. 工单操作记录表 +-- ------------------------------------------------------------ +CREATE TABLE IF NOT EXISTS `work_order_log` ( + `id` INT NOT NULL AUTO_INCREMENT, + `order_id` VARCHAR(64) NOT NULL COMMENT '关联工单ID → security_work_order.order_id', + `action` VARCHAR(50) NOT NULL COMMENT '操作: CREATE / DISPATCH / ACCEPT / FINISH / CLOSE', + `operator_id` VARCHAR(64) DEFAULT NULL COMMENT '操作人ID', + `operator_name` VARCHAR(100) DEFAULT NULL COMMENT '操作人姓名', + `remark` TEXT DEFAULT NULL COMMENT '备注', + `created_at` DATETIME DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + KEY `idx_wol_order_id` (`order_id`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工单操作记录表'; + + +-- ============================================================ +-- 状态流转说明 +-- ============================================================ +-- +-- 工单状态机: +-- +-- PENDING ──派单──→ DISPATCHED ──接单──→ PROCESSING ──完成──→ DONE +-- │ │ +-- └────────关闭────────→ CLOSED ←──────────关闭──────────────┘ +-- +-- 工单来源: +-- 1. 告警自动生成(alarm_id 不为空) +-- 告警确认 → 查询值班人员 → 生成工单 → 派单通知 +-- +-- 2. 手动创建(alarm_id 为空) +-- 管理员/主管手动创建 → 指定处理人 → 派单通知 +-- +-- 告警合并规则: +-- 同一 camera_id + roi_id + alarm_type,5分钟内只生成1个工单 +-- 通过 idx_swo_camera_roi_type 索引 + 服务层逻辑实现 +-- +-- ============================================================ +-- 关联关系 +-- ============================================================ +-- +-- alarm_event (告警) +-- │ +-- │ alarm_id (一对一,UNIQUE) +-- ▼ +-- security_work_order (工单) +-- │ +-- │ assigned_user_id +-- ▼ +-- security_user (安保人员) +-- +-- security_work_order (工单) +-- │ +-- │ order_id (一对多) +-- ▼ +-- work_order_log (操作记录) +--