""" 交互Agent调度器 基于 LangGraph StateGraph 的企微交互 Agent。 企微入口适配层:处理图片上传、VLM分析等企微特有逻辑, 核心对话由 LangGraph 图处理。 """ import json import time from typing import Dict, List, Optional from openai import AsyncOpenAI from app.config import settings from app.services.session_manager import get_session_manager from app.utils.logger import logger class AgentDispatcher: """交互Agent调度器(单例)""" def __init__(self): self._vlm_client: Optional[AsyncOpenAI] = None self._enabled = False self._graph = None self._pending_images: Dict[str, List[str]] = {} def init(self, config): """初始化Agent""" self._enabled = config.enabled and bool(config.vlm_api_key) if not self._enabled: logger.info("交互Agent未启用(AGENT_ENABLED=false 或缺少 API Key)") return # VLM 客户端(图片分析专用) self._vlm_client = AsyncOpenAI( api_key=config.vlm_api_key, base_url=config.vlm_base_url, ) from app.services.agent.graph import create_default_graph self._graph = create_default_graph() logger.info(f"交互Agent已启用(LangGraph): model={config.model}") @property def enabled(self) -> bool: return self._enabled # ==================== 消息入口 ==================== async def handle_message(self, user_id: str, content: str) -> str: """处理文字消息""" if not self._enabled: return "AI助手未启用,请联系管理员配置。" self._pending_images[user_id] = [] try: reply = await self._langgraph_chat(user_id, content) except Exception as e: logger.error(f"Agent对话失败: {e}", exc_info=True) reply = "抱歉,AI助手暂时无法响应,请稍后重试。" # 发送待发图片 pending = self._pending_images.pop(user_id, []) if pending: await self._send_images_to_user(user_id, pending) return reply async def handle_image(self, user_id: str, media_id: str) -> str: """处理图片消息""" if not self._enabled: return "AI助手未启用,请联系管理员配置。" session = get_session_manager().get(user_id) # 1. 下载图片 from app.services.wechat_service import get_wechat_service wechat = get_wechat_service() image_data = await wechat.download_media(media_id) if not image_data: return "图片下载失败,请重新发送。" # 2. 上传 COS 持久化 from app.services.oss_storage import get_oss_storage oss = get_oss_storage() object_key = f"agent/{user_id}/{int(time.time())}.jpg" try: oss.upload_file(image_data, object_key, content_type="image/jpeg") except Exception as e: logger.error(f"Agent图片上传COS失败: {e}") return "图片保存失败,请重新发送。" permanent_url = oss.get_permanent_url(object_key) presign_url = oss.get_presigned_url(object_key) # 3. 检查用户是否有待处理工单 handling_alarm_id = self._find_handling_alarm(user_id) if handling_alarm_id: session.pending_images.append(permanent_url) reply = f"收到图片,是否作为【告警 {handling_alarm_id[:20]}...】的处理结果提交?\n回复「是」确认提交,或继续发送更多图片。" session.pending_alarm_id = handling_alarm_id return reply # 4. VLM 分析 from app.services.agent.prompts import IMAGE_ANALYZE_PROMPT analysis = await self._analyze_image(presign_url, IMAGE_ANALYZE_PROMPT) if analysis.get("has_anomaly"): desc = analysis.get("description", "异常情况") reply = f"检测到异常:{desc}\n\n如需上报,请描述具体位置和情况。" else: reply = "未检测到明显安全隐患。如有疑问请描述情况。" return reply # ==================== LangGraph 对话 ==================== async def _langgraph_chat(self, user_id: str, content: str) -> str: """LangGraph 图调用""" config = { "configurable": { "thread_id": f"wechat-{user_id}", "user_id": user_id, } } result = await self._graph.ainvoke( { "messages": [{"role": "user", "content": content}], "user_id": user_id, "pending_images": [], "user_uploaded_images": [], }, config=config, ) # 从工具返回中提取截图 URL(get_alarm_detail 返回的 snapshot_url) self._extract_pending_images(user_id, result) # 获取最终回复 last_msg = result["messages"][-1] reply = last_msg.content if hasattr(last_msg, "content") else str(last_msg) return reply.strip() if reply else "处理完成" def _extract_pending_images(self, user_id: str, result): """从 LangGraph 结果中提取需要发送的截图""" for msg in result.get("messages", []): if hasattr(msg, "type") and msg.type == "tool" and msg.name == "get_alarm_detail": try: data = json.loads(msg.content) if isinstance(msg.content, str) else msg.content url = data.get("snapshot_url", "") if url: if user_id not in self._pending_images: self._pending_images[user_id] = [] self._pending_images[user_id].append(url) except Exception: pass # ==================== 共用方法 ==================== async def _analyze_image(self, image_url: str, prompt: str) -> Dict: """VLM 分析图片内容""" try: resp = await self._vlm_client.chat.completions.create( model=settings.agent.vlm_model, temperature=0.1, messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": [ {"type": "image_url", "image_url": {"url": image_url}}, {"type": "text", "text": "请分析这张图片"}, ]}, ], timeout=settings.agent.vlm_timeout, ) text = resp.choices[0].message.content.strip() if "```" in text: text = text.split("```")[1].strip() if text.startswith("json"): text = text[4:].strip() return json.loads(text) except Exception as e: logger.error(f"VLM图片分析失败: {e}") return {"has_anomaly": False, "description": "", "alarm_type": ""} async def _send_images_to_user(self, user_id: str, image_urls: List[str]): """通过企微发送图片消息给用户""" from app.services.wechat_service import get_wechat_service wechat = get_wechat_service() if not wechat.enabled: return for url in image_urls: try: media_id = await wechat.upload_media_from_url(url) if media_id: access_token = await wechat._get_access_token() import httpx msg = { "touser": user_id, "msgtype": "image", "agentid": wechat.agent_id_int, "image": {"media_id": media_id}, } api_url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}" async with httpx.AsyncClient(timeout=10) as client: await client.post(api_url, json=msg) except Exception as e: logger.error(f"发送告警截图失败: user={user_id}, error={e}") @staticmethod def _find_handling_alarm(user_id: str) -> str: """查找用户正在处理的告警ID""" from app.models import get_session, AlarmEvent db = get_session() try: alarm = db.query(AlarmEvent).filter( AlarmEvent.handler == user_id, AlarmEvent.handle_status == "HANDLING", ).order_by(AlarmEvent.event_time.desc()).first() return alarm.alarm_id if alarm else "" except Exception as e: logger.error(f"查询待处理告警失败: user={user_id}, error={e}") return "" finally: db.close() # 全局单例 _agent_dispatcher: Optional[AgentDispatcher] = None def get_agent_dispatcher() -> AgentDispatcher: global _agent_dispatcher if _agent_dispatcher is None: _agent_dispatcher = AgentDispatcher() return _agent_dispatcher