功能:AgentDispatcher 多模态重写

- 统一使用 VLM 模型处理文字+图片
- 多轮对话上下文(SessionManager)
- 图片分析上报:VLM 分析 → 追问位置 → 创建工单
- 结单图片分析:VLM 确认异常消除 → 自动结单
- 意图识别嵌入对话回复中,不再单独调用
- 所有模型配置走 settings,无硬编码
This commit is contained in:
2026-03-20 11:10:54 +08:00
parent 67f6af638a
commit f87222e6fb

View File

@@ -1,44 +1,61 @@
"""
交互Agent调度器
交互Agent调度器(多模态版)
接收企微用户消息通过LLM识别意图路由到对应处理器。
支持意图:
- create_work_order: 创建工单("帮我创建XX工单"
- query_alarm: 查询告警("今天有多少告警"
- export_report: 导出报表("导出本周告警报表"
- general_chat: 兜底闲聊
统一使用 VLM 模型处理文字+图片,支持:
- 文字意图识别(创建工单/查询告警/导出报表/闲聊)
- 图片分析上报VLM 分析 → 追问位置 → 创建工单)
- 结单图片分析VLM 确认异常消除 → 自动结单
- 多轮对话上下文每用户独立10轮10分钟TTL
"""
import json
import time
from datetime import timedelta
from typing import Dict, Optional
from typing import Dict, List, Optional
from openai import AsyncOpenAI
from app.config import settings
from app.services.session_manager import get_session_manager
from app.utils.logger import logger
from app.utils.timezone import beijing_now
INTENT_SYSTEM_PROMPT = """你是物业安防AI助手。根据用户消息识别意图仅输出JSON。
SYSTEM_PROMPT = """你是物业安防AI助手。你可以:
1. 分析用户上传的现场图片,识别安全隐患或异常情况
2. 帮助创建安保工单(需要位置信息)
3. 查询告警统计数据
4. 导出告警报表
5. 分析处理结果图片,确认异常是否消除
规则:
- 当用户发送图片时,分析图片内容,判断是否有安全隐患
- 如果识别到异常,描述异常并询问具体位置
- 回复简洁专业不超过100字
- 不要编造不存在的信息
当需要识别意图时在回复末尾附加JSON标记
<!--INTENT:{"intent":"...","params":{...}}-->
可选意图:
- create_work_order: 用户要创建工单或上报问题
- query_alarm: 用户要查询告警数据或统计
- export_report: 用户要导出报表或Excel
- general_chat: 其他闲聊或无法识别
- query_alarm: 用户要查询告警数据params: time_range=today/week/month, alarm_type=leave_post/intrusion/all
- export_report: 用户要导出报表params: time_range=today/week/month
- general_chat: 其他对话(无需附加标记)"""
输出格式:{"intent":"...","params":{...}}
IMAGE_ANALYZE_PROMPT = """分析这张图片,判断是否存在安全隐患或异常情况。
请用JSON格式回复
{"has_anomaly": true/false, "description": "异常描述", "alarm_type": "告警类型(fire/intrusion/damage/leak/other/none)"}
只输出JSON不要其他内容。"""
params说明
- create_work_order: {"title":"工单标题","description":"描述","priority":"low/medium/high/urgent"}
- query_alarm: {"time_range":"today/week/month","alarm_type":"leave_post/intrusion/all"}
- export_report: {"time_range":"today/week/month"}
- general_chat: {"message":"友好的回复内容"}"""
CLOSE_ANALYZE_PROMPT = """这是一张处理后的现场照片。请判断之前的异常是否已经消除。
之前的异常是:{previous_issue}
请用JSON格式回复
{"resolved": true/false, "description": "当前状态描述"}
只输出JSON不要其他内容。"""
class AgentDispatcher:
"""交互Agent调度器单例"""
"""交互Agent调度器多模态,单例)"""
def __init__(self):
self._client: Optional[AsyncOpenAI] = None
@@ -46,13 +63,13 @@ class AgentDispatcher:
def init(self, config):
"""初始化Agent"""
self._enabled = config.enabled and bool(config.llm_api_key)
self._enabled = config.enabled and bool(config.vlm_api_key)
if self._enabled:
self._client = AsyncOpenAI(
api_key=config.llm_api_key,
base_url=config.llm_base_url,
api_key=config.vlm_api_key,
base_url=config.vlm_base_url,
)
logger.info(f"交互Agent已启用: model={config.llm_model}")
logger.info(f"交互Agent已启用: model={config.vlm_model}")
else:
logger.info("交互Agent未启用AGENT_ENABLED=false 或缺少 API Key")
@@ -60,90 +77,275 @@ class AgentDispatcher:
def enabled(self) -> bool:
return self._enabled
# ==================== 消息入口 ====================
async def handle_message(self, user_id: str, content: str) -> str:
"""处理用户消息,返回回复文本"""
"""处理文字消息"""
if not self._enabled:
return "AI助手未启用请联系管理员配置。"
# 1. 意图识别
intent_result = await self._classify_intent(content)
intent = intent_result.get("intent", "general_chat")
params = intent_result.get("params", {})
session = get_session_manager().get(user_id)
logger.info(f"Agent意图识别: user={user_id}, intent={intent}, params={params}")
# 状态机:等待位置信息
if session.state == "waiting_location":
return await self._handle_location_reply(user_id, session, content)
# 2. 路由到对应 handler
handlers = {
"create_work_order": self._handle_create_work_order,
"query_alarm": self._handle_query_alarm,
"export_report": self._handle_export_report,
"general_chat": self._handle_general_chat,
}
# 状态机:等待确认
if session.state == "waiting_confirm":
return await self._handle_confirm_reply(user_id, session, content)
handler = handlers.get(intent, self._handle_general_chat)
# 正常对话:带上下文调用 VLM
session.add_history("user", content)
reply = await self._chat(session)
session.add_history("assistant", reply)
# 检查是否有嵌入的意图标记
intent_result = self._extract_intent(reply)
if intent_result:
clean_reply = reply.split("<!--INTENT:")[0].strip()
action_reply = await self._execute_intent(user_id, intent_result)
return f"{clean_reply}\n\n{action_reply}" if clean_reply else action_reply
return reply
async def handle_image(self, user_id: str, media_id: str) -> str:
"""处理图片消息"""
if not self._enabled:
return "AI助手未启用请联系管理员配置。"
session = get_session_manager().get(user_id)
# 1. 下载图片
from app.services.wechat_service import get_wechat_service
wechat = get_wechat_service()
image_data = await wechat.download_media(media_id)
if not image_data:
return "图片下载失败,请重新发送。"
# 2. 上传 COS 持久化
from app.services.oss_storage import get_oss_storage
oss = get_oss_storage()
object_key = f"agent/{user_id}/{int(time.time())}.jpg"
try:
return await handler(user_id, params, content)
oss.upload_file(image_data, object_key, content_type="image/jpeg")
except Exception as e:
logger.error(f"Agent handler异常: intent={intent}, error={e}", exc_info=True)
return "处理请求时出错,请稍后重试"
logger.error(f"Agent图片上传COS失败: {e}")
return "图片保存失败,请重新发送"
image_url = oss.get_presigned_url(object_key)
# 3. 如果在等待结单图片
if session.state == "waiting_close_photo":
return await self._handle_close_photo(user_id, session, image_url, object_key)
# 4. 正常上报VLM 分析图片
session.pending_image_url = object_key
analysis = await self._analyze_image(image_url)
session.pending_analysis = analysis.get("description", "")
session.pending_alarm_type = analysis.get("alarm_type", "other")
session.add_history("user", [
{"type": "text", "text": "[用户上传了一张图片]"},
])
if analysis.get("has_anomaly"):
session.state = "waiting_location"
desc = analysis.get("description", "异常情况")
reply = f"检测到异常:{desc}\n\n请问具体在什么位置A栋3层东侧走廊"
else:
session.state = "waiting_location"
reply = "未检测到明显异常。如确需上报,请告知具体位置和情况描述。"
session.add_history("assistant", reply)
return reply
# ==================== 状态机处理 ====================
async def _handle_location_reply(self, user_id: str, session, location: str) -> str:
"""用户回复位置 → 创建工单"""
session.add_history("user", location)
alarm_type_name = session.pending_alarm_type or "异常情况"
description = session.pending_analysis or "人工上报"
image_url = ""
if session.pending_image_url:
try:
from app.services.oss_storage import get_oss_storage
image_url = get_oss_storage().get_permanent_url(session.pending_image_url)
except Exception:
pass
# 通过 IoT 平台创建工单
try:
from app.services.work_order_client import get_work_order_client
wo_client = get_work_order_client()
if wo_client.enabled:
alarm_id = f"MANUAL_{user_id}_{int(time.time())}"
order_id = await wo_client.create_order(
title=f"{alarm_type_name} - 人工上报",
area_id=0,
alarm_id=alarm_id,
alarm_type=alarm_type_name,
description=f"{description}\n位置:{location}",
priority=2,
trigger_source="人工上报",
camera_id="",
image_url=image_url,
)
session.pending_order_id = order_id or ""
session.reset()
if order_id:
reply = f"工单已创建\n编号:{order_id}\n位置:{location}\n描述:{description}\n\n相关人员将尽快处理。"
else:
reply = "工单创建失败,请联系管理员。"
else:
session.reset()
reply = f"已记录上报信息\n位置:{location}\n描述:{description}\n\n(工单系统未启用,请联系管理员)"
except Exception as e:
logger.error(f"Agent创建工单失败: {e}", exc_info=True)
session.reset()
reply = "工单创建失败,请稍后重试或联系管理员。"
session.add_history("assistant", reply)
return reply
async def _handle_confirm_reply(self, user_id: str, session, content: str) -> str:
"""用户回复确认"""
content_lower = content.strip().lower()
if content_lower in ("", "确认", "", "yes", "y", "ok"):
session.reset()
return "已确认,感谢反馈。"
elif content_lower in ("", "取消", "不是", "no", "n"):
session.reset()
return "已取消。如需帮助请随时联系。"
else:
return "请回复「是」确认或「否」取消。"
async def _handle_close_photo(self, user_id: str, session, image_url: str, object_key: str) -> str:
"""分析结单图片"""
previous_issue = session.pending_analysis or "之前的异常"
async def _classify_intent(self, content: str) -> Dict:
"""LLM意图分类"""
try:
resp = await self._client.chat.completions.create(
model=settings.agent.llm_model,
model=settings.agent.vlm_model,
messages=[
{"role": "system", "content": INTENT_SYSTEM_PROMPT},
{"role": "user", "content": content},
{"role": "system", "content": CLOSE_ANALYZE_PROMPT.format(previous_issue=previous_issue)},
{"role": "user", "content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": "请判断异常是否已消除"},
]},
],
timeout=settings.agent.llm_timeout,
timeout=settings.agent.vlm_timeout,
)
text = resp.choices[0].message.content.strip()
if "```" in text:
text = text.split("```")[1]
text = text.split("```")[1].strip()
if text.startswith("json"):
text = text[4:]
text = text.strip()
text = text[4:].strip()
result = json.loads(text)
except Exception as e:
logger.error(f"结单图片分析失败: {e}")
session.reset()
return "图片分析失败,请重新拍照上传,或联系管理员手动结单。"
if result.get("resolved"):
# 自动结单
order_id = session.pending_order_id
if order_id:
try:
from app.services.work_order_client import get_work_order_client
wo_client = get_work_order_client()
await wo_client.auto_complete_order(int(order_id), "处理完成-AI确认异常已消除")
except Exception as e:
logger.error(f"自动结单失败: {e}")
session.reset()
desc = result.get("description", "现场已恢复正常")
return f"确认处理完成:{desc}\n工单已自动关闭。"
else:
desc = result.get("description", "仍有异常")
return f"检测到仍有异常:{desc}\n请继续处理后再次拍照上传。"
# ==================== VLM 调用 ====================
async def _chat(self, session) -> str:
"""带上下文的 VLM 对话"""
try:
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
messages.extend(session.get_history_for_vlm())
resp = await self._client.chat.completions.create(
model=settings.agent.vlm_model,
messages=messages,
timeout=settings.agent.vlm_timeout,
)
return resp.choices[0].message.content.strip()
except Exception as e:
logger.error(f"VLM对话失败: {e}")
return "抱歉AI助手暂时无法响应请稍后重试。"
async def _analyze_image(self, image_url: str) -> Dict:
"""VLM 分析图片内容"""
try:
resp = await self._client.chat.completions.create(
model=settings.agent.vlm_model,
messages=[
{"role": "system", "content": IMAGE_ANALYZE_PROMPT},
{"role": "user", "content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": "请分析这张图片"},
]},
],
timeout=settings.agent.vlm_timeout,
)
text = resp.choices[0].message.content.strip()
if "```" in text:
text = text.split("```")[1].strip()
if text.startswith("json"):
text = text[4:].strip()
return json.loads(text)
except Exception as e:
logger.error(f"意图识别失败: {e}")
return {"intent": "general_chat", "params": {"message": "抱歉,我暂时无法理解您的请求。"}}
logger.error(f"VLM图片分析失败: {e}")
return {"has_anomaly": False, "description": "", "alarm_type": ""}
async def _handle_create_work_order(self, user_id: str, params: Dict, raw: str) -> str:
"""创建工单"""
from app.services.work_order_service import get_work_order_service
svc = get_work_order_service()
# ==================== 意图执行 ====================
title = params.get("title", "")
if not title:
title = raw[:50]
def _extract_intent(self, reply: str) -> Optional[Dict]:
"""从回复中提取嵌入的意图标记"""
if "<!--INTENT:" not in reply:
return None
try:
json_str = reply.split("<!--INTENT:")[1].split("-->")[0]
return json.loads(json_str)
except Exception:
return None
order = svc.create_work_order(
title=title,
description=params.get("description", raw),
priority=params.get("priority", "medium"),
assignee_uid=user_id,
)
async def _execute_intent(self, user_id: str, intent_result: Dict) -> str:
"""执行识别到的意图"""
intent = intent_result.get("intent", "")
params = intent_result.get("params", {})
if order:
priority_names = {"low": "", "medium": "", "high": "", "urgent": "紧急"}
p_name = priority_names.get(order.priority.value, "")
return (
f"工单已创建\n"
f"编号:{order.order_no}\n"
f"标题:{order.title}\n"
f"优先级:{p_name}\n"
f"状态:待处理"
)
return "工单创建失败,请稍后重试"
if intent == "query_alarm":
return await self._handle_query_alarm(user_id, params)
elif intent == "export_report":
return await self._handle_export_report(user_id, params)
elif intent == "create_work_order":
# 文字创建工单 → 进入等待位置状态
session = get_session_manager().get(user_id)
session.pending_analysis = params.get("description", "")
session.pending_alarm_type = params.get("title", "人工上报")
session.state = "waiting_location"
return "请提供具体位置A栋3层东侧走廊"
return ""
async def _handle_query_alarm(self, user_id: str, params: Dict, raw: str) -> str:
async def _handle_query_alarm(self, user_id: str, params: Dict) -> str:
"""查询告警统计"""
from app.services.alarm_event_service import get_alarm_event_service
svc = get_alarm_event_service()
# 解析时间范围
time_range = params.get("time_range", "today")
now = beijing_now()
if time_range == "week":
@@ -169,7 +371,6 @@ class AgentDispatcher:
page_size=10000,
)
# 按类型统计
type_count = {}
status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0}
for a in alarms:
@@ -177,7 +378,10 @@ class AgentDispatcher:
if a.alarm_status in status_count:
status_count[a.alarm_status] += 1
type_names = {"leave_post": "人员离岗", "intrusion": "周界入侵"}
type_names = {
"leave_post": "人员离岗", "intrusion": "周界入侵",
"illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵",
}
type_lines = [f" {type_names.get(t, t)}: {c}" for t, c in type_count.items()]
return (
@@ -189,7 +393,7 @@ class AgentDispatcher:
f"误报过滤: {status_count['FALSE']}"
)
async def _handle_export_report(self, user_id: str, params: Dict, raw: str) -> str:
async def _handle_export_report(self, user_id: str, params: Dict) -> str:
"""导出Excel报表"""
from app.services.report_generator import generate_alarm_report
from app.services.oss_storage import get_oss_storage
@@ -199,11 +403,9 @@ class AgentDispatcher:
if not result:
range_names = {"today": "今日", "week": "本周", "month": "本月"}
return f"{range_names.get(time_range, '今日')}暂无告警数据,无法生成报表"
return f"{range_names.get(time_range, '今日')}暂无告警数据。"
filename, file_bytes = result
# 上传到 COS 获取下载链接
oss = get_oss_storage()
try:
object_key = oss.upload_file(
@@ -215,14 +417,7 @@ class AgentDispatcher:
return f"报表已生成\n文件:{filename}\n下载:{download_url}"
except Exception as e:
logger.warning(f"报表上传COS失败: {e}")
return f"报表生成({filename}),但上传失败,请联系管理员。"
async def _handle_general_chat(self, user_id: str, params: Dict, raw: str) -> str:
"""兜底回复"""
msg = params.get("message", "")
if msg:
return msg
return "您好我是安防AI助手。可以帮您\n1. 创建工单\n2. 查询告警统计\n3. 导出告警报表\n\n请直接描述您的需求。"
return f"报表生成成功{filename}),但上传失败,请联系管理员。"
# 全局单例