Files
iot-device-management-service/docs/plans/2026-03-09-agent-system-design.md
2026-03-18 16:41:02 +08:00

32 KiB
Raw Blame History

AI Agent 系统设计与实现计划

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: 在现有 vsp-service (iot-device-management-service) 上构建两类 AI Agent优化现有推理 AgentVLM 复核),新增交互 Agent企微对话 + 工单 + 数据分析 Excel 导出)

Architecture: 基于现有 FastAPI + 企微自建应用架构,推理 Agent 仅优化 prompt 和降级策略;交互 Agent 通过企微消息回调接入 LLM 对话能力,实现自然语言驱动的工单上报、告警查询和 Excel 报表生成。

Tech Stack: FastAPI, qwen3-vl-flash (VLM), qwen-plus/qwen-turbo (文本 LLM), 企微消息回调 API, openpyxl (Excel), 腾讯云 COS (文件下载)


现有架构分析

已有的推理 AgentVLM 复核)

位置: app/services/vlm_service.py + app/services/notify_dispatch.py

当前流程:

边缘告警上报 → alarm_event_service.create_from_edge_report()
            → asyncio.create_task(process_alarm_notification())
                → vlm_service.verify_alarm()      ← 推理Agent
                → _save_vlm_result()
                → _get_notify_persons()
                → wechat_service.send_alarm_card()  ← 企微推送

当前问题:

  1. VLM prompt 已优化(角色设定 + 25字限制降级策略粗糙:超时/异常时一律 confirmed=True(放行),应根据算法类型区分
  2. _save_vlm_resultconfidence_score 写死 0.9,未使用 VLM 实际输出
  3. 缺少 VLM 调用统计(成功率、平均耗时、误报过滤率)

需要新增的交互 Agent

用途: 安保主管通过企微对话完成以下操作:

  1. 手动上报工单 — "帮我创建一个工单XX区域发现设备异常"
  2. 查询告警数据 — "今天有多少告警?离岗和入侵各多少?"
  3. 生成 Excel 报表 — "导出本周的告警汇总报表"

技术路线: 企微收到文本消息 → 回调到 vsp-service → LLM 意图识别 → 路由到对应 handler → 执行操作 → 回复企微消息


Task 1: 优化推理 Agent 降级策略

Files:

  • Modify: app/services/vlm_service.py:91-106
  • Modify: app/services/notify_dispatch.py:115-134

Step 1: 修改 VLM 降级策略 — 入侵类型超时时默认放行,离岗类型超时时默认拦截

当前所有降级场景都返回 confirmed=True,这对入侵是安全的(宁可多报不漏报),但离岗场景可能导致 VLM 不可用时大量误报推送。

修改 vlm_service.py,在降级返回中根据 alarm_type 区分:

# 在 verify_alarm 方法中,所有降级返回点改为:
def _fallback_result(self, alarm_type: str, camera_name: str, reason: str) -> Dict:
    """降级结果:入侵默认放行,离岗默认拦截"""
    # 入侵宁可多报不漏报;离岗 VLM 不可用时暂不推送(等下次)
    confirmed = alarm_type != "leave_post"
    return {
        "confirmed": confirmed,
        "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警({reason}",
        "skipped": True,
    }

Step 2: 修改 _save_vlm_result 去掉硬编码 confidence

def _save_vlm_result(alarm_id: str, vlm_result: Dict):
    analysis = AlarmLlmAnalysis(
        alarm_id=alarm_id,
        llm_model="qwen3-vl-flash",
        analysis_type="REVIEW",
        summary=vlm_result.get("description", ""),
        is_false_alarm=not vlm_result.get("confirmed", True),
        confidence_score=None if vlm_result.get("skipped") else 0.9,
        suggestion="VLM跳过" if vlm_result.get("skipped") else None,
    )

Step 3: 运行服务确认无报错

Run: cd C:/workspace/vsp/iot-device-management-service && python -c "from app.services.vlm_service import VLMService; print('OK')" Expected: OK

Step 4: Commit

git add app/services/vlm_service.py app/services/notify_dispatch.py
git commit -m "优化: VLM推理Agent降级策略按算法类型区分放行/拦截"

Task 2: 企微消息回调接入(交互 Agent 基础设施)

Files:

  • Modify: app/config.py — 添加 AgentConfig
  • Modify: app/routers/wechat_callback.py — 添加消息接收回调
  • Create: app/services/agent_dispatcher.py — Agent 消息分发器
  • Modify: app/main.py — 注册新路由

背景

企微自建应用支持「接收消息」回调:用户在应用聊天窗口发送消息 → 企微服务器 POST 到我们配置的回调 URL → 我们回复消息。

需要实现:

  1. 企微消息验证URL 验证 + 消息解密)
  2. 文本消息路由到 Agent
  3. Agent 回复通过企微 API 发送

