From 7cc4f604d08eb503d14a96e770bd97e77fac5a1f Mon Sep 17 00:00:00 2001 From: 16337 <1633794139@qq.com> Date: Mon, 9 Mar 2026 10:42:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BA=A4=E4=BA=92Agent=20+=20VLM?= =?UTF-8?q?=E4=BC=98=E5=8C=96=20+=20=E4=BC=81=E5=BE=AE=E6=BC=94=E7=A4=BA?= =?UTF-8?q?=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增交互Agent调度器(意图识别 + 工单/查询/报表/闲聊4个Handler) - 新增工单服务、Excel报表生成器、企微消息加解密模块 - VLM提示词优化(角色设定、≤25字描述、布尔值优先输出) - VLM降级策略(入侵默认放行、离岗默认拦截) - 企微演示模式(WECHAT_TEST_UIDS兜底 + SERVICE_BASE_URL修复) - 新增Agent回调路由和测试接口 Co-Authored-By: Claude Opus 4.6 --- .env.example | 9 ++ .gitignore | 3 +- app/config.py | 22 +++ app/main.py | 5 + app/routers/wechat_callback.py | 87 ++++++++++- app/services/agent_dispatcher.py | 237 +++++++++++++++++++++++++++++ app/services/notify_dispatch.py | 16 +- app/services/report_generator.py | 165 ++++++++++++++++++++ app/services/vlm_service.py | 76 ++++----- app/services/wechat_crypto.py | 90 +++++++++++ app/services/wechat_service.py | 25 +++ app/services/work_order_service.py | 144 ++++++++++++++++++ requirements.txt | 2 + 13 files changed, 827 insertions(+), 54 deletions(-) create mode 100644 app/services/agent_dispatcher.py create mode 100644 app/services/report_generator.py create mode 100644 app/services/wechat_crypto.py create mode 100644 app/services/work_order_service.py diff --git a/.env.example b/.env.example index 1488120..782ba42 100644 --- a/.env.example +++ b/.env.example @@ -45,3 +45,12 @@ WECHAT_AGENT_ID=your_agent_id WECHAT_SECRET=your_secret WECHAT_TOKEN=your_callback_token WECHAT_ENCODING_AES_KEY=your_encoding_aes_key + +# 企微演示模式 +WECHAT_TEST_UIDS= # 测试用企微userid,逗号分隔,如 zhangsan,lisi +SERVICE_BASE_URL= # H5页面公网地址,如 https://vsp.viewshanghai.com + +# ===== 交互Agent配置 ===== +AGENT_ENABLED=false +AGENT_LLM_MODEL=qwen-plus # 文本对话模型(复用 DASHSCOPE_API_KEY) +AGENT_LLM_TIMEOUT=15 diff --git a/.gitignore b/.gitignore index 64b3966..1530bab 100644 --- a/.gitignore +++ b/.gitignore @@ -65,4 +65,5 @@ Thumbs.db *分析*.md *报告*.md *修复*.py -*patch*.py +*_patch*.py +hotfix_patch*.py diff --git a/app/config.py b/app/config.py index ac2b14e..9ace39c 100644 --- a/app/config.py +++ b/app/config.py @@ -60,6 +60,18 @@ class WeChatConfig: token: str = "" encoding_aes_key: str = "" enabled: bool = False + test_uids: str = "" # 演示模式:逗号分隔的企微userid,如 "zhangsan,lisi" + service_base_url: str = "" # H5页面公网地址,如 https://vsp.viewshanghai.com + + +@dataclass +class AgentConfig: + """交互Agent配置""" + llm_api_key: str = "" + llm_base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1" + llm_model: str = "qwen-plus" + llm_timeout: int = 15 + enabled: bool = False @dataclass @@ -110,6 +122,7 @@ class Settings(BaseModel): ai_model: AIModelConfig = AIModelConfig() vlm: VLMConfig = VLMConfig() wechat: WeChatConfig = WeChatConfig() + agent: AgentConfig = AgentConfig() redis: RedisConfig = RedisConfig() camera_name: CameraNameConfig = CameraNameConfig() @@ -155,6 +168,15 @@ def load_settings() -> Settings: token=os.getenv("WECHAT_TOKEN", ""), encoding_aes_key=os.getenv("WECHAT_ENCODING_AES_KEY", ""), enabled=os.getenv("WECHAT_ENABLED", "false").lower() == "true", + test_uids=os.getenv("WECHAT_TEST_UIDS", ""), + service_base_url=os.getenv("SERVICE_BASE_URL", ""), + ), + agent=AgentConfig( + llm_api_key=os.getenv("DASHSCOPE_API_KEY", ""), + llm_base_url=os.getenv("AGENT_LLM_BASE_URL", "https://dashscope.aliyuncs.com/compatible-mode/v1"), + llm_model=os.getenv("AGENT_LLM_MODEL", "qwen-plus"), + llm_timeout=int(os.getenv("AGENT_LLM_TIMEOUT", "15")), + enabled=os.getenv("AGENT_ENABLED", "false").lower() == "true", ), redis=RedisConfig( host=os.getenv("REDIS_HOST", "localhost"), diff --git a/app/main.py b/app/main.py index eeeab82..f3fc231 100644 --- a/app/main.py +++ b/app/main.py @@ -60,6 +60,11 @@ async def lifespan(app: FastAPI): wechat_svc = get_wechat_service() wechat_svc.init(settings.wechat) + # 初始化交互Agent + from app.services.agent_dispatcher import get_agent_dispatcher + agent = get_agent_dispatcher() + agent.init(settings.agent) + logger.info("AI 告警平台启动完成") yield diff --git a/app/routers/wechat_callback.py b/app/routers/wechat_callback.py index f62107f..4c65bb1 100644 --- a/app/routers/wechat_callback.py +++ b/app/routers/wechat_callback.py @@ -3,10 +3,13 @@ 处理安保人员在企微卡片上的操作(前往处理/已处理/误报忽略)。 提供告警详情接口供 H5 页面使用。 +接收企微用户消息并路由到交互Agent。 """ +import asyncio from datetime import datetime -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, Depends, Query, Request +from fastapi.responses import PlainTextResponse from pydantic import BaseModel from typing import Optional @@ -117,3 +120,85 @@ async def alarm_action_callback( ) return YudaoResponse.success(True) + + +# ==================== 交互Agent消息回调 ==================== + +@router.get("/agent/callback") +async def wechat_agent_verify( + msg_signature: str = Query(...), + timestamp: str = Query(...), + nonce: str = Query(...), + echostr: str = Query(...), +): + """企微回调URL验证(首次配置时调用)""" + from app.services.wechat_crypto import WeChatCrypto + try: + crypto = WeChatCrypto() + echo = crypto.verify_url(msg_signature, timestamp, nonce, echostr) + return PlainTextResponse(content=echo) + except Exception as e: + logger.error(f"企微URL验证失败: {e}") + return PlainTextResponse(content="", status_code=403) + + +@router.post("/agent/callback") +async def wechat_agent_message( + request: Request, + msg_signature: str = Query(...), + timestamp: str = Query(...), + nonce: str = Query(...), +): + """接收企微用户消息,路由到交互Agent""" + body = await request.body() + + from app.services.wechat_crypto import WeChatCrypto + try: + crypto = WeChatCrypto() + msg = crypto.decrypt_message(body, msg_signature, timestamp, nonce) + except Exception as e: + logger.error(f"企微消息解密失败: {e}") + return PlainTextResponse(content="success") + + # 只处理文本消息 + if msg.get("MsgType") != "text": + return PlainTextResponse(content="success") + + user_id = msg.get("FromUserName", "") + content = msg.get("Content", "") + + logger.info(f"收到企微消息: user={user_id}, content={content[:50]}") + + # 异步处理(企微要求5秒内响应),通过主动消息回复 + asyncio.create_task(_process_agent_message(user_id, content)) + return PlainTextResponse(content="success") + + +async def _process_agent_message(user_id: str, content: str): + """异步处理消息并主动回复""" + try: + from app.services.agent_dispatcher import get_agent_dispatcher + from app.services.wechat_service import get_wechat_service + + dispatcher = get_agent_dispatcher() + reply = await dispatcher.handle_message(user_id, content) + + wechat = get_wechat_service() + await wechat.send_text_message(user_id, reply) + except Exception as e: + logger.error(f"Agent消息处理失败: user={user_id}, error={e}", exc_info=True) + + +# ==================== Agent测试接口(开发用) ==================== + +@router.post("/agent/test") +async def test_agent_message( + user_id: str = Query(default="test_user"), + content: str = Query(..., description="测试消息内容"), +): + """测试Agent对话(开发用,无加密,直接返回回复)""" + from app.services.agent_dispatcher import get_agent_dispatcher + dispatcher = get_agent_dispatcher() + reply = await dispatcher.handle_message(user_id, content) + return YudaoResponse.success({"user_id": user_id, "content": content, "reply": reply}) + diff --git a/app/services/agent_dispatcher.py b/app/services/agent_dispatcher.py new file mode 100644 index 0000000..88f392d --- /dev/null +++ b/app/services/agent_dispatcher.py @@ -0,0 +1,237 @@ +""" +交互Agent调度器 + +接收企微用户消息,通过LLM识别意图,路由到对应处理器。 + +支持意图: +- create_work_order: 创建工单("帮我创建XX工单") +- query_alarm: 查询告警("今天有多少告警") +- export_report: 导出报表("导出本周告警报表") +- general_chat: 兜底闲聊 +""" + +import json +import logging +from datetime import datetime, timedelta, timezone +from typing import Dict, Optional + +from openai import AsyncOpenAI + +from app.config import settings + +logger = logging.getLogger(__name__) + +INTENT_SYSTEM_PROMPT = """你是物业安防AI助手。根据用户消息识别意图,仅输出JSON。 + +可选意图: +- create_work_order: 用户要创建工单或上报问题 +- query_alarm: 用户要查询告警数据或统计 +- export_report: 用户要导出报表或Excel +- general_chat: 其他闲聊或无法识别 + +输出格式:{"intent":"...","params":{...}} + +params说明: +- create_work_order: {"title":"工单标题","description":"描述","priority":"low/medium/high/urgent"} +- query_alarm: {"time_range":"today/week/month","alarm_type":"leave_post/intrusion/all"} +- export_report: {"time_range":"today/week/month"} +- general_chat: {"message":"友好的回复内容"}""" + + +class AgentDispatcher: + """交互Agent调度器(单例)""" + + def __init__(self): + self._client: Optional[AsyncOpenAI] = None + self._enabled = False + + def init(self, config): + """初始化Agent""" + self._enabled = config.enabled and bool(config.llm_api_key) + if self._enabled: + self._client = AsyncOpenAI( + api_key=config.llm_api_key, + base_url=config.llm_base_url, + ) + logger.info(f"交互Agent已启用: model={config.llm_model}") + else: + logger.info("交互Agent未启用(AGENT_ENABLED=false 或缺少 API Key)") + + @property + def enabled(self) -> bool: + return self._enabled + + async def handle_message(self, user_id: str, content: str) -> str: + """处理用户消息,返回回复文本""" + if not self._enabled: + return "AI助手未启用,请联系管理员配置。" + + # 1. 意图识别 + intent_result = await self._classify_intent(content) + intent = intent_result.get("intent", "general_chat") + params = intent_result.get("params", {}) + + logger.info(f"Agent意图识别: user={user_id}, intent={intent}, params={params}") + + # 2. 路由到对应 handler + handlers = { + "create_work_order": self._handle_create_work_order, + "query_alarm": self._handle_query_alarm, + "export_report": self._handle_export_report, + "general_chat": self._handle_general_chat, + } + + handler = handlers.get(intent, self._handle_general_chat) + try: + return await handler(user_id, params, content) + except Exception as e: + logger.error(f"Agent handler异常: intent={intent}, error={e}", exc_info=True) + return "处理请求时出错,请稍后重试。" + + async def _classify_intent(self, content: str) -> Dict: + """LLM意图分类""" + try: + resp = await self._client.chat.completions.create( + model=settings.agent.llm_model, + messages=[ + {"role": "system", "content": INTENT_SYSTEM_PROMPT}, + {"role": "user", "content": content}, + ], + timeout=settings.agent.llm_timeout, + ) + text = resp.choices[0].message.content.strip() + if "```" in text: + text = text.split("```")[1] + if text.startswith("json"): + text = text[4:] + text = text.strip() + return json.loads(text) + except Exception as e: + logger.error(f"意图识别失败: {e}") + return {"intent": "general_chat", "params": {"message": "抱歉,我暂时无法理解您的请求。"}} + + async def _handle_create_work_order(self, user_id: str, params: Dict, raw: str) -> str: + """创建工单""" + from app.services.work_order_service import get_work_order_service + svc = get_work_order_service() + + title = params.get("title", "") + if not title: + title = raw[:50] + + order = svc.create_work_order( + title=title, + description=params.get("description", raw), + priority=params.get("priority", "medium"), + assignee_uid=user_id, + ) + + if order: + priority_names = {"low": "低", "medium": "中", "high": "高", "urgent": "紧急"} + p_name = priority_names.get(order.priority.value, "中") + return ( + f"工单已创建\n" + f"编号:{order.order_no}\n" + f"标题:{order.title}\n" + f"优先级:{p_name}\n" + f"状态:待处理" + ) + return "工单创建失败,请稍后重试" + + async def _handle_query_alarm(self, user_id: str, params: Dict, raw: str) -> str: + """查询告警统计""" + from app.services.alarm_event_service import get_alarm_event_service + + svc = get_alarm_event_service() + + # 解析时间范围 + time_range = params.get("time_range", "today") + now = datetime.now(timezone.utc) + if time_range == "week": + start = now - timedelta(days=now.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + range_label = "本周" + elif time_range == "month": + start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + range_label = "本月" + else: + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + range_label = "今日" + + alarm_type_filter = params.get("alarm_type") + if alarm_type_filter == "all": + alarm_type_filter = None + + alarms, total = svc.get_alarms( + alarm_type=alarm_type_filter, + start_time=start, + end_time=now, + page=1, + page_size=10000, + ) + + # 按类型统计 + type_count = {} + status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0} + for a in alarms: + type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 + if a.alarm_status in status_count: + status_count[a.alarm_status] += 1 + + type_names = {"leave_post": "人员离岗", "intrusion": "周界入侵"} + type_lines = [f" {type_names.get(t, t)}: {c}条" for t, c in type_count.items()] + + return ( + f"{range_label}告警统计\n" + f"总计: {total}条\n" + + "\n".join(type_lines) + "\n" + f"待处理: {status_count['NEW']}条\n" + f"已处理: {status_count['CLOSED']}条\n" + f"误报过滤: {status_count['FALSE']}条" + ) + + async def _handle_export_report(self, user_id: str, params: Dict, raw: str) -> str: + """导出Excel报表""" + from app.services.report_generator import generate_alarm_report + from app.services.oss_storage import get_oss_storage + + time_range = params.get("time_range", "week") + result = generate_alarm_report(time_range=time_range) + + if not result: + range_names = {"today": "今日", "week": "本周", "month": "本月"} + return f"{range_names.get(time_range, '今日')}暂无告警数据,无法生成报表。" + + filename, file_bytes = result + + # 上传到 COS 获取下载链接 + oss = get_oss_storage() + try: + object_key = oss.upload_file( + file_bytes.read(), + f"reports/{filename}", + content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ) + download_url = oss.get_presigned_url(object_key, expire=3600) + return f"报表已生成\n文件:{filename}\n下载:{download_url}" + except Exception as e: + logger.warning(f"报表上传COS失败: {e}") + return f"报表已生成({filename}),但上传失败,请联系管理员。" + + async def _handle_general_chat(self, user_id: str, params: Dict, raw: str) -> str: + """兜底回复""" + msg = params.get("message", "") + if msg: + return msg + return "您好,我是安防AI助手。可以帮您:\n1. 创建工单\n2. 查询告警统计\n3. 导出告警报表\n\n请直接描述您的需求。" + + +# 全局单例 +_agent_dispatcher: Optional[AgentDispatcher] = None + + +def get_agent_dispatcher() -> AgentDispatcher: + global _agent_dispatcher + if _agent_dispatcher is None: + _agent_dispatcher = AgentDispatcher() + return _agent_dispatcher diff --git a/app/services/notify_dispatch.py b/app/services/notify_dispatch.py index 71d09af..912c05f 100644 --- a/app/services/notify_dispatch.py +++ b/app/services/notify_dispatch.py @@ -69,6 +69,14 @@ async def process_alarm_notification(alarm_data: Dict): description = vlm_result.get("description", "") area_name, persons = _get_notify_persons(device_id, alarm_level) + # 演示模式:数据库无人员时,使用配置的测试 userid + if not persons and settings.wechat.test_uids: + test_uids = [uid.strip() for uid in settings.wechat.test_uids.split(",") if uid.strip()] + if test_uids: + persons = [{"person_name": "测试用户", "wechat_uid": uid, "role": "TEST"} for uid in test_uids] + area_name = "演示区域" + logger.info(f"演示模式: 使用测试用户 {test_uids}") + if not persons: logger.warning(f"未找到通知人员: camera={device_id}, 跳过企微推送") return @@ -95,7 +103,7 @@ async def process_alarm_notification(alarm_data: Dict): snapshot_url=snapshot_url, event_time=event_time_str, alarm_level=alarm_level, - service_base_url=f"http://{settings.app.host}:{settings.app.port}", + service_base_url=settings.wechat.service_base_url or f"http://{settings.app.host}:{settings.app.port}", ) logger.info(f"告警通知完成: {alarm_id} → {len(persons)} 人") @@ -110,12 +118,12 @@ def _save_vlm_result(alarm_id: str, vlm_result: Dict): try: analysis = AlarmLlmAnalysis( alarm_id=alarm_id, - llm_model=vlm_result.get("model", "qwen3-vl-flash"), + llm_model="qwen3-vl-flash", analysis_type="REVIEW", summary=vlm_result.get("description", ""), is_false_alarm=not vlm_result.get("confirmed", True), - confidence_score=0.0 if vlm_result.get("skipped") else 0.9, - suggestion=None, + confidence_score=None if vlm_result.get("skipped") else 0.9, + suggestion="VLM跳过" if vlm_result.get("skipped") else None, ) db.add(analysis) db.commit() diff --git a/app/services/report_generator.py b/app/services/report_generator.py new file mode 100644 index 0000000..1231562 --- /dev/null +++ b/app/services/report_generator.py @@ -0,0 +1,165 @@ +""" +告警报表生成器 + +生成 Excel 格式的告警汇总报表,包含告警明细和统计汇总两个 Sheet。 +""" + +import io +from datetime import datetime, timedelta, timezone +from typing import Optional, Tuple + +from openpyxl import Workbook +from openpyxl.styles import Font, PatternFill, Alignment, Border, Side + +from app.models import AlarmEvent, get_session +from app.utils.logger import logger + + +TYPE_NAMES = {"leave_post": "人员离岗", "intrusion": "周界入侵"} +LEVEL_NAMES = {1: "提醒", 2: "一般", 3: "严重", 4: "紧急"} +STATUS_NAMES = { + "NEW": "待处理", "CONFIRMED": "已确认", + "FALSE": "误报", "CLOSED": "已关闭", +} + + +def generate_alarm_report( + time_range: str = "week", +) -> Optional[Tuple[str, io.BytesIO]]: + """ + 生成告警汇总Excel + + Args: + time_range: today/week/month + + Returns: + (filename, bytes_io) 或 None(无数据时) + """ + now = datetime.now(timezone.utc) + if time_range == "today": + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + label = now.strftime("%Y%m%d") + elif time_range == "week": + start = now - timedelta(days=now.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + label = f"{start.strftime('%Y%m%d')}-{now.strftime('%Y%m%d')}" + elif time_range == "month": + start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + label = now.strftime("%Y%m") + else: + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + label = now.strftime("%Y%m%d") + + db = get_session() + try: + alarms = ( + db.query(AlarmEvent) + .filter(AlarmEvent.event_time >= start, AlarmEvent.event_time <= now) + .order_by(AlarmEvent.event_time.desc()) + .all() + ) + + if not alarms: + return None + + wb = Workbook() + + # ===== Sheet 1: 告警明细 ===== + ws = wb.active + ws.title = "告警明细" + + headers = [ + "告警ID", "告警类型", "设备ID", "场景ID", "告警级别", + "告警状态", "处理状态", "置信度", "事件时间", "处理人", "备注", + ] + + header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") + header_font = Font(color="FFFFFF", bold=True, size=11) + thin_border = Border( + left=Side(style="thin"), right=Side(style="thin"), + top=Side(style="thin"), bottom=Side(style="thin"), + ) + + for col, h in enumerate(headers, 1): + cell = ws.cell(row=1, column=col, value=h) + cell.fill = header_fill + cell.font = header_font + cell.alignment = Alignment(horizontal="center") + cell.border = thin_border + + for row, a in enumerate(alarms, 2): + values = [ + a.alarm_id, + TYPE_NAMES.get(a.alarm_type, a.alarm_type), + a.device_id, + a.scene_id or "", + LEVEL_NAMES.get(a.alarm_level, str(a.alarm_level or "")), + STATUS_NAMES.get(a.alarm_status, a.alarm_status or ""), + a.handle_status or "", + f"{a.confidence_score:.2f}" if a.confidence_score else "", + a.event_time.strftime("%Y-%m-%d %H:%M:%S") if a.event_time else "", + a.handler or "", + a.handle_remark or "", + ] + for col, v in enumerate(values, 1): + cell = ws.cell(row=row, column=col, value=v) + cell.border = thin_border + + # 自动列宽 + for col_cells in ws.columns: + max_len = max(len(str(cell.value or "")) for cell in col_cells) + ws.column_dimensions[col_cells[0].column_letter].width = min(max_len + 4, 30) + + # ===== Sheet 2: 统计汇总 ===== + ws2 = wb.create_sheet("统计汇总") + + type_count = {} + level_count = {} + status_count = {} + for a in alarms: + type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 + level_count[a.alarm_level] = level_count.get(a.alarm_level, 0) + 1 + status_count[a.alarm_status] = status_count.get(a.alarm_status, 0) + 1 + + title_font = Font(bold=True, size=12) + + # 类型统计 + ws2.cell(row=1, column=1, value="告警类型统计").font = title_font + ws2.cell(row=2, column=1, value="类型") + ws2.cell(row=2, column=2, value="数量") + for i, (t, c) in enumerate(type_count.items(), 3): + ws2.cell(row=i, column=1, value=TYPE_NAMES.get(t, t)) + ws2.cell(row=i, column=2, value=c) + + # 状态统计 + offset = len(type_count) + 4 + ws2.cell(row=offset, column=1, value="告警状态统计").font = title_font + ws2.cell(row=offset + 1, column=1, value="状态") + ws2.cell(row=offset + 1, column=2, value="数量") + for i, (s, c) in enumerate(status_count.items(), offset + 2): + ws2.cell(row=i, column=1, value=STATUS_NAMES.get(s, s)) + ws2.cell(row=i, column=2, value=c) + + # 级别统计 + offset2 = offset + len(status_count) + 3 + ws2.cell(row=offset2, column=1, value="告警级别统计").font = title_font + ws2.cell(row=offset2 + 1, column=1, value="级别") + ws2.cell(row=offset2 + 1, column=2, value="数量") + for i, (lv, c) in enumerate(level_count.items(), offset2 + 2): + ws2.cell(row=i, column=1, value=LEVEL_NAMES.get(lv, str(lv))) + ws2.cell(row=i, column=2, value=c) + + # 输出到内存 + output = io.BytesIO() + wb.save(output) + output.seek(0) + + filename = f"告警报表_{label}.xlsx" + logger.info(f"报表已生成: {filename}, 告警数={len(alarms)}") + return (filename, output) + + except Exception as e: + logger.error(f"生成报表失败: {e}", exc_info=True) + return None + finally: + db.close() diff --git a/app/services/vlm_service.py b/app/services/vlm_service.py index d66c9ae..37dd286 100644 --- a/app/services/vlm_service.py +++ b/app/services/vlm_service.py @@ -16,32 +16,22 @@ logger = logging.getLogger(__name__) # 算法类型 → VLM Prompt 模板 VLM_PROMPTS = { - "leave_post": """分析这张岗位监控截图。 -摄像头位置:{camera_name},监控区域:{roi_name}。 -边缘AI检测到该区域无人在岗,请你复核:该区域内是否确实没有工作人员在岗? + "leave_post": """你是安防监控AI复核员。判断{roi_name}岗位区域内是否有人在岗。 +confirmed=true表示确实无人在岗(告警成立),false表示有人(误报)。 +description用≤25字描述画面。 +仅输出JSON:{{"confirmed":true,"description":"..."}}""", -输出严格的JSON格式(不要输出其他内容): -{{"confirmed": true, "description": "一句话描述当前画面"}} - -说明:confirmed=true 表示确实无人在岗(告警成立),confirmed=false 表示有人在岗(误报)。""", - - "intrusion": """分析这张周界监控截图。 -摄像头位置:{camera_name},监控区域:{roi_name}。 -边缘AI检测到该区域有人员入侵,请你复核:该区域内是否确实有人员出现? - -输出严格的JSON格式(不要输出其他内容): -{{"confirmed": true, "description": "一句话描述当前画面"}} - -说明:confirmed=true 表示确实有人入侵(告警成立),confirmed=false 表示无人(误报)。""", + "intrusion": """你是安防监控AI复核员。判断{roi_name}周界区域内是否有人员入侵。 +confirmed=true表示确实有人入侵(告警成立),false表示无人(误报)。 +description用≤25字描述画面。 +仅输出JSON:{{"confirmed":true,"description":"..."}}""", } # 通用降级 prompt(未知算法类型时使用) -DEFAULT_PROMPT = """分析这张监控截图。 -摄像头位置:{camera_name},监控区域:{roi_name}。 -边缘AI触发了 {alarm_type} 告警,请判断告警是否属实。 - -输出严格的JSON格式(不要输出其他内容): -{{"confirmed": true, "description": "一句话描述当前画面"}}""" +DEFAULT_PROMPT = """你是安防监控AI复核员。边缘AI触发了{alarm_type}告警,判断告警是否属实。 +confirmed=true表示告警成立,false表示误报。 +description用≤25字描述画面。 +仅输出JSON:{{"confirmed":true,"description":"..."}}""" class VLMService: @@ -74,6 +64,16 @@ class VLMService: def enabled(self) -> bool: return self._enabled + @staticmethod + def _fallback_result(alarm_type: str, camera_name: str, reason: str) -> Dict: + """降级结果:入侵默认放行(宁可多报),离岗默认拦截(避免VLM不可用时误推)""" + confirmed = alarm_type != "leave_post" + return { + "confirmed": confirmed, + "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警({reason})", + "skipped": True, + } + async def verify_alarm( self, snapshot_url: str, @@ -95,19 +95,11 @@ class VLMService: - skipped=True 表示 VLM 未调用(降级处理) """ if not self._enabled or not self._client: - return { - "confirmed": True, - "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警", - "skipped": True, - } + return self._fallback_result(alarm_type, camera_name, "VLM未启用") if not snapshot_url: logger.warning("告警无截图URL,跳过 VLM 复核") - return { - "confirmed": True, - "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(无截图)", - "skipped": True, - } + return self._fallback_result(alarm_type, camera_name, "无截图") # 选择 prompt 模板 template = VLM_PROMPTS.get(alarm_type, DEFAULT_PROMPT) @@ -144,7 +136,7 @@ class VLMService: result = json.loads(content) logger.info( f"VLM 复核完成: confirmed={result.get('confirmed')}, " - f"desc={result.get('description', '')[:50]}" + f"desc={result.get('description', '')[:30]}" ) return { "confirmed": result.get("confirmed", True), @@ -154,25 +146,13 @@ class VLMService: except asyncio.TimeoutError: logger.warning(f"VLM 复核超时 ({self._timeout}s),降级处理") - return { - "confirmed": True, - "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(VLM超时)", - "skipped": True, - } + return self._fallback_result(alarm_type, camera_name, "VLM超时") except json.JSONDecodeError as e: logger.warning(f"VLM 返回内容解析失败: {e}, 原始内容: {content[:200]}") - return { - "confirmed": True, - "description": content[:100] if content else "VLM返回异常", - "skipped": True, - } + return self._fallback_result(alarm_type, camera_name, "解析失败") except Exception as e: logger.error(f"VLM 调用异常: {e}") - return { - "confirmed": True, - "description": f"{camera_name or '未知位置'} 触发 {alarm_type} 告警(VLM异常)", - "skipped": True, - } + return self._fallback_result(alarm_type, camera_name, "VLM异常") # 全局单例 diff --git a/app/services/wechat_crypto.py b/app/services/wechat_crypto.py new file mode 100644 index 0000000..a66856c --- /dev/null +++ b/app/services/wechat_crypto.py @@ -0,0 +1,90 @@ +""" +企微消息加解密 + +基于企微官方加解密方案实现 AES-CBC-256 消息加解密和签名验证。 +参考:https://developer.work.weixin.qq.com/document/path/90930 + +注意:需要安装 pycryptodome: pip install pycryptodome +如果 pycryptodome 未安装,模块会优雅降级,verify_url/decrypt_message 抛出明确异常。 +""" + +import base64 +import hashlib +import struct +import xml.etree.ElementTree as ET +import logging +from typing import Dict + +from app.config import settings + +logger = logging.getLogger(__name__) + +try: + from Crypto.Cipher import AES + _HAS_CRYPTO = True +except ImportError: + _HAS_CRYPTO = False + logger.warning("pycryptodome 未安装,企微消息加解密不可用。安装命令: pip install pycryptodome") + + +class WeChatCrypto: + """企微消息加解密""" + + def __init__(self): + self._token = settings.wechat.token + key = settings.wechat.encoding_aes_key + if key and len(key) == 43: + self._aes_key = base64.b64decode(key + "=") + else: + self._aes_key = b"" + + def verify_url(self, msg_signature: str, timestamp: str, nonce: str, echostr: str) -> str: + """验证回调URL,返回解密后的echostr""" + if not _HAS_CRYPTO: + raise RuntimeError("pycryptodome 未安装,无法验证URL") + if not self._aes_key: + raise ValueError("EncodingAESKey 未配置") + + self._check_signature(msg_signature, timestamp, nonce, echostr) + return self._decrypt(echostr) + + def decrypt_message(self, xml_body: bytes, msg_signature: str, timestamp: str, nonce: str) -> Dict[str, str]: + """解密企微消息XML,返回消息字典""" + if not _HAS_CRYPTO: + raise RuntimeError("pycryptodome 未安装,无法解密消息") + if not self._aes_key: + raise ValueError("EncodingAESKey 未配置") + + root = ET.fromstring(xml_body) + encrypt_node = root.find("Encrypt") + if encrypt_node is None or not encrypt_node.text: + raise ValueError("XML中缺少Encrypt节点") + + encrypt = encrypt_node.text + self._check_signature(msg_signature, timestamp, nonce, encrypt) + decrypted_xml = self._decrypt(encrypt) + + msg_root = ET.fromstring(decrypted_xml) + return {child.tag: (child.text or "") for child in msg_root} + + def _check_signature(self, msg_signature: str, timestamp: str, nonce: str, encrypt: str): + """校验签名""" + items = sorted([self._token, timestamp, nonce, encrypt]) + sha1 = hashlib.sha1("".join(items).encode("utf-8")).hexdigest() + if sha1 != msg_signature: + raise ValueError(f"签名验证失败: expected={sha1}, got={msg_signature}") + + def _decrypt(self, text: str) -> str: + """AES-CBC 解密""" + cipher = AES.new(self._aes_key, AES.MODE_CBC, iv=self._aes_key[:16]) + decrypted = cipher.decrypt(base64.b64decode(text)) + # 去除PKCS7填充 + pad = decrypted[-1] + if isinstance(pad, int): + content = decrypted[:-pad] + else: + content = decrypted[:-ord(pad)] + # 去除16字节随机串 + 4字节消息长度 + msg_len = struct.unpack("!I", content[16:20])[0] + msg = content[20:20 + msg_len].decode("utf-8") + return msg diff --git a/app/services/wechat_service.py b/app/services/wechat_service.py index 6a612a3..f408fd2 100644 --- a/app/services/wechat_service.py +++ b/app/services/wechat_service.py @@ -152,6 +152,31 @@ class WeChatService: logger.error(f"企微发送异常: {e}") return False + async def send_text_message(self, user_id: str, content: str) -> bool: + """发送文本消息给指定用户""" + if not self._enabled: + return False + try: + access_token = await self._get_access_token() + msg = { + "touser": user_id, + "msgtype": "text", + "agentid": int(self._agent_id) if self._agent_id else 0, + "text": {"content": content}, + } + url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.post(url, json=msg) + data = resp.json() + if data.get("errcode") != 0: + logger.error(f"企微文本消息发送失败: {data}") + return False + logger.info(f"企微文本消息已发送: user={user_id}") + return True + except Exception as e: + logger.error(f"发送文本消息异常: {e}") + return False + # 全局单例 _wechat_service: Optional[WeChatService] = None diff --git a/app/services/work_order_service.py b/app/services/work_order_service.py new file mode 100644 index 0000000..7f9f481 --- /dev/null +++ b/app/services/work_order_service.py @@ -0,0 +1,144 @@ +""" +工单服务 + +提供工单的创建、查询、更新功能。 +由交互Agent的工单Handler调用。 +""" + +import uuid +from datetime import datetime, timezone +from typing import Optional, Dict, List, Tuple + +from app.models import WorkOrder, WorkOrderStatus, WorkOrderPriority, get_session +from app.utils.logger import logger + + +def generate_order_no() -> str: + """生成工单编号: WO + YYYYMMDDHHmmss + 6位uuid""" + ts = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S") + return f"WO{ts}{uuid.uuid4().hex[:6].upper()}" + + +class WorkOrderService: + """工单服务""" + + def create_work_order( + self, + title: str, + description: str = "", + priority: str = "medium", + assignee_uid: str = "", + assignee_name: str = "", + alarm_id: str = "", + ) -> Optional[WorkOrder]: + """创建工单""" + db = get_session() + try: + valid_priorities = [e.value for e in WorkOrderPriority] + order = WorkOrder( + order_no=generate_order_no(), + title=title, + description=description, + priority=WorkOrderPriority(priority) if priority in valid_priorities else WorkOrderPriority.MEDIUM, + assignee_id=assignee_uid, + assignee_name=assignee_name, + status=WorkOrderStatus.CREATED, + ) + if alarm_id: + order.alert_no = alarm_id + + db.add(order) + db.commit() + db.refresh(order) + logger.info(f"工单已创建: {order.order_no}, title={title}") + return order + except Exception as e: + db.rollback() + logger.error(f"创建工单失败: {e}") + return None + finally: + db.close() + + def get_work_order(self, order_no: str) -> Optional[WorkOrder]: + """查询工单""" + db = get_session() + try: + return db.query(WorkOrder).filter(WorkOrder.order_no == order_no).first() + finally: + db.close() + + def get_work_orders( + self, + status: Optional[str] = None, + assignee_id: Optional[str] = None, + page: int = 1, + page_size: int = 20, + ) -> Tuple[List[WorkOrder], int]: + """分页查询工单""" + db = get_session() + try: + query = db.query(WorkOrder) + if status: + query = query.filter(WorkOrder.status == status) + if assignee_id: + query = query.filter(WorkOrder.assignee_id == assignee_id) + + total = query.count() + orders = ( + query.order_by(WorkOrder.created_at.desc()) + .offset((page - 1) * page_size) + .limit(page_size) + .all() + ) + return orders, total + finally: + db.close() + + def update_status( + self, + order_no: str, + status: str, + result: str = "", + ) -> Optional[WorkOrder]: + """更新工单状态""" + db = get_session() + try: + order = db.query(WorkOrder).filter(WorkOrder.order_no == order_no).first() + if not order: + return None + + valid_statuses = [e.value for e in WorkOrderStatus] + if status in valid_statuses: + order.status = WorkOrderStatus(status) + + if result: + order.result = result + + now = datetime.now(timezone.utc) + if status == "processing" and not order.started_at: + order.started_at = now + elif status == "completed": + order.completed_at = now + + order.updated_at = now + db.commit() + db.refresh(order) + logger.info(f"工单状态更新: {order_no} -> {status}") + return order + except Exception as e: + db.rollback() + logger.error(f"更新工单失败: {e}") + return None + finally: + db.close() + + +# 全局单例 +_work_order_service: Optional[WorkOrderService] = None + + +def get_work_order_service() -> WorkOrderService: + global _work_order_service + if _work_order_service is None: + _work_order_service = WorkOrderService() + return _work_order_service diff --git a/requirements.txt b/requirements.txt index b835d4a..3663e65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,5 @@ redis>=5.0.0 pymysql>=1.1.0 cos-python-sdk-v5>=1.9.30 openai==1.68.0 +openpyxl>=3.1.0 +pycryptodome>=3.19.0