diff --git a/app/services/agent/__init__.py b/app/services/agent/__init__.py new file mode 100644 index 0000000..4755d57 --- /dev/null +++ b/app/services/agent/__init__.py @@ -0,0 +1,5 @@ +""" +LangGraph Agent 模块 + +基于 LangGraph StateGraph 的企微交互 Agent,替代手写 FC 循环。 +""" diff --git a/app/services/agent/graph.py b/app/services/agent/graph.py new file mode 100644 index 0000000..d5d3934 --- /dev/null +++ b/app/services/agent/graph.py @@ -0,0 +1,70 @@ +""" +LangGraph StateGraph 构建 + +构建 assistant + ToolNode 的 ReAct 图,接入 Dashscope Qwen。 +""" + +from typing import Optional + +from langgraph.graph import StateGraph, START, END +from langgraph.prebuilt import ToolNode, tools_condition +from langgraph.checkpoint.memory import MemorySaver +from langchain_openai import ChatOpenAI + +from app.config import settings +from app.utils.logger import logger +from .state import AgentState +from .tools import all_tools +from .prompts import SYSTEM_PROMPT + + +def _create_llm(): + """创建 LLM 客户端(通过 Dashscope OpenAI 兼容接口对接 Qwen)""" + return ChatOpenAI( + model=settings.agent.model, + base_url=settings.agent.vlm_base_url, + api_key=settings.agent.vlm_api_key, + timeout=settings.agent.timeout, + ) + + +def build_agent_graph(checkpointer=None): + """构建并编译 Agent 图 + + Args: + checkpointer: LangGraph checkpointer 实例(None=不持久化) + + Returns: + 编译后的 CompiledGraph + """ + llm = _create_llm() + llm_with_tools = llm.bind_tools(all_tools) + + def assistant(state: AgentState): + """LLM 推理节点:接收消息 + 系统提示,返回回复或工具调用""" + system_msg = {"role": "system", "content": SYSTEM_PROMPT} + response = llm_with_tools.invoke([system_msg] + state["messages"]) + return {"messages": [response]} + + # 构建图 + builder = StateGraph(AgentState) + + # 两个核心节点 + builder.add_node("assistant", assistant) + builder.add_node("tools", ToolNode(all_tools)) + + # 边:START → assistant → (条件) → tools 或 END + builder.add_edge(START, "assistant") + builder.add_conditional_edges("assistant", tools_condition) + builder.add_edge("tools", "assistant") + + # 编译 + graph = builder.compile(checkpointer=checkpointer) + logger.info("LangGraph Agent 图已编译") + return graph + + +def create_default_graph(): + """创建带内存 checkpoint 的默认图(开发用)""" + checkpointer = MemorySaver() + return build_agent_graph(checkpointer=checkpointer) diff --git a/app/services/agent/prompts.py b/app/services/agent/prompts.py new file mode 100644 index 0000000..7cbdcf1 --- /dev/null +++ b/app/services/agent/prompts.py @@ -0,0 +1,34 @@ +""" +Agent Prompt 定义 +""" + +SYSTEM_PROMPT = """你是VSP安防AI助手,通过企业微信协助安保人员处理告警和工单。 + +能力: +1. 查询告警统计和明细(query_alarm_stats / list_alarms / get_alarm_detail) +2. 处理告警(update_alarm_status:确认接单、忽略、处理完成、误报) +3. 提交工单处理结果(submit_order_result:含文字描述和处理后照片) +4. 查询待处理工单(list_my_orders) +5. 查询摄像头信息(query_camera) + +交互规则: +- 用户发图片时,如果有待处理工单,询问是否作为处理结果上传 +- 用户说"处理完了"并附带描述,自动提交结单 +- 回复简洁,适合手机阅读 +- 重要信息用【】标注 +- 禁止使用markdown语法(如![](url)、**加粗**、# 标题),企微聊天不支持 +- 告警截图会自动发送图片消息,文字回复中不要包含图片链接""" + +IMAGE_ANALYZE_PROMPT = """你是物业安防图片分析员。分析这张图片,判断是否存在安全隐患或需要上报的情况。 + +需要关注的异常包括: +- 岗位无人值守(前台、监控室、门岗等应有人但没人) +- 人员入侵(非授权区域出现人员) +- 车辆违停(禁停区域有车辆) +- 消防隐患(灭火器缺失、通道堵塞、线路杂乱) +- 设施损坏(门窗破损、设备故障) +- 物品遗留(可疑包裹、危险物品) + +请用JSON格式回复: +{"has_anomaly": true/false, "description": "异常描述", "alarm_type": "告警类型(leave_post/intrusion/illegal_parking/fire/damage/other/none)"} +只输出JSON,不要其他内容。""" diff --git a/app/services/agent/state.py b/app/services/agent/state.py new file mode 100644 index 0000000..a1e5c97 --- /dev/null +++ b/app/services/agent/state.py @@ -0,0 +1,21 @@ +""" +Agent 状态定义 +""" + +from typing import Annotated, List +from typing_extensions import TypedDict +from langgraph.graph.message import AnyMessage, add_messages + + +class AgentState(TypedDict): + """LangGraph Agent 状态 + + messages: 对话消息列表(append-only,LangGraph 自动合并) + user_id: 企微 userid(图入口注入,工具通过 config 读取) + pending_images: 工具执行期间收集的待发图片 URL(告警截图等) + user_uploaded_images: 用户上传的图片 COS 永久 URL(用于工单结单) + """ + messages: Annotated[list[AnyMessage], add_messages] + user_id: str + pending_images: List[str] + user_uploaded_images: List[str] diff --git a/app/services/agent/tools/__init__.py b/app/services/agent/tools/__init__.py new file mode 100644 index 0000000..292740a --- /dev/null +++ b/app/services/agent/tools/__init__.py @@ -0,0 +1,19 @@ +""" +工具注册表:导出 all_tools 供图构建使用 +""" + +from .alarm_query import query_alarm_stats, list_alarms, get_alarm_detail +from .alarm_action import update_alarm_status +from .order_tools import list_my_orders, submit_order_result +from .camera_tools import query_camera + +# 所有工具列表 — 添加新工具只需在这里追加 +all_tools = [ + query_alarm_stats, + list_alarms, + get_alarm_detail, + update_alarm_status, + list_my_orders, + submit_order_result, + query_camera, +] diff --git a/app/services/agent/tools/alarm_action.py b/app/services/agent/tools/alarm_action.py new file mode 100644 index 0000000..6e2148d --- /dev/null +++ b/app/services/agent/tools/alarm_action.py @@ -0,0 +1,125 @@ +""" +告警操作工具:确认接单、忽略、完成、误报 +""" + +import json +from langchain_core.tools import tool +from langchain_core.runnables import RunnableConfig + +from app.utils.logger import logger + + +def _get_order_id_for_alarm(alarm_id: str) -> str: + """从 alarm_event_ext 中获取关联的工单ID""" + from app.models import get_session, AlarmEventExt + db = get_session() + try: + ext = db.query(AlarmEventExt).filter( + AlarmEventExt.alarm_id == alarm_id, + AlarmEventExt.ext_type == "WORK_ORDER", + ).first() + if ext and ext.ext_data: + return ext.ext_data.get("order_id", "") + return "" + except Exception as e: + logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}") + return "" + finally: + db.close() + + +async def _update_wechat_card(alarm_id: str, user_id: str, action: str): + """更新企微卡片状态""" + try: + from app.services.wechat_service import get_wechat_service + wechat = get_wechat_service() + response_code = wechat.get_response_code(alarm_id) + if not response_code: + return + + if action == "confirm": + await wechat.update_alarm_card_step2( + response_code=response_code, user_ids=[user_id], + alarm_id=alarm_id, operator_name=user_id, + ) + else: + await wechat.update_alarm_card_terminal( + response_code=response_code, user_ids=[user_id], + alarm_id=alarm_id, action=action, operator_name=user_id, + ) + except Exception as e: + logger.error(f"更新企微卡片失败: alarm={alarm_id}, error={e}") + + +@tool +async def update_alarm_status(alarm_id: str, action: str, config: RunnableConfig) -> str: + """更新告警状态:确认接单(confirm)、忽略(ignore)、处理完成(complete)、标记误报(false) + + Args: + alarm_id: 告警ID + action: 操作类型 confirm=确认接单 ignore=忽略 complete=处理完成 false=标记误报 + """ + from app.services.alarm_event_service import get_alarm_event_service + + user_id = config.get("configurable", {}).get("user_id", "") + svc = get_alarm_event_service() + + # 查告警是否存在 + detail = svc.get_alarm(alarm_id) + if not detail: + return json.dumps({"error": f"未找到告警: {alarm_id}"}, ensure_ascii=False) + + # 获取关联工单ID + order_id = _get_order_id_for_alarm(alarm_id) + + from app.services.work_order_client import get_work_order_client + wo_client = get_work_order_client() + + # 操作映射 + action_map = { + "confirm": ("CONFIRMED", "HANDLING", "企微Agent确认接单"), + "ignore": ("FALSE", "IGNORED", "企微Agent忽略"), + "complete": ("CLOSED", "DONE", "企微Agent已处理"), + "false": ("FALSE", "IGNORED", "企微Agent标记误报"), + } + + if action not in action_map: + return json.dumps({"error": f"未知操作: {action}"}, ensure_ascii=False) + + alarm_status, handle_status, remark = action_map[action] + + # IoT 工单操作 + if order_id and wo_client.enabled: + iot_success = False + if action == "confirm": + iot_success = await wo_client.confirm_order(order_id) + elif action in ("ignore", "false"): + iot_success = await wo_client.false_alarm(order_id) + elif action == "complete": + iot_success = await wo_client.submit_order(order_id, result=f"已处理 by {user_id}") + if not iot_success: + remark += "(IoT降级)" + + # 本地状态更新(不管 IoT 成功与否) + svc.handle_alarm( + alarm_id=alarm_id, + alarm_status=alarm_status, + handle_status=handle_status, + handler=user_id, + remark=remark, + ) + + # 更新企微卡片 + card_action = action if action != "confirm" else "confirm" + await _update_wechat_card(alarm_id, user_id, card_action) + + action_labels = { + "confirm": "已确认接单", + "ignore": "已忽略", + "complete": "已处理完成", + "false": "已标记误报", + } + return json.dumps( + {"success": True, "message": f"{action_labels[action]}: {alarm_id}"}, + ensure_ascii=False, + ) diff --git a/app/services/agent/tools/alarm_query.py b/app/services/agent/tools/alarm_query.py new file mode 100644 index 0000000..7ec7330 --- /dev/null +++ b/app/services/agent/tools/alarm_query.py @@ -0,0 +1,196 @@ +""" +告警查询工具:统计、列表、详情 +""" + +import json +from datetime import timedelta +from typing import Optional +from langchain_core.tools import tool +from langchain_core.runnables import RunnableConfig + +from app.utils.logger import logger +from app.utils.timezone import beijing_now + + +# 告警类型/级别/状态 中文映射 +ALARM_TYPE_NAMES = { + "leave_post": "人员离岗", "intrusion": "周界入侵", + "illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵", +} +ALARM_LEVEL_NAMES = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"} +ALARM_STATUS_NAMES = { + "NEW": "待处理", "CONFIRMED": "处理中", + "FALSE": "误报", "CLOSED": "已关闭", +} + + +def _parse_time_range(time_range: str): + """解析时间范围,返回 (start_time, label)""" + now = beijing_now() + if time_range == "week": + start = now - timedelta(days=now.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + return start, "本周" + elif time_range == "month": + start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + return start, "本月" + else: + start = now.replace(hour=0, minute=0, second=0, microsecond=0) + return start, "今日" + + +def _get_camera_display_name(device_id: str) -> str: + """同步获取摄像头显示名称""" + try: + import asyncio + from app.services.camera_name_service import get_camera_name_service + camera_service = get_camera_name_service() + loop = asyncio.get_event_loop() + if loop.is_running(): + import concurrent.futures + with concurrent.futures.ThreadPoolExecutor() as pool: + cam_info = pool.submit( + asyncio.run, camera_service.get_camera_info(device_id) + ).result(timeout=5) + else: + cam_info = asyncio.run(camera_service.get_camera_info(device_id)) + return camera_service.format_display_name(device_id, cam_info) + except Exception: + return device_id + + +@tool +def query_alarm_stats(time_range: str = "today", alarm_type: str = "all") -> str: + """查询告警统计数据(总数、按类型分布、按状态分布) + + Args: + time_range: 时间范围 today=今日 week=本周 month=本月 + alarm_type: 告警类型筛选 leave_post/intrusion/illegal_parking/vehicle_congestion/all + """ + from app.services.alarm_event_service import get_alarm_event_service + svc = get_alarm_event_service() + + start, range_label = _parse_time_range(time_range) + now = beijing_now() + + alarm_type_filter = None if alarm_type == "all" else alarm_type + + alarms, total = svc.get_alarms( + alarm_type=alarm_type_filter, + start_time=start, + end_time=now, + page=1, + page_size=10000, + ) + + type_count = {} + status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0} + for a in alarms: + type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 + if a.alarm_status in status_count: + status_count[a.alarm_status] += 1 + + result = { + "range": range_label, + "total": total, + "by_type": {ALARM_TYPE_NAMES.get(t, t): c for t, c in type_count.items()}, + "by_status": {ALARM_STATUS_NAMES.get(s, s): c for s, c in status_count.items()}, + } + return json.dumps(result, ensure_ascii=False) + + +@tool +def list_alarms( + time_range: str = "today", + alarm_type: str = "all", + alarm_status: str = "", + limit: int = 10, +) -> str: + """查询告警列表,返回最近的告警记录(含ID、类型、摄像头、状态、时间) + + Args: + time_range: 时间范围 today/week/month + alarm_type: 告警类型筛选 leave_post/intrusion/illegal_parking/vehicle_congestion/all + alarm_status: 告警状态筛选 NEW=待处理 CONFIRMED=处理中 FALSE=误报 CLOSED=已关闭 + limit: 返回条数,默认10,最多20 + """ + from app.services.alarm_event_service import get_alarm_event_service + svc = get_alarm_event_service() + + start, range_label = _parse_time_range(time_range) + now = beijing_now() + + alarm_type_filter = None if alarm_type == "all" else alarm_type + status_filter = alarm_status if alarm_status else None + limit = min(limit, 20) + + alarms, total = svc.get_alarms( + alarm_type=alarm_type_filter, + alarm_status=status_filter, + start_time=start, + end_time=now, + page=1, + page_size=limit, + ) + + items = [] + for a in alarms: + cam_name = _get_camera_display_name(a.device_id) + event_time = "" + if a.event_time: + try: + event_time = a.event_time.strftime("%m-%d %H:%M") + except Exception: + event_time = str(a.event_time)[:16] + items.append({ + "alarm_id": a.alarm_id, + "type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type), + "camera": cam_name, + "status": ALARM_STATUS_NAMES.get(a.alarm_status, a.alarm_status), + "level": ALARM_LEVEL_NAMES.get(a.alarm_level, "普通"), + "time": event_time, + }) + + result = {"range": range_label, "total": total, "items": items} + return json.dumps(result, ensure_ascii=False) + + +@tool +def get_alarm_detail(alarm_id: str, config: RunnableConfig) -> str: + """查询单条告警的详细信息(含扩展信息和AI分析结果) + + Args: + alarm_id: 告警ID(如 edge_xxx 或 ALM_xxx) + """ + from app.services.alarm_event_service import get_alarm_event_service + svc = get_alarm_event_service() + + detail = svc.get_alarm(alarm_id) + if not detail: + return json.dumps({"error": f"未找到告警: {alarm_id}"}, ensure_ascii=False) + + snapshot_url = detail.get("snapshot_url", "") + + result = { + "alarm_id": detail.get("alarm_id"), + "alarm_type": ALARM_TYPE_NAMES.get(detail.get("alarm_type", ""), detail.get("alarm_type", "")), + "device_id": detail.get("device_id"), + "alarm_status": ALARM_STATUS_NAMES.get(detail.get("alarm_status", ""), detail.get("alarm_status", "")), + "alarm_level": ALARM_LEVEL_NAMES.get(detail.get("alarm_level"), "普通"), + "event_time": str(detail.get("event_time", ""))[:19], + "handle_status": detail.get("handle_status"), + "handler": detail.get("handler"), + "has_snapshot": bool(snapshot_url), + "snapshot_url": snapshot_url, + } + + # 摄像头名称 + result["camera_name"] = _get_camera_display_name(detail.get("device_id", "")) + + # LLM 分析 + analyses = detail.get("llm_analyses", []) + if analyses: + latest = analyses[-1] + result["ai_analysis"] = latest.get("summary", "") + + return json.dumps(result, ensure_ascii=False) diff --git a/app/services/agent/tools/camera_tools.py b/app/services/agent/tools/camera_tools.py new file mode 100644 index 0000000..5865566 --- /dev/null +++ b/app/services/agent/tools/camera_tools.py @@ -0,0 +1,21 @@ +""" +摄像头查询工具 +""" + +import json +from langchain_core.tools import tool + +from .alarm_query import _get_camera_display_name + + +@tool +def query_camera(camera_id: str) -> str: + """查询摄像头信息(名称、位置、状态等) + + Args: + camera_id: 摄像头ID(device_id) + """ + name = _get_camera_display_name(camera_id) + if name and name != camera_id: + return json.dumps({"camera_id": camera_id, "name": name}, ensure_ascii=False) + return json.dumps({"error": f"未找到摄像头: {camera_id}"}, ensure_ascii=False) diff --git a/app/services/agent/tools/order_tools.py b/app/services/agent/tools/order_tools.py new file mode 100644 index 0000000..efd551e --- /dev/null +++ b/app/services/agent/tools/order_tools.py @@ -0,0 +1,149 @@ +""" +工单工具:查询我的工单、提交处理结果 +""" + +import json +from typing import List, Optional +from langchain_core.tools import tool +from langchain_core.runnables import RunnableConfig + +from app.utils.logger import logger +from .alarm_query import ALARM_TYPE_NAMES, _get_camera_display_name +from .alarm_action import _get_order_id_for_alarm + + +@tool +def list_my_orders(config: RunnableConfig, status: str = "HANDLING") -> str: + """查询我的待处理工单列表 + + Args: + status: 工单状态筛选 HANDLING=处理中 ALL=全部 + """ + from app.services.alarm_event_service import get_alarm_event_service + svc = get_alarm_event_service() + + user_id = config.get("configurable", {}).get("user_id", "") + + alarms, total = svc.get_alarms( + alarm_status="CONFIRMED", + page=1, + page_size=20, + ) + + my_alarms = [a for a in alarms if a.handler == user_id] + + if not my_alarms: + return json.dumps({"total": 0, "items": [], "message": "当前没有待处理的工单"}, ensure_ascii=False) + + items = [] + for a in my_alarms: + cam_name = _get_camera_display_name(a.device_id) + event_time = "" + if a.event_time: + try: + event_time = a.event_time.strftime("%m-%d %H:%M") + except Exception: + event_time = str(a.event_time)[:16] + items.append({ + "alarm_id": a.alarm_id, + "type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type), + "camera": cam_name, + "time": event_time, + }) + + return json.dumps({"total": len(my_alarms), "items": items}, ensure_ascii=False) + + +@tool +async def submit_order_result( + alarm_id: str, + result_text: str, + config: RunnableConfig, + image_urls: Optional[List[str]] = None, +) -> str: + """提交工单处理结果(文字描述+处理后照片URL) + + Args: + alarm_id: 关联的告警ID + result_text: 处理结果描述 + image_urls: 处理后照片URL列表(COS永久URL) + """ + from app.services.alarm_event_service import get_alarm_event_service + from app.services.wechat_service import get_wechat_service + + user_id = config.get("configurable", {}).get("user_id", "") + svc = get_alarm_event_service() + wechat = get_wechat_service() + + if image_urls is None: + image_urls = [] + + # 检查告警是否存在 + detail = svc.get_alarm(alarm_id) + if not detail: + return json.dumps({"error": f"未找到告警: {alarm_id}"}, ensure_ascii=False) + + # 合并 session 中暂存的图片 + from app.services.session_manager import get_session_manager + session = get_session_manager().get(user_id) + if session.pending_images: + image_urls = session.pending_images + image_urls + session.pending_images = [] + + # 获取关联工单ID + order_id = _get_order_id_for_alarm(alarm_id) + + from app.services.work_order_client import get_work_order_client + wo_client = get_work_order_client() + + remark = f"企微Agent结单: {result_text}" + if image_urls: + remark += f" (附{len(image_urls)}张图片)" + + if order_id and wo_client.enabled: + if not await wo_client.submit_order( + order_id, + result=f"{result_text} by {user_id}", + result_img_urls=image_urls or None, + ): + remark += "(IoT降级)" + + svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED", + handle_status="DONE", handler=user_id, remark=remark) + + # 持久化处理结果图片到 alarm_event_ext + if image_urls: + try: + from app.models import get_session as get_db_session, AlarmEventExt + db = get_db_session() + try: + ext = AlarmEventExt( + alarm_id=alarm_id, + ext_type="HANDLER_RESULT", + ext_data={"result_text": result_text, "image_urls": image_urls, "handler": user_id}, + ) + db.add(ext) + db.commit() + except Exception as e: + db.rollback() + logger.error(f"持久化处理结果图片失败: {e}") + finally: + db.close() + except Exception as e: + logger.error(f"保存处理结果失败: {e}") + + # 更新卡片到终态 + response_code = wechat.get_response_code(alarm_id) + if response_code: + await wechat.update_alarm_card_terminal( + response_code=response_code, user_ids=[user_id], + alarm_id=alarm_id, action="complete", operator_name=user_id, + ) + + result = { + "success": True, + "message": f"工单已提交: {alarm_id}", + "result": result_text, + "images_count": len(image_urls), + } + return json.dumps(result, ensure_ascii=False) diff --git a/app/services/agent_dispatcher.py b/app/services/agent_dispatcher.py index cf34afc..ecae1a9 100644 --- a/app/services/agent_dispatcher.py +++ b/app/services/agent_dispatcher.py @@ -1,19 +1,17 @@ """ -交互Agent调度器(Function Calling 版) +交互Agent调度器 -使用 qwen3.5-plus 原生 Function Calling 处理用户意图,支持: -- 告警统计/列表/详情查询 -- 告警状态更新(确认/忽略/完成/误报) -- 工单列表和结单提交(含图片) -- 摄像头信息查询 -- 图片分析(VLM) +支持两种模式(通过 USE_LANGGRAPH 环境变量切换): +- LangGraph 模式(默认):基于 StateGraph 的 ReAct agent +- Legacy 模式:手写 Function Calling 循环(向后兼容) -多轮对话上下文(每用户独立,10轮,10分钟TTL) +企微入口适配层:处理图片上传、VLM分析等企微特有逻辑, +核心对话由 LangGraph 图处理。 """ import json +import os import time -from datetime import timedelta from typing import Dict, List, Optional from openai import AsyncOpenAI @@ -23,233 +21,55 @@ from app.services.session_manager import get_session_manager from app.utils.logger import logger from app.utils.timezone import beijing_now -SYSTEM_PROMPT = """你是VSP安防AI助手,通过企业微信协助安保人员处理告警和工单。 - -能力: -1. 查询告警统计和明细(query_alarm_stats / list_alarms / get_alarm_detail) -2. 处理告警(update_alarm_status:确认接单、忽略、完成、误报) -3. 提交工单处理结果(submit_order_result:含文字描述和处理后照片) -4. 查询待处理工单(list_my_orders) -5. 查询摄像头信息(query_camera) - -交互规则: -- 用户发图片时,如果有待处理工单,询问是否作为处理结果上传 -- 用户说"处理完了"并附带描述,自动提交结单 -- 回复简洁,适合手机阅读 -- 重要信息用【】标注 -- 禁止使用markdown语法(如![](url)、**加粗**、# 标题),企微聊天不支持 -- 告警截图会自动发送图片消息,文字回复中不要包含图片链接""" - -IMAGE_ANALYZE_PROMPT = """你是物业安防图片分析员。分析这张图片,判断是否存在安全隐患或需要上报的情况。 - -需要关注的异常包括: -- 岗位无人值守(前台、监控室、门岗等应有人但没人) -- 人员入侵(非授权区域出现人员) -- 车辆违停(禁停区域有车辆) -- 消防隐患(灭火器缺失、通道堵塞、线路杂乱) -- 设施损坏(门窗破损、设备故障) -- 物品遗留(可疑包裹、危险物品) - -请用JSON格式回复: -{"has_anomaly": true/false, "description": "异常描述", "alarm_type": "告警类型(leave_post/intrusion/illegal_parking/fire/damage/other/none)"} -只输出JSON,不要其他内容。""" - -# ==================== 7 个工具定义(OpenAI tools 格式)==================== - -TOOLS = [ - { - "type": "function", - "function": { - "name": "query_alarm_stats", - "description": "查询告警统计数据(总数、按类型分布、按状态分布)", - "parameters": { - "type": "object", - "properties": { - "time_range": { - "type": "string", - "enum": ["today", "week", "month"], - "description": "时间范围:today=今日,week=本周,month=本月", - }, - "alarm_type": { - "type": "string", - "enum": ["leave_post", "intrusion", "illegal_parking", "vehicle_congestion", "all"], - "description": "告警类型筛选,all=全部", - }, - }, - "required": [], - }, - }, - }, - { - "type": "function", - "function": { - "name": "list_alarms", - "description": "查询告警列表,返回最近的告警记录(含ID、类型、摄像头、状态、时间)", - "parameters": { - "type": "object", - "properties": { - "time_range": { - "type": "string", - "enum": ["today", "week", "month"], - "description": "时间范围", - }, - "alarm_type": { - "type": "string", - "enum": ["leave_post", "intrusion", "illegal_parking", "vehicle_congestion", "all"], - "description": "告警类型筛选", - }, - "alarm_status": { - "type": "string", - "enum": ["NEW", "CONFIRMED", "FALSE", "CLOSED"], - "description": "告警状态筛选:NEW=待处理,CONFIRMED=处理中,FALSE=误报,CLOSED=已关闭", - }, - "limit": { - "type": "integer", - "description": "返回条数,默认10,最多20", - }, - }, - "required": [], - }, - }, - }, - { - "type": "function", - "function": { - "name": "get_alarm_detail", - "description": "查询单条告警的详细信息(含扩展信息和AI分析结果)", - "parameters": { - "type": "object", - "properties": { - "alarm_id": { - "type": "string", - "description": "告警ID(如 edge_xxx 或 ALM_xxx)", - }, - }, - "required": ["alarm_id"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "update_alarm_status", - "description": "更新告警状态:确认接单(confirm)、忽略误报(ignore)、处理完成(complete)、标记误报(false)", - "parameters": { - "type": "object", - "properties": { - "alarm_id": { - "type": "string", - "description": "告警ID", - }, - "action": { - "type": "string", - "enum": ["confirm", "ignore", "complete", "false"], - "description": "操作:confirm=确认接单,ignore=忽略,complete=处理完成,false=标记误报", - }, - }, - "required": ["alarm_id", "action"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "list_my_orders", - "description": "查询我的待处理工单列表", - "parameters": { - "type": "object", - "properties": { - "status": { - "type": "string", - "enum": ["HANDLING", "ALL"], - "description": "工单状态筛选,默认 HANDLING(处理中)", - }, - }, - "required": [], - }, - }, - }, - { - "type": "function", - "function": { - "name": "submit_order_result", - "description": "提交工单处理结果(文字描述+处理后照片URL)", - "parameters": { - "type": "object", - "properties": { - "alarm_id": { - "type": "string", - "description": "关联的告警ID", - }, - "result_text": { - "type": "string", - "description": "处理结果描述", - }, - "image_urls": { - "type": "array", - "items": {"type": "string"}, - "description": "处理后照片URL列表(COS永久URL)", - }, - }, - "required": ["alarm_id", "result_text"], - }, - }, - }, - { - "type": "function", - "function": { - "name": "query_camera", - "description": "查询摄像头信息(名称、位置、状态等)", - "parameters": { - "type": "object", - "properties": { - "camera_id": { - "type": "string", - "description": "摄像头ID(device_id)", - }, - }, - "required": ["camera_id"], - }, - }, - }, -] - -# 告警类型中文映射 -ALARM_TYPE_NAMES = { - "leave_post": "人员离岗", "intrusion": "周界入侵", - "illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵", -} -ALARM_LEVEL_NAMES = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"} -ALARM_STATUS_NAMES = { - "NEW": "待处理", "CONFIRMED": "处理中", - "FALSE": "误报", "CLOSED": "已关闭", -} +# LangGraph 模式开关 +USE_LANGGRAPH = os.getenv("USE_LANGGRAPH", "true").lower() in ("true", "1", "yes") class AgentDispatcher: - """交互Agent调度器(Function Calling,单例)""" + """交互Agent调度器(单例)""" def __init__(self): - self._client: Optional[AsyncOpenAI] = None self._vlm_client: Optional[AsyncOpenAI] = None self._enabled = False - # 临时存储:工具执行期间需要发送的图片 URL - self._pending_images: Dict[str, List[str]] = {} # user_id -> [image_urls] + # LangGraph 图实例 + self._graph = None + # Legacy 模式客户端 + self._legacy_client: Optional[AsyncOpenAI] = None + # 待发送图片队列(两种模式共用) + self._pending_images: Dict[str, List[str]] = {} def init(self, config): """初始化Agent""" self._enabled = config.enabled and bool(config.vlm_api_key) - if self._enabled: - # FC 文本模型客户端 - self._client = AsyncOpenAI( - api_key=config.vlm_api_key, - base_url=config.vlm_base_url, - ) - # VLM 视觉模型客户端(共享 base_url,模型不同) - self._vlm_client = self._client - logger.info(f"交互Agent已启用: fc_model={config.model}, vlm_model={config.vlm_model}") - else: + if not self._enabled: logger.info("交互Agent未启用(AGENT_ENABLED=false 或缺少 API Key)") + return + + # VLM 客户端(图片分析,两种模式共用) + self._vlm_client = AsyncOpenAI( + api_key=config.vlm_api_key, + base_url=config.vlm_base_url, + ) + + if USE_LANGGRAPH: + try: + from app.services.agent.graph import create_default_graph + self._graph = create_default_graph() + logger.info(f"交互Agent已启用(LangGraph模式): model={config.model}") + except Exception as e: + logger.error(f"LangGraph 初始化失败,降级到Legacy模式: {e}", exc_info=True) + self._init_legacy(config) + else: + self._init_legacy(config) + + def _init_legacy(self, config): + """初始化 Legacy 模式""" + self._legacy_client = AsyncOpenAI( + api_key=config.vlm_api_key, + base_url=config.vlm_base_url, + ) + self._graph = None + logger.info(f"交互Agent已启用(Legacy模式): model={config.model}") @property def enabled(self) -> bool: @@ -262,28 +82,26 @@ class AgentDispatcher: if not self._enabled: return "AI助手未启用,请联系管理员配置。" - session = get_session_manager().get(user_id) - session.add_history("user", content) - - # 清空待发图片队列 self._pending_images[user_id] = [] try: - reply = await self._chat_with_tools(session, user_id) + if self._graph: + reply = await self._langgraph_chat(user_id, content) + else: + reply = await self._legacy_chat(user_id, content) except Exception as e: - logger.error(f"Agent FC 对话失败: {e}", exc_info=True) + logger.error(f"Agent对话失败: {e}", exc_info=True) reply = "抱歉,AI助手暂时无法响应,请稍后重试。" - # 发送工具执行期间收集的图片(如告警截图) + # 发送待发图片 pending = self._pending_images.pop(user_id, []) if pending: await self._send_images_to_user(user_id, pending) - session.add_history("assistant", reply) return reply async def handle_image(self, user_id: str, media_id: str) -> str: - """处理图片消息""" + """处理图片消息(两种模式共用)""" if not self._enabled: return "AI助手未启用,请联系管理员配置。" @@ -312,17 +130,14 @@ class AgentDispatcher: # 3. 检查用户是否有待处理工单 handling_alarm_id = self._find_handling_alarm(user_id) if handling_alarm_id: - # 有待处理工单 → 暂存图片,提示用户确认 session.pending_images.append(permanent_url) - session.add_history("user", "[用户上传了一张图片]") reply = f"收到图片,是否作为【告警 {handling_alarm_id[:20]}...】的处理结果提交?\n回复「是」确认提交,或继续发送更多图片。" - session.add_history("assistant", reply) session.pending_alarm_id = handling_alarm_id return reply - # 4. 无待处理工单 → VLM 分析图片内容 - session.add_history("user", "[用户上传了一张图片]") - analysis = await self._analyze_image(presign_url) + # 4. VLM 分析 + from app.services.agent.prompts import IMAGE_ANALYZE_PROMPT + analysis = await self._analyze_image(presign_url, IMAGE_ANALYZE_PROMPT) if analysis.get("has_anomaly"): desc = analysis.get("description", "异常情况") @@ -330,19 +145,74 @@ class AgentDispatcher: else: reply = "未检测到明显安全隐患。如有疑问请描述情况。" - session.add_history("assistant", reply) return reply - # ==================== Function Calling 核心循环 ==================== + # ==================== LangGraph 模式 ==================== + + async def _langgraph_chat(self, user_id: str, content: str) -> str: + """LangGraph 图调用""" + config = { + "configurable": { + "thread_id": f"wechat-{user_id}", + "user_id": user_id, + } + } + + result = await self._graph.ainvoke( + { + "messages": [{"role": "user", "content": content}], + "user_id": user_id, + "pending_images": [], + "user_uploaded_images": [], + }, + config=config, + ) + + # 从工具返回中提取截图 URL(get_alarm_detail 返回的 snapshot_url) + self._extract_pending_images(user_id, result) + + # 获取最终回复 + last_msg = result["messages"][-1] + reply = last_msg.content if hasattr(last_msg, "content") else str(last_msg) + return reply.strip() if reply else "处理完成" + + def _extract_pending_images(self, user_id: str, result): + """从 LangGraph 结果中提取需要发送的截图""" + for msg in result.get("messages", []): + # ToolMessage 的 content 可能包含 snapshot_url + if hasattr(msg, "type") and msg.type == "tool" and msg.name == "get_alarm_detail": + try: + data = json.loads(msg.content) if isinstance(msg.content, str) else msg.content + url = data.get("snapshot_url", "") + if url: + if user_id not in self._pending_images: + self._pending_images[user_id] = [] + self._pending_images[user_id].append(url) + except Exception: + pass + + # ==================== Legacy 模式(保留向后兼容)==================== + + async def _legacy_chat(self, user_id: str, content: str) -> str: + """Legacy FC 循环(原有逻辑)""" + from app.services.agent.prompts import SYSTEM_PROMPT + + session = get_session_manager().get(user_id) + session.add_history("user", content) - async def _chat_with_tools(self, session, user_id: str) -> str: - """带工具调用的多轮对话""" messages = [{"role": "system", "content": SYSTEM_PROMPT}] messages.extend(session.get_history_for_vlm()) + # 导入旧版工具定义 + from app.services.agent.tools.alarm_query import ( + ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES, ALARM_STATUS_NAMES, + ) + + TOOLS = self._get_legacy_tools() + max_rounds = 5 for _ in range(max_rounds): - resp = await self._client.chat.completions.create( + resp = await self._legacy_client.chat.completions.create( model=settings.agent.model, messages=messages, tools=TOOLS, @@ -351,422 +221,87 @@ class AgentDispatcher: choice = resp.choices[0] if choice.finish_reason == "stop": - return (choice.message.content or "").strip() + reply = (choice.message.content or "").strip() + session.add_history("assistant", reply) + return reply if choice.message.tool_calls: - # 追加 assistant 消息(含 tool_calls) messages.append(choice.message) - for tc in choice.message.tool_calls: try: args = json.loads(tc.function.arguments) if tc.function.arguments else {} except json.JSONDecodeError: args = {} - - result = await self._execute_tool(tc.function.name, args, user_id) + result = await self._legacy_execute_tool(tc.function.name, args, user_id) messages.append({ "role": "tool", "tool_call_id": tc.id, "content": json.dumps(result, ensure_ascii=False), }) else: - # 无 tool_calls 且非 stop(可能是 length 等),返回已有内容 - return (choice.message.content or "处理超时,请重试").strip() + reply = (choice.message.content or "处理超时,请重试").strip() + session.add_history("assistant", reply) + return reply - return "处理超时,请重试" + reply = "处理超时,请重试" + session.add_history("assistant", reply) + return reply - async def _execute_tool(self, name: str, args: dict, user_id: str) -> dict: - """分发执行工具""" + async def _legacy_execute_tool(self, name: str, args: dict, user_id: str) -> dict: + """Legacy 工具执行(复用新的 @tool 函数)""" try: - if name == "query_alarm_stats": - return await self._tool_query_alarm_stats(args) - elif name == "list_alarms": - return await self._tool_list_alarms(args) - elif name == "get_alarm_detail": - return await self._tool_get_alarm_detail(args, user_id) - elif name == "update_alarm_status": - return await self._tool_update_alarm_status(args, user_id) - elif name == "list_my_orders": - return await self._tool_list_my_orders(args, user_id) - elif name == "submit_order_result": - return await self._tool_submit_order_result(args, user_id) - elif name == "query_camera": - return await self._tool_query_camera(args) - else: + from langchain_core.runnables import RunnableConfig + config = RunnableConfig(configurable={"user_id": user_id}) + + from app.services.agent.tools import all_tools + tool_map = {t.name: t for t in all_tools} + + tool_fn = tool_map.get(name) + if not tool_fn: return {"error": f"未知工具: {name}"} + + # 注入 config + args["config"] = config + result_str = await tool_fn.ainvoke(args) + return json.loads(result_str) if isinstance(result_str, str) else result_str except Exception as e: - logger.error(f"工具执行失败: {name}, error={e}", exc_info=True) + logger.error(f"Legacy工具执行失败: {name}, error={e}", exc_info=True) return {"error": f"执行失败: {str(e)}"} - # ==================== 工具实现 ==================== - - async def _tool_query_alarm_stats(self, args: dict) -> dict: - """告警统计""" - from app.services.alarm_event_service import get_alarm_event_service - svc = get_alarm_event_service() - - time_range = args.get("time_range", "today") - start, range_label = self._parse_time_range(time_range) - now = beijing_now() - - alarm_type_filter = args.get("alarm_type") - if alarm_type_filter == "all": - alarm_type_filter = None - - alarms, total = svc.get_alarms( - alarm_type=alarm_type_filter, - start_time=start, - end_time=now, - page=1, - page_size=10000, - ) - - type_count = {} - status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0} - for a in alarms: - type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 - if a.alarm_status in status_count: - status_count[a.alarm_status] += 1 - - return { - "range": range_label, - "total": total, - "by_type": {ALARM_TYPE_NAMES.get(t, t): c for t, c in type_count.items()}, - "by_status": {ALARM_STATUS_NAMES.get(s, s): c for s, c in status_count.items()}, - } - - async def _tool_list_alarms(self, args: dict) -> dict: - """告警列表""" - from app.services.alarm_event_service import get_alarm_event_service - from app.services.camera_name_service import get_camera_name_service - svc = get_alarm_event_service() - camera_service = get_camera_name_service() - - time_range = args.get("time_range", "today") - start, range_label = self._parse_time_range(time_range) - now = beijing_now() - - alarm_type_filter = args.get("alarm_type") - if alarm_type_filter == "all": - alarm_type_filter = None - - alarm_status_filter = args.get("alarm_status") - limit = min(args.get("limit", 10), 20) - - alarms, total = svc.get_alarms( - alarm_type=alarm_type_filter, - alarm_status=alarm_status_filter, - start_time=start, - end_time=now, - page=1, - page_size=limit, - ) - - items = [] - for a in alarms: - cam_name = a.device_id - try: - cam_info = await camera_service.get_camera_info(a.device_id) - cam_name = camera_service.format_display_name(a.device_id, cam_info) - except Exception: - pass - event_time = "" - if a.event_time: - try: - event_time = a.event_time.strftime("%m-%d %H:%M") - except Exception: - event_time = str(a.event_time)[:16] - items.append({ - "alarm_id": a.alarm_id, - "type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type), - "camera": cam_name, - "status": ALARM_STATUS_NAMES.get(a.alarm_status, a.alarm_status), - "level": ALARM_LEVEL_NAMES.get(a.alarm_level, "普通"), - "time": event_time, + @staticmethod + def _get_legacy_tools(): + """获取 Legacy 模式的 OpenAI tools 定义""" + from app.services.agent.tools import all_tools + # 从 @tool 函数自动生成 OpenAI tools 格式 + result = [] + for t in all_tools: + schema = t.args_schema.schema() if t.args_schema else {"type": "object", "properties": {}} + # 移除 config 参数(LLM 不需要知道) + props = {k: v for k, v in schema.get("properties", {}).items() if k != "config"} + required = [r for r in schema.get("required", []) if r != "config"] + result.append({ + "type": "function", + "function": { + "name": t.name, + "description": t.description or "", + "parameters": { + "type": "object", + "properties": props, + "required": required, + }, + }, }) - - return {"range": range_label, "total": total, "items": items} - - async def _tool_get_alarm_detail(self, args: dict, user_id: str) -> dict: - """告警详情""" - from app.services.alarm_event_service import get_alarm_event_service - svc = get_alarm_event_service() - - alarm_id = args.get("alarm_id", "") - detail = svc.get_alarm(alarm_id) - if not detail: - return {"error": f"未找到告警: {alarm_id}"} - - # 截图:加入待发图片队列,由 handle_message 统一发送 - snapshot_url = detail.get("snapshot_url", "") - if snapshot_url: - if user_id not in self._pending_images: - self._pending_images[user_id] = [] - self._pending_images[user_id].append(snapshot_url) - - result = { - "alarm_id": detail.get("alarm_id"), - "alarm_type": ALARM_TYPE_NAMES.get(detail.get("alarm_type", ""), detail.get("alarm_type", "")), - "device_id": detail.get("device_id"), - "alarm_status": ALARM_STATUS_NAMES.get(detail.get("alarm_status", ""), detail.get("alarm_status", "")), - "alarm_level": ALARM_LEVEL_NAMES.get(detail.get("alarm_level"), "普通"), - "event_time": str(detail.get("event_time", ""))[:19], - "handle_status": detail.get("handle_status"), - "handler": detail.get("handler"), - "has_snapshot": bool(snapshot_url), - } - - # 摄像头名称 - try: - from app.services.camera_name_service import get_camera_name_service - camera_service = get_camera_name_service() - cam_info = await camera_service.get_camera_info(detail.get("device_id", "")) - result["camera_name"] = camera_service.format_display_name(detail.get("device_id", ""), cam_info) - except Exception: - result["camera_name"] = detail.get("device_id", "") - - # LLM 分析 - analyses = detail.get("llm_analyses", []) - if analyses: - latest = analyses[-1] - result["ai_analysis"] = latest.get("summary", "") - return result - async def _tool_update_alarm_status(self, args: dict, user_id: str) -> dict: - """更新告警状态(复用卡片按钮的处理逻辑)""" - from app.services.alarm_event_service import get_alarm_event_service - from app.services.wechat_service import get_wechat_service + # ==================== 共用方法 ==================== - alarm_id = args.get("alarm_id", "") - action = args.get("action", "") - - svc = get_alarm_event_service() - wechat = get_wechat_service() - - # 查告警是否存在 - detail = svc.get_alarm(alarm_id) - if not detail: - return {"error": f"未找到告警: {alarm_id}"} - - # 获取关联工单ID - order_id = self._get_order_id_for_alarm(alarm_id) - - # 执行操作 - from app.services.work_order_client import get_work_order_client - wo_client = get_work_order_client() - - if action == "confirm": - remark = "企微Agent确认接单" - if order_id and wo_client.enabled: - if not await wo_client.confirm_order(order_id): - remark += "(IoT降级)" - svc.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED", - handle_status="HANDLING", handler=user_id, remark=remark) - # 更新卡片到 step2 - response_code = wechat.get_response_code(alarm_id) - if response_code: - await wechat.update_alarm_card_step2( - response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, operator_name=user_id, - ) - return {"success": True, "message": f"已确认接单: {alarm_id}"} - - elif action == "ignore": - remark = "企微Agent忽略" - if order_id and wo_client.enabled: - if not await wo_client.false_alarm(order_id): - remark += "(IoT降级)" - svc.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE", - handle_status="IGNORED", handler=user_id, remark=remark) - response_code = wechat.get_response_code(alarm_id) - if response_code: - await wechat.update_alarm_card_terminal( - response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, action="ignore", operator_name=user_id, - ) - return {"success": True, "message": f"已忽略: {alarm_id}"} - - elif action == "complete": - remark = "企微Agent已处理" - if order_id and wo_client.enabled: - if not await wo_client.submit_order(order_id, result=f"已处理 by {user_id}"): - remark += "(IoT降级)" - svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED", - handle_status="DONE", handler=user_id, remark=remark) - response_code = wechat.get_response_code(alarm_id) - if response_code: - await wechat.update_alarm_card_terminal( - response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, action="complete", operator_name=user_id, - ) - return {"success": True, "message": f"已处理完成: {alarm_id}"} - - elif action == "false": - remark = "企微Agent标记误报" - if order_id and wo_client.enabled: - if not await wo_client.false_alarm(order_id): - remark += "(IoT降级)" - svc.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE", - handle_status="IGNORED", handler=user_id, remark=remark) - response_code = wechat.get_response_code(alarm_id) - if response_code: - await wechat.update_alarm_card_terminal( - response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, action="false", operator_name=user_id, - ) - return {"success": True, "message": f"已标记误报: {alarm_id}"} - - return {"error": f"未知操作: {action}"} - - async def _tool_list_my_orders(self, args: dict, user_id: str) -> dict: - """查询我的待处理工单""" - from app.services.alarm_event_service import get_alarm_event_service - from app.services.camera_name_service import get_camera_name_service - svc = get_alarm_event_service() - camera_service = get_camera_name_service() - - # 查询 handler=user_id & handle_status=HANDLING 的告警 - alarms, total = svc.get_alarms( - alarm_status="CONFIRMED", - page=1, - page_size=20, - ) - - # 过滤属于该用户的 - my_alarms = [a for a in alarms if a.handler == user_id] - - if not my_alarms: - return {"total": 0, "items": [], "message": "当前没有待处理的工单"} - - items = [] - for a in my_alarms: - cam_name = a.device_id - try: - cam_info = await camera_service.get_camera_info(a.device_id) - cam_name = camera_service.format_display_name(a.device_id, cam_info) - except Exception: - pass - event_time = "" - if a.event_time: - try: - event_time = a.event_time.strftime("%m-%d %H:%M") - except Exception: - event_time = str(a.event_time)[:16] - items.append({ - "alarm_id": a.alarm_id, - "type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type), - "camera": cam_name, - "time": event_time, - }) - - return {"total": len(my_alarms), "items": items} - - async def _tool_submit_order_result(self, args: dict, user_id: str) -> dict: - """提交工单处理结果""" - from app.services.alarm_event_service import get_alarm_event_service - from app.services.wechat_service import get_wechat_service - - alarm_id = args.get("alarm_id", "") - result_text = args.get("result_text", "已处理") - image_urls = args.get("image_urls", []) - - svc = get_alarm_event_service() - wechat = get_wechat_service() - - # 检查告警是否存在 - detail = svc.get_alarm(alarm_id) - if not detail: - return {"error": f"未找到告警: {alarm_id}"} - - # 合并 session 中暂存的图片 - session = get_session_manager().get(user_id) - if session.pending_images: - image_urls = session.pending_images + image_urls - session.pending_images = [] - - # 获取关联工单ID - order_id = self._get_order_id_for_alarm(alarm_id) - - from app.services.work_order_client import get_work_order_client - wo_client = get_work_order_client() - - remark = f"企微Agent结单: {result_text}" - if image_urls: - remark += f" (附{len(image_urls)}张图片)" - - if order_id and wo_client.enabled: - if not await wo_client.submit_order( - order_id, - result=f"{result_text} by {user_id}", - result_img_urls=image_urls or None, - ): - remark += "(IoT降级)" - svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED", - handle_status="DONE", handler=user_id, remark=remark) - - # 持久化处理结果图片到 alarm_event_ext - if image_urls: - try: - from app.models import get_session as get_db_session, AlarmEventExt - db = get_db_session() - try: - ext = AlarmEventExt( - alarm_id=alarm_id, - ext_type="HANDLER_RESULT", - ext_data={"result_text": result_text, "image_urls": image_urls, "handler": user_id}, - ) - db.add(ext) - db.commit() - except Exception as e: - db.rollback() - logger.error(f"持久化处理结果图片失败: {e}") - finally: - db.close() - except Exception as e: - logger.error(f"保存处理结果失败: {e}") - - # 更新卡片到终态 - response_code = wechat.get_response_code(alarm_id) - if response_code: - await wechat.update_alarm_card_terminal( - response_code=response_code, user_ids=[user_id], - alarm_id=alarm_id, action="complete", operator_name=user_id, - ) - - return { - "success": True, - "message": f"工单已提交: {alarm_id}", - "result": result_text, - "images_count": len(image_urls), - } - - async def _tool_query_camera(self, args: dict) -> dict: - """查询摄像头信息""" - from app.services.camera_name_service import get_camera_name_service - camera_service = get_camera_name_service() - - camera_id = args.get("camera_id", "") - try: - cam_info = await camera_service.get_camera_info(camera_id) - if cam_info: - return { - "camera_id": camera_id, - "name": camera_service.format_display_name(camera_id, cam_info), - } - return {"error": f"未找到摄像头: {camera_id}"} - except Exception as e: - return {"error": f"查询失败: {str(e)}"} - - # ==================== VLM 图片分析(保留) ==================== - - async def _analyze_image(self, image_url: str) -> Dict: + async def _analyze_image(self, image_url: str, prompt: str) -> Dict: """VLM 分析图片内容""" try: resp = await self._vlm_client.chat.completions.create( model=settings.agent.vlm_model, messages=[ - {"role": "system", "content": IMAGE_ANALYZE_PROMPT}, + {"role": "system", "content": prompt}, {"role": "user", "content": [ {"type": "image_url", "image_url": {"url": image_url}}, {"type": "text", "text": "请分析这张图片"}, @@ -784,8 +319,6 @@ class AgentDispatcher: logger.error(f"VLM图片分析失败: {e}") return {"has_anomaly": False, "description": "", "alarm_type": ""} - # ==================== 辅助方法 ==================== - async def _send_images_to_user(self, user_id: str, image_urls: List[str]): """通过企微发送图片消息给用户""" from app.services.wechat_service import get_wechat_service @@ -797,7 +330,6 @@ class AgentDispatcher: try: media_id = await wechat.upload_media_from_url(url) if media_id: - # 发送图片消息(复用企微发送图片的能力) access_token = await wechat._get_access_token() import httpx msg = { @@ -812,43 +344,9 @@ class AgentDispatcher: except Exception as e: logger.error(f"发送告警截图失败: user={user_id}, error={e}") - @staticmethod - def _parse_time_range(time_range: str): - """解析时间范围,返回 (start_time, label)""" - now = beijing_now() - if time_range == "week": - start = now - timedelta(days=now.weekday()) - start = start.replace(hour=0, minute=0, second=0, microsecond=0) - return start, "本周" - elif time_range == "month": - start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) - return start, "本月" - else: - start = now.replace(hour=0, minute=0, second=0, microsecond=0) - return start, "今日" - - @staticmethod - def _get_order_id_for_alarm(alarm_id: str) -> str: - """从 alarm_event_ext 中获取关联的工单ID""" - from app.models import get_session, AlarmEventExt - db = get_session() - try: - ext = db.query(AlarmEventExt).filter( - AlarmEventExt.alarm_id == alarm_id, - AlarmEventExt.ext_type == "WORK_ORDER", - ).first() - if ext and ext.ext_data: - return ext.ext_data.get("order_id", "") - return "" - except Exception as e: - logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}") - return "" - finally: - db.close() - @staticmethod def _find_handling_alarm(user_id: str) -> str: - """查找用户正在处理的告警ID(handle_status=HANDLING & handler=user_id)""" + """查找用户正在处理的告警ID""" from app.models import get_session, AlarmEvent db = get_session() try: diff --git a/requirements.txt b/requirements.txt index 3663e65..f30ac03 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,6 @@ cos-python-sdk-v5>=1.9.30 openai==1.68.0 openpyxl>=3.1.0 pycryptodome>=3.19.0 +langchain-core>=0.3.0 +langchain-openai>=0.3.0 +langgraph>=0.3.0