Step 1: config.py 添加 AgentConfig

@dataclass
class AgentConfig:
    """交互Agent配置"""
    llm_api_key: str = ""           # 文本LLM API Key复用 DASHSCOPE_API_KEY
    llm_base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    llm_model: str = "qwen-plus"    # 文本对话模型
    llm_timeout: int = 15           # LLM 超时秒数
    enabled: bool = False

在 Settings 中添加 agent: AgentConfig = AgentConfig(),在 load_settings() 中加载:

agent=AgentConfig(
    llm_api_key=os.getenv("DASHSCOPE_API_KEY", ""),
    llm_base_url=os.getenv("AGENT_LLM_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"),
    llm_model=os.getenv("AGENT_LLM_MODEL", "qwen-plus"),
    llm_timeout=int(os.getenv("AGENT_LLM_TIMEOUT", "15")),
    enabled=os.getenv("AGENT_ENABLED", "false").lower() == "true",
),

Step 2: 创建 agent_dispatcher.py — 意图识别 + 路由

这是交互 Agent 的核心。接收用户文本消息,用 LLM 做意图识别,路由到对应 handler。

"""
交互Agent调度器

接收企微用户消息通过LLM识别意图路由到对应处理器。

支持意图:
- create_work_order: 创建工单("帮我创建XX工单"
- query_alarm: 查询告警("今天有多少告警"
- export_report: 导出报表("导出本周告警报表"
- general_chat: 兜底闲聊
"""

import json
import logging
from typing import Dict, Optional
from openai import AsyncOpenAI
from app.config import settings

logger = logging.getLogger(__name__)

INTENT_SYSTEM_PROMPT = """你是物业安防AI助手。根据用户消息识别意图仅输出JSON。

可选意图:
- create_work_order: 用户要创建工单或上报问题
- query_alarm: 用户要查询告警数据或统计
- export_report: 用户要导出报表或Excel
- general_chat: 其他闲聊或无法识别

输出格式:{"intent":"...","params":{...}}

params说明
- create_work_order: {"title":"工单标题","description":"描述","priority":"medium"}
- query_alarm: {"time_range":"today/week/month","alarm_type":"leave_post/intrusion/all"}
- export_report: {"time_range":"today/week/month","report_type":"alarm_summary"}
- general_chat: {"message":"回复内容"}"""


class AgentDispatcher:
    """交互Agent调度器单例"""

    def __init__(self):
        self._client: Optional[AsyncOpenAI] = None
        self._enabled = False

    def init(self, config):
        self._enabled = config.enabled and bool(config.llm_api_key)
        if self._enabled:
            self._client = AsyncOpenAI(
                api_key=config.llm_api_key,
                base_url=config.llm_base_url,
            )
            logger.info(f"交互Agent已启用: model={config.llm_model}")

    @property
    def enabled(self):
        return self._enabled

    async def handle_message(self, user_id: str, content: str) -> str:
        """处理用户消息,返回回复文本"""
        if not self._enabled:
            return "AI助手未启用请联系管理员配置。"

        # 1. 意图识别
        intent_result = await self._classify_intent(content)
        intent = intent_result.get("intent", "general_chat")
        params = intent_result.get("params", {})

        # 2. 路由到对应 handler
        handlers = {
            "create_work_order": self._handle_create_work_order,
            "query_alarm": self._handle_query_alarm,
            "export_report": self._handle_export_report,
            "general_chat": self._handle_general_chat,
        }

        handler = handlers.get(intent, self._handle_general_chat)
        return await handler(user_id, params, content)

    async def _classify_intent(self, content: str) -> Dict:
        """LLM意图分类"""
        try:
            resp = await self._client.chat.completions.create(
                model=settings.agent.llm_model,
                messages=[
                    {"role": "system", "content": INTENT_SYSTEM_PROMPT},
                    {"role": "user", "content": content},
                ],
                timeout=settings.agent.llm_timeout,
            )
            text = resp.choices[0].message.content.strip()
            if "```" in text:
                text = text.split("```")[1]
                if text.startswith("json"):
                    text = text[4:]
                text = text.strip()
            return json.loads(text)
        except Exception as e:
            logger.error(f"意图识别失败: {e}")
            return {"intent": "general_chat", "params": {"message": "抱歉,我暂时无法理解您的请求。"}}

    async def _handle_create_work_order(self, user_id: str, params: Dict, raw: str) -> str:
        """创建工单 — Task 3 实现"""
        return "工单功能开发中..."

    async def _handle_query_alarm(self, user_id: str, params: Dict, raw: str) -> str:
        """查询告警 — Task 4 实现"""
        return "查询功能开发中..."

    async def _handle_export_report(self, user_id: str, params: Dict, raw: str) -> str:
        """导出报表 — Task 5 实现"""
        return "报表功能开发中..."

    async def _handle_general_chat(self, user_id: str, params: Dict, raw: str) -> str:
        """兜底闲聊"""
        return params.get("message", "您好我是安防AI助手。可以帮您创建工单、查询告警、导出报表。")

Step 3: wechat_callback.py 添加消息接收端点

企微回调需要两个端点:

  • GET /api/wechat/agent/callback — URL 验证(企微首次配置时调用)
  • POST /api/wechat/agent/callback — 接收消息
@router.get("/agent/callback")
async def wechat_verify(
    msg_signature: str = Query(...),
    timestamp: str = Query(...),
    nonce: str = Query(...),
    echostr: str = Query(...),
):
    """企微回调URL验证"""
    # 验证签名 + 解密 echostr
    from app.services.wechat_crypto import WeChatCrypto
    crypto = WeChatCrypto()
    echo = crypto.verify_url(msg_signature, timestamp, nonce, echostr)
    return PlainTextResponse(content=echo)


@router.post("/agent/callback")
async def wechat_message_callback(
    request: Request,
    msg_signature: str = Query(...),
    timestamp: str = Query(...),
    nonce: str = Query(...),
):
    """接收企微用户消息并回复"""
    body = await request.body()

    from app.services.wechat_crypto import WeChatCrypto
    crypto = WeChatCrypto()
    msg = crypto.decrypt_message(body, msg_signature, timestamp, nonce)

    # 只处理文本消息
    if msg.get("MsgType") != "text":
        return "success"

    user_id = msg.get("FromUserName", "")
    content = msg.get("Content", "")

    # 异步处理先返回空串企微要求5秒内响应
    # 通过主动发消息API回复
    asyncio.create_task(_process_and_reply(user_id, content))
    return "success"


async def _process_and_reply(user_id: str, content: str):
    """异步处理消息并主动回复"""
    from app.services.agent_dispatcher import get_agent_dispatcher
    dispatcher = get_agent_dispatcher()
    reply = await dispatcher.handle_message(user_id, content)

    # 通过企微API主动发送文本消息
    wechat = get_wechat_service()
    await wechat.send_text_message(user_id, reply)

Step 4: wechat_service.py 添加 send_text_message 方法

async def send_text_message(self, user_id: str, content: str) -> bool:
    """发送文本消息给指定用户"""
    if not self._enabled:
        return False
    try:
        access_token = await self._get_access_token()
        msg = {
            "touser": user_id,
            "msgtype": "text",
            "agentid": int(self._agent_id) if self._agent_id else 0,
            "text": {"content": content},
        }
        url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
        async with httpx.AsyncClient(timeout=10) as client:
            resp = await client.post(url, json=msg)
            data = resp.json()
        if data.get("errcode") != 0:
            logger.error(f"企微文本消息发送失败: {data}")
            return False
        return True
    except Exception as e:
        logger.error(f"发送文本消息异常: {e}")
        return False

Step 5: 创建 wechat_crypto.py — 企微消息加解密

"""企微消息加解密AES-CBC + 签名验证)"""
# 依赖 pycryptodome需添加到 requirements.txt
# 实现企微官方加解密逻辑:
# https://developer.work.weixin.qq.com/document/path/90930

注意:这是企微标准加解密,可使用官方 Python SDK 或参考官方示例实现。

Step 6: Commit

git add app/config.py app/services/agent_dispatcher.py app/services/wechat_crypto.py \
        app/routers/wechat_callback.py app/services/wechat_service.py app/main.py
git commit -m "feat: 交互Agent基础设施 — 企微消息回调 + 意图识别路由"

Task 3: 工单创建 Handler

Files:

  • Modify: app/services/agent_dispatcher.py — 实现 _handle_create_work_order
  • Create: app/services/work_order_service.py — 工单 CRUD 服务
  • Modify: app/models.py — 确认 WorkOrder 模型可用

Step 1: 创建 work_order_service.py

"""工单服务"""
import uuid
from datetime import datetime, timezone
from typing import Optional, Dict
from app.models import WorkOrder, WorkOrderStatus, WorkOrderPriority, get_session
from app.utils.logger import logger


def generate_order_no() -> str:
    """生成工单编号: WO + YYYYMMDDHHmmss + 6位uuid"""
    ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    return f"WO{ts}{uuid.uuid4().hex[:6].upper()}"


class WorkOrderService:
    """工单服务"""

    def create_work_order(
        self,
        title: str,
        description: str = "",
        priority: str = "medium",
        assignee_uid: str = "",
        assignee_name: str = "",
        alarm_id: str = "",
    ) -> Optional[WorkOrder]:
        db = get_session()
        try:
            order = WorkOrder(
                order_no=generate_order_no(),
                title=title,
                description=description,
                priority=WorkOrderPriority(priority) if priority in [e.value for e in WorkOrderPriority] else WorkOrderPriority.MEDIUM,
                assignee_id=assignee_uid,
                assignee_name=assignee_name,
                status=WorkOrderStatus.CREATED,
            )
            if alarm_id:
                order.alert_no = alarm_id

            db.add(order)
            db.commit()
            db.refresh(order)
            logger.info(f"工单已创建: {order.order_no}")
            return order
        except Exception as e:
            db.rollback()
            logger.error(f"创建工单失败: {e}")
            return None
        finally:
            db.close()

Step 2: 实现 agent_dispatcher._handle_create_work_order

async def _handle_create_work_order(self, user_id: str, params: Dict, raw: str) -> str:
    from app.services.work_order_service import get_work_order_service
    svc = get_work_order_service()

    title = params.get("title", "")
    if not title:
        title = raw[:50]  # 用原始消息前50字作为标题

    order = svc.create_work_order(
        title=title,
        description=params.get("description", raw),
        priority=params.get("priority", "medium"),
        assignee_uid=user_id,
    )

    if order:
        priority_names = {"low": "低", "medium": "中", "high": "高", "urgent": "紧急"}
        p_name = priority_names.get(order.priority.value, "中")
        return (
            f"✅ 工单已创建\n"
            f"编号:{order.order_no}\n"
            f"标题:{order.title}\n"
            f"优先级:{p_name}\n"
            f"状态:待处理"
        )
    return "❌ 工单创建失败,请稍后重试"

Step 3: Commit

git add app/services/work_order_service.py app/services/agent_dispatcher.py
git commit -m "feat: 交互Agent工单创建Handler"

Task 4: 告警查询 Handler

Files:

  • Modify: app/services/agent_dispatcher.py — 实现 _handle_query_alarm

Step 1: 实现查询逻辑

async def _handle_query_alarm(self, user_id: str, params: Dict, raw: str) -> str:
    from app.services.alarm_event_service import get_alarm_event_service
    from datetime import datetime, timedelta, timezone

    svc = get_alarm_event_service()

    # 解析时间范围
    time_range = params.get("time_range", "today")
    now = datetime.now(timezone.utc)
    if time_range == "today":
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        range_label = "今日"
    elif time_range == "week":
        start = now - timedelta(days=now.weekday())
        start = start.replace(hour=0, minute=0, second=0, microsecond=0)
        range_label = "本周"
    elif time_range == "month":
        start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        range_label = "本月"
    else:
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        range_label = "今日"

    # 查询告警
    alarm_type_filter = params.get("alarm_type")
    if alarm_type_filter == "all":
        alarm_type_filter = None

    alarms, total = svc.get_alarms(
        alarm_type=alarm_type_filter,
        start_time=start,
        end_time=now,
        page=1,
        page_size=1000,
    )

    # 按类型统计
    type_count = {}
    status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0}
    for a in alarms:
        type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1
        if a.alarm_status in status_count:
            status_count[a.alarm_status] += 1

    type_names = {"leave_post": "人员离岗", "intrusion": "周界入侵"}
    type_lines = [f"  {type_names.get(t, t)}: {c}条" for t, c in type_count.items()]

    false_count = status_count.get("FALSE", 0)

    return (
        f"📊 {range_label}告警统计\n"
        f"总计: {total}\n"
        + "\n".join(type_lines) + "\n"
        f"待处理: {status_count['NEW']}\n"
        f"已处理: {status_count['CLOSED']}\n"
        f"误报过滤: {false_count}条"
    )

Step 2: Commit

git add app/services/agent_dispatcher.py
git commit -m "feat: 交互Agent告警查询Handler"

Task 5: Excel 报表导出 Handler

Files:

  • Create: app/services/report_generator.py — Excel 报表生成
  • Modify: app/services/agent_dispatcher.py — 实现 _handle_export_report
  • Modify: requirements.txt — 添加 openpyxl

Step 1: 添加 openpyxl 依赖

requirements.txt 末尾添加:

openpyxl>=3.1.0

Step 2: 创建 report_generator.py

"""告警报表生成器"""
import io
from datetime import datetime, timedelta, timezone
from typing import Optional
from openpyxl import Workbook
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side

from app.models import AlarmEvent, AlarmLlmAnalysis, get_session
from app.utils.logger import logger


TYPE_NAMES = {"leave_post": "人员离岗", "intrusion": "周界入侵"}
LEVEL_NAMES = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"}
STATUS_NAMES = {
    "NEW": "待处理", "CONFIRMED": "已确认",
    "FALSE": "误报", "CLOSED": "已关闭",
}


def generate_alarm_report(
    time_range: str = "week",
) -> Optional[tuple]:
    """
    生成告警汇总Excel

    Returns:
        (filename, bytes_io) 或 None
    """
    now = datetime.now(timezone.utc)
    if time_range == "today":
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        label = now.strftime("%Y%m%d")
    elif time_range == "week":
        start = now - timedelta(days=now.weekday())
        start = start.replace(hour=0, minute=0, second=0, microsecond=0)
        label = f"{start.strftime('%Y%m%d')}-{now.strftime('%Y%m%d')}"
    elif time_range == "month":
        start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        label = now.strftime("%Y%m")
    else:
        start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        label = now.strftime("%Y%m%d")

    db = get_session()
    try:
        alarms = (
            db.query(AlarmEvent)
            .filter(AlarmEvent.event_time >= start, AlarmEvent.event_time <= now)
            .order_by(AlarmEvent.event_time.desc())
            .all()
        )

        if not alarms:
            return None

        wb = Workbook()

        # ===== Sheet 1: 告警明细 =====
        ws = wb.active
        ws.title = "告警明细"

        headers = ["告警ID", "告警类型", "设备ID", "场景ID", "告警级别",
                    "告警状态", "处理状态", "置信度", "事件时间", "处理人", "备注"]

        header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
        header_font = Font(color="FFFFFF", bold=True, size=11)
        thin_border = Border(
            left=Side(style='thin'), right=Side(style='thin'),
            top=Side(style='thin'), bottom=Side(style='thin'),
        )

        for col, h in enumerate(headers, 1):
            cell = ws.cell(row=1, column=col, value=h)
            cell.fill = header_fill
            cell.font = header_font
            cell.alignment = Alignment(horizontal="center")
            cell.border = thin_border

        for row, a in enumerate(alarms, 2):
            values = [
                a.alarm_id,
                TYPE_NAMES.get(a.alarm_type, a.alarm_type),
                a.device_id,
                a.scene_id or "",
                LEVEL_NAMES.get(a.alarm_level, str(a.alarm_level)),
                STATUS_NAMES.get(a.alarm_status, a.alarm_status),
                a.handle_status or "",
                f"{a.confidence_score:.2f}" if a.confidence_score else "",
                a.event_time.strftime("%Y-%m-%d %H:%M:%S") if a.event_time else "",
                a.handler or "",
                a.handle_remark or "",
            ]
            for col, v in enumerate(values, 1):
                cell = ws.cell(row=row, column=col, value=v)
                cell.border = thin_border

        # 自动列宽
        for col in ws.columns:
            max_len = max(len(str(cell.value or "")) for cell in col)
            ws.column_dimensions[col[0].column_letter].width = min(max_len + 4, 30)

        # ===== Sheet 2: 统计汇总 =====
        ws2 = wb.create_sheet("统计汇总")

        # 按类型统计
        type_count = {}
        level_count = {}
        status_count = {}
        for a in alarms:
            type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1
            level_count[a.alarm_level] = level_count.get(a.alarm_level, 0) + 1
            status_count[a.alarm_status] = status_count.get(a.alarm_status, 0) + 1

        ws2.cell(row=1, column=1, value="告警类型统计").font = Font(bold=True, size=12)
        ws2.cell(row=2, column=1, value="类型")
        ws2.cell(row=2, column=2, value="数量")
        for i, (t, c) in enumerate(type_count.items(), 3):
            ws2.cell(row=i, column=1, value=TYPE_NAMES.get(t, t))
            ws2.cell(row=i, column=2, value=c)

        offset = len(type_count) + 4
        ws2.cell(row=offset, column=1, value="告警状态统计").font = Font(bold=True, size=12)
        ws2.cell(row=offset + 1, column=1, value="状态")
        ws2.cell(row=offset + 1, column=2, value="数量")
        for i, (s, c) in enumerate(status_count.items(), offset + 2):
            ws2.cell(row=i, column=1, value=STATUS_NAMES.get(s, s))
            ws2.cell(row=i, column=2, value=c)

        # 输出到内存
        output = io.BytesIO()
        wb.save(output)
        output.seek(0)

        filename = f"告警报表_{label}.xlsx"
        return (filename, output)

    except Exception as e:
        logger.error(f"生成报表失败: {e}")
        return None
    finally:
        db.close()

Step 3: 实现 export_report handler

Excel 文件通过 COS 上传获取下载链接,或通过企微「文件消息」发送。

考虑到企微文件消息需要先上传到企微临时素材(复杂),更简单的方案是:上传 COS 生成临时下载 URL在文本消息中返回链接。

async def _handle_export_report(self, user_id: str, params: Dict, raw: str) -> str:
    from app.services.report_generator import generate_alarm_report

    time_range = params.get("time_range", "week")
    result = generate_alarm_report(time_range=time_range)

    if not result:
        range_names = {"today": "今日", "week": "本周", "month": "本月"}
        return f"📊 {range_names.get(time_range, '今日')}暂无告警数据,无法生成报表。"

    filename, file_bytes = result

    # 上传到 COS 获取下载链接
    from app.services.oss_storage import get_oss_storage
    oss = get_oss_storage()
    download_url = oss.upload_file(file_bytes.read(), f"reports/{filename}", content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")

    if download_url:
        return f"📊 报表已生成\n文件:{filename}\n下载:{download_url}"
    else:
        return f"📊 报表已生成({filename}),但上传失败,请联系管理员。"

Step 4: oss_storage.py 添加通用文件上传方法

现有 oss_storage.py 只支持图片上传,需要添加通用 upload_file 方法(如已有则跳过)。

Step 5: Commit

git add app/services/report_generator.py app/services/agent_dispatcher.py \
        app/services/oss_storage.py requirements.txt
git commit -m "feat: 交互Agent报表导出Handler — Excel生成 + COS上传"

Task 6: 企微加解密模块

Files:

  • Create: app/services/wechat_crypto.py
  • Modify: requirements.txt — 添加 pycryptodome

Step 1: 实现加解密

企微消息回调使用 AES-CBC-256 加密。需要实现:

  • verify_url(msg_signature, timestamp, nonce, echostr) — URL 验证
  • decrypt_message(xml_body, msg_signature, timestamp, nonce) — 消息解密
  • encrypt_message(reply_msg, nonce) — 回复加密(被动回复时用)
"""
企微消息加解密

基于企微官方加解密方案:
https://developer.work.weixin.qq.com/document/path/90930

需要在企微管理后台配置:
- Token: 用于签名验证
- EncodingAESKey: 用于消息加解密
"""
import base64
import hashlib
import struct
import xml.etree.ElementTree as ET
from Crypto.Cipher import AES

from app.config import settings


class WeChatCrypto:
    def __init__(self):
        self._token = settings.wechat.token
        key = settings.wechat.encoding_aes_key
        if key:
            self._aes_key = base64.b64decode(key + "=")
        else:
            self._aes_key = b""

    def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str:
        """验证回调URL返回解密后的echostr"""
        self._check_signature(msg_signature, timestamp, nonce, echostr)
        return self._decrypt(echostr)

    def decrypt_message(self, xml_body: bytes, msg_signature: str, timestamp: str, nonce: str) -> dict:
        """解密企微消息XML返回消息字典"""
        root = ET.fromstring(xml_body)
        encrypt = root.find("Encrypt").text
        self._check_signature(msg_signature, timestamp, nonce, encrypt)
        decrypted_xml = self._decrypt(encrypt)
        msg_root = ET.fromstring(decrypted_xml)
        return {child.tag: child.text for child in msg_root}

    def _check_signature(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str):
        """校验签名"""
        items = sorted([self._token, timestamp, nonce, encrypt])
        sha1 = hashlib.sha1("".join(items).encode()).hexdigest()
        if sha1 != msg_signature:
            raise ValueError("签名验证失败")

    def _decrypt(self, text: str) -> str:
        """AES-CBC 解密"""
        cipher = AES.new(self._aes_key, AES.MODE_CBC, iv=self._aes_key[:16])
        decrypted = cipher.decrypt(base64.b64decode(text))
        # 去除PKCS7填充
        pad = decrypted[-1]
        content = decrypted[:-pad]
        # 去除16字节随机串 + 4字节消息长度
        msg_len = struct.unpack("!I", content[16:20])[0]
        msg = content[20:20 + msg_len].decode("utf-8")
        return msg

Step 2: requirements.txt 添加依赖

pycryptodome>=3.19.0

Step 3: Commit

git add app/services/wechat_crypto.py requirements.txt
git commit -m "feat: 企微消息加解密模块"

Task 7: 集成测试 + 企微管理后台配置

Step 1: 添加测试端点(开发调试用)

wechat_callback.py 添加不经加密的测试接口:

@router.post("/agent/test")
async def test_agent_message(user_id: str = Query("test_user"), content: str = Query(...)):
    """测试Agent对话开发用无加密"""
    from app.services.agent_dispatcher import get_agent_dispatcher
    dispatcher = get_agent_dispatcher()
    reply = await dispatcher.handle_message(user_id, content)
    return YudaoResponse.success({"reply": reply})

Step 2: 验证命令

# 测试意图识别 + 工单创建
curl "http://localhost:8000/api/wechat/agent/test?content=帮我创建一个工单3号岗亭发现摄像头松动"

# 测试告警查询
curl "http://localhost:8000/api/wechat/agent/test?content=今天有多少告警"

# 测试报表导出
curl "http://localhost:8000/api/wechat/agent/test?content=导出本周的告警报表"

Step 3: 企微管理后台配置

在企微管理后台 → 应用管理 → 自建应用 → 接收消息:

  • 设置 URLhttps://vsp.viewshanghai.com/api/wechat/agent/callback
  • 设置 Token生成随机字符串配置到 .envWECHAT_TOKEN
  • 设置 EncodingAESKey生成随机字符串配置到 .envWECHAT_ENCODING_AES_KEY

Step 4: .env 新增配置项

# 交互Agent
AGENT_ENABLED=true
AGENT_LLM_MODEL=qwen-plus           # 文本对话模型比VLM便宜

# 企微消息回调(在企微管理后台生成)
WECHAT_TOKEN=your_random_token
WECHAT_ENCODING_AES_KEY=your_random_aes_key

Step 5: Final commit

git add -A
git commit -m "feat: Agent系统集成 — 测试接口 + 配置说明"

改动文件清单

文件 改动类型 内容
app/config.py 修改 添加 AgentConfig
app/services/vlm_service.py 修改 优化降级策略
app/services/notify_dispatch.py 修改 修复 VLM 结果存储
app/services/agent_dispatcher.py 新建 交互Agent核心意图识别 + handler路由
app/services/wechat_crypto.py 新建 企微消息加解密
app/services/work_order_service.py 新建 工单CRUD
app/services/report_generator.py 新建 Excel报表生成
app/services/wechat_service.py 修改 添加 send_text_message
app/services/oss_storage.py 修改 添加通用文件上传
app/routers/wechat_callback.py 修改 添加消息回调端点
app/main.py 修改 初始化 Agent dispatcher
requirements.txt 修改 添加 openpyxl, pycryptodome
.env.example 修改 添加 Agent 配置项

架构总览

企微用户发送消息
    ↓
企微服务器 POST → /api/wechat/agent/callback
    ↓
wechat_crypto.decrypt_message() → 解密XML
    ↓
agent_dispatcher.handle_message()
    ↓
LLM 意图识别qwen-plus
    ├─→ create_work_order → work_order_service.create()
    ├─→ query_alarm → alarm_event_service.get_alarms() → 统计文本
    ├─→ export_report → report_generator.generate() → COS上传 → 下载链接
    └─→ general_chat → 兜底回复
    ↓
wechat_service.send_text_message() → 企微API主动推送
边缘端告警上报
    ↓
alarm_event_service.create_from_edge_report()
    ↓
asyncio.create_task(process_alarm_notification())
    ↓
vlm_service.verify_alarm()  ← 推理AgentVLM复核
    ├─→ confirmed=True → 企微卡片通知
    └─→ confirmed=False → 标记误报,不通知