Files
iot-device-management-service/app/services/agent_dispatcher.py
16337 101a99616e 修复:图片分析prompt增加离岗识别 + 创建工单支持直接带位置
1. IMAGE_ANALYZE_PROMPT 增加"岗位无人值守"等异常类型
2. create_work_order 意图支持直接提取 location,有位置则跳过追问
3. 减少图片+文字同时发送时的消息顺序混乱
2026-03-20 14:11:36 +08:00

596 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交互Agent调度器(多模态版)
统一使用 VLM 模型处理文字+图片,支持:
- 文字意图识别(创建工单/查询告警/导出报表/闲聊)
- 图片分析上报(VLM 分析 → 追问位置 → 创建工单)
- 结单图片分析(VLM 确认异常消除 → 自动结单)
- 多轮对话上下文(每用户独立,10轮,10分钟TTL)
"""
import json
import time
from datetime import timedelta
from typing import Dict, List, Optional
from openai import AsyncOpenAI
from app.config import settings
from app.services.session_manager import get_session_manager
from app.utils.logger import logger
from app.utils.timezone import beijing_now
# System prompt for free-form chat (AgentDispatcher._chat). It tells the VLM to
# append an "<!--INTENT:{...}-->" marker only for explicit requests; the intent
# names and params listed here are the ones AgentDispatcher._execute_intent reads.
# NOTE(review): several punctuation marks appear to have been lost from these
# prompts (e.g. the "(如..." on the query_alarm line is never closed, and
# "你可以" has no trailing colon) — verify against the canonical source.
SYSTEM_PROMPT = """你是物业安防AI助手。你可以
1. 分析用户上传的现场图片,识别安全隐患或异常情况
2. 帮助创建安保工单(需要位置信息)
3. 查询告警统计数据
4. 导出告警报表
5. 分析处理结果图片,确认异常是否消除
重要规则:
- 正常跟用户对话,不要把无关闲聊强行关联到工单或告警
- 只有用户明确表达"上报问题""创建工单""发现异常"等意图时才触发工单流程
- 用户随便聊天、问问题时,正常友好回复即可
- 当用户发送图片时,分析图片内容,判断是否有安全隐患
- 回复简洁友好不超过100字
- 不要编造不存在的信息
当且仅当用户明确要执行以下操作时在回复末尾附加JSON标记
<!--INTENT:{"intent":"...","params":{...}}-->
可选意图(仅在用户明确请求时使用):
- create_work_order: 用户明确要创建工单或上报问题
params: {title: 工单标题, description: 问题描述, location: 位置(如果用户提到了)}
- query_alarm: 用户要查询告警统计/概况(如"今天有多少告警""告警统计"
params: {time_range: today/week/month, alarm_type: leave_post/intrusion/all}
- list_alarm: 用户要查看具体告警列表/详情(如"展示今天所有告警""未处理的告警有哪些""今天的离岗告警"
params: {time_range: today/week/month, alarm_type: leave_post/intrusion/all, alarm_status: NEW/CONFIRMED/FALSE/CLOSED}
- export_report: 用户要导出报表params: time_range=today/week/month
- 普通对话不要附加任何标记"""
# System prompt for classifying an uploaded image. The VLM must reply with a
# bare JSON object holding has_anomaly / description / alarm_type; the caller
# reads exactly those three keys.
IMAGE_ANALYZE_PROMPT = """你是物业安防图片分析员。分析这张图片,判断是否存在安全隐患或需要上报的情况。
需要关注的异常包括:
- 岗位无人值守(前台、监控室、门岗等应有人但没人)
- 人员入侵(非授权区域出现人员)
- 车辆违停(禁停区域有车辆)
- 消防隐患(灭火器缺失、通道堵塞、线路杂乱)
- 设施损坏(门窗破损、设备故障)
- 物品遗留(可疑包裹、危险物品)
请用JSON格式回复
{"has_anomaly": true/false, "description": "异常描述", "alarm_type": "告警类型(leave_post/intrusion/illegal_parking/fire/damage/other/none)"}
只输出JSON不要其他内容。"""
# System prompt for checking a photo taken after an issue was handled; the VLM
# replies with a JSON object holding resolved / description.
# NOTE: besides the {previous_issue} placeholder this template contains literal
# JSON braces, so str.format() on it raises KeyError. Fill the placeholder with
# str.replace("{previous_issue}", ...) or escape the JSON braces as {{ }}.
CLOSE_ANALYZE_PROMPT = """这是一张处理后的现场照片。请判断之前的异常是否已经消除。
之前的异常是:{previous_issue}
请用JSON格式回复
{"resolved": true/false, "description": "当前状态描述"}
只输出JSON不要其他内容。"""
class AgentDispatcher:
    """Interactive agent dispatcher (multimodal), used as a process-wide singleton.

    Text and image messages are answered by a VLM reached through an
    OpenAI-compatible ``AsyncOpenAI`` client. A per-user state machine stored on
    the session object (``session.state``) decides how the next message is used:

    * ``"waiting_location"``  - the next text that looks like a location creates
      a work order; anything else drops back to normal chat.
    * ``"waiting_confirm"``   - the next text is read as a yes/no answer.
    * ``"waiting_close_photo"`` - the next image is checked to decide whether the
      previously reported issue has been resolved.
    * any other state         - free chat; the VLM may append an
      ``<!--INTENT:{...}-->`` marker that triggers a real action.

    NOTE(review): ``waiting_confirm`` and ``waiting_close_photo`` are never set
    inside this class; presumably another component sets them - verify.
    """

    # Marker the VLM appends (as instructed by SYSTEM_PROMPT) to request an action.
    _INTENT_MARKER = "<!--INTENT:"

    # Display names for alarm type codes.
    _ALARM_TYPE_NAMES = {
        "leave_post": "人员离岗", "intrusion": "周界入侵",
        "illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵",
    }
    # Chinese alarm type names the VLM may emit instead of the codes.
    _ALARM_TYPE_CN_TO_CODE = {
        "人员离岗": "leave_post", "离岗": "leave_post",
        "周界入侵": "intrusion", "入侵": "intrusion",
        "车辆违停": "illegal_parking", "违停": "illegal_parking",
        "车辆拥堵": "vehicle_congestion", "拥堵": "vehicle_congestion",
    }
    # Display names for alarm status codes.
    _ALARM_STATUS_NAMES = {
        "NEW": "待处理", "CONFIRMED": "处理中",
        "FALSE": "误报", "CLOSED": "已关闭",
    }
    # Chinese status phrases the VLM may emit instead of the codes.
    _ALARM_STATUS_CN_TO_CODE = {
        "待处理": "NEW", "未处理": "NEW",
        "处理中": "CONFIRMED", "已确认": "CONFIRMED",
        "误报": "FALSE", "已忽略": "FALSE",
        "已关闭": "CLOSED", "已处理": "CLOSED",
    }
    # Display names for alarm levels.
    _ALARM_LEVEL_NAMES = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"}

    # Answers accepted in the "waiting_confirm" state (matched after strip().lower()).
    # Never include "" here: a blank message must not count as an answer.
    _CONFIRM_WORDS = frozenset({"是", "确认", "好", "yes", "y", "ok"})
    _CANCEL_WORDS = frozenset({"否", "取消", "不是", "no", "n"})

    # Substrings that mark a text as a location. Never include "" here: the empty
    # string is a substring of every text and would accept anything.
    _LOCATION_KEYWORDS = (
        "楼", "层", "栋", "座", "室", "区", "门", "厅",
        "走廊", "大堂", "停车场", "电梯", "楼梯", "通道", "出口", "入口",
        "东", "西", "南", "北", "侧",
        "A座", "B座", "C座", "一楼", "二楼", "三楼",
        "1楼", "2楼", "3楼", "1层", "2层", "3层",
    )
    # Phrases meaning the user is abandoning the report rather than answering it.
    _ABANDON_WORDS = ("取消", "算了", "不要了", "不用了", "没事", "不了")

    def __init__(self):
        """Create a disabled dispatcher; call ``init(config)`` to enable it."""
        self._client: Optional[AsyncOpenAI] = None
        self._enabled = False

    def init(self, config):
        """Configure the agent from *config*.

        The agent is enabled only when ``config.enabled`` is truthy and a VLM API
        key is present; only then is the ``AsyncOpenAI`` client created.
        """
        self._enabled = config.enabled and bool(config.vlm_api_key)
        if self._enabled:
            self._client = AsyncOpenAI(
                api_key=config.vlm_api_key,
                base_url=config.vlm_base_url,
            )
            logger.info(f"交互Agent已启用: model={config.vlm_model}")
        else:
            logger.info("交互Agent未启用AGENT_ENABLED=false 或缺少 API Key")

    @property
    def enabled(self) -> bool:
        """Whether ``init`` enabled the agent."""
        return self._enabled

    # ==================== Message entry points ====================

    async def handle_message(self, user_id: str, content: str) -> str:
        """Handle a text message from *user_id* and return the reply text.

        Pending state-machine steps are served first. Otherwise the message is
        sent to the VLM with the session history; if the reply carries an intent
        marker, the intent is executed and its real result replaces the VLM's
        (possibly invented) text. The marker itself is never shown to the user.
        """
        if not self._enabled:
            return "AI助手未启用请联系管理员配置。"

        session = get_session_manager().get(user_id)

        if session.state == "waiting_location":
            if self._looks_like_location(content):
                return await self._handle_location_reply(user_id, session, content)
            # Not a location: leave the flow and treat this as a normal message,
            # including intent handling below, so no raw marker reaches the user.
            session.reset()
        elif session.state == "waiting_confirm":
            return await self._handle_confirm_reply(user_id, session, content)

        # Normal conversation with context.
        session.add_history("user", content)
        reply = await self._chat(session)
        session.add_history("assistant", reply)

        intent_result = self._extract_intent(reply)
        if intent_result:
            action_reply = await self._execute_intent(user_id, intent_result)
            if action_reply:
                return action_reply
        # No intent, or the intent produced nothing: fall back to the VLM text
        # with any (possibly malformed) marker stripped off.
        clean_reply = self._strip_intent(reply)
        return clean_reply or reply

    async def handle_image(self, user_id: str, media_id: str) -> str:
        """Handle an image message from *user_id* and return the reply text.

        The image is downloaded from WeChat and stored in object storage. In
        ``waiting_close_photo`` state it is checked as a post-handling photo;
        otherwise it is analysed as a new report and the user is asked for the
        location.
        """
        if not self._enabled:
            return "AI助手未启用请联系管理员配置。"

        session = get_session_manager().get(user_id)

        # 1. Download the image.
        from app.services.wechat_service import get_wechat_service
        image_data = await get_wechat_service().download_media(media_id)
        if not image_data:
            return "图片下载失败,请重新发送。"

        # 2. Persist it to COS: the presigned URL feeds the VLM, the object key is
        #    kept for the work order.
        from app.services.oss_storage import get_oss_storage
        oss = get_oss_storage()
        object_key = f"agent/{user_id}/{int(time.time())}.jpg"
        try:
            oss.upload_file(image_data, object_key, content_type="image/jpeg")
        except Exception as e:
            logger.error(f"Agent图片上传COS失败: {e}")
            return "图片保存失败,请重新发送。"
        image_url = oss.get_presigned_url(object_key)

        # 3. Photo taken after handling an issue.
        if session.state == "waiting_close_photo":
            return await self._handle_close_photo(user_id, session, image_url, object_key)

        # 4. New report: analyse the image, then ask for the location.
        session.pending_image_url = object_key
        analysis = await self._analyze_image(image_url)
        session.pending_analysis = analysis.get("description", "")
        session.pending_alarm_type = analysis.get("alarm_type", "other")
        session.add_history("user", [
            {"type": "text", "text": "[用户上传了一张图片]"},
        ])
        # Both outcomes wait for a location so the user can still report manually.
        session.state = "waiting_location"
        if analysis.get("has_anomaly"):
            desc = analysis.get("description", "异常情况")
            reply = f"检测到异常:{desc}\n\n请问具体在什么位置A栋3层东侧走廊"
        else:
            reply = "未检测到明显异常。如确需上报,请告知具体位置和情况描述。"
        session.add_history("assistant", reply)
        return reply

    # ==================== State-machine handlers ====================

    async def _handle_location_reply(self, user_id: str, session, location: str) -> str:
        """Create a work order for the pending report at *location*.

        Uses ``session.pending_alarm_type`` as title/type and
        ``session.pending_analysis`` as description, and attaches the pending
        image (if any) by its permanent URL. The session is reset on every path
        that returns.
        """
        session.add_history("user", location)
        alarm_type_name = session.pending_alarm_type or "异常情况"
        description = session.pending_analysis or "人工上报"

        image_url = ""
        if session.pending_image_url:
            try:
                from app.services.oss_storage import get_oss_storage
                image_url = get_oss_storage().get_permanent_url(session.pending_image_url)
            except Exception as e:
                # Best effort: the work order is still created without the image.
                logger.warning(f"获取图片永久链接失败: {e}")

        # Create the work order through the IoT platform.
        try:
            from app.services.work_order_client import get_work_order_client
            wo_client = get_work_order_client()
            if wo_client.enabled:
                alarm_id = f"MANUAL_{user_id}_{int(time.time())}"
                order_id = await wo_client.create_order(
                    title=f"{alarm_type_name} - 人工上报",
                    area_id=0,
                    alarm_id=alarm_id,
                    alarm_type=alarm_type_name,
                    description=f"{description}\n位置:{location}",
                    priority=2,
                    trigger_source="人工上报",
                    camera_id="",
                    image_url=image_url,
                )
                # NOTE(review): reset() below may also clear pending_order_id,
                # which _handle_close_photo later reads - confirm against the
                # session implementation.
                session.pending_order_id = order_id or ""
                session.reset()
                if order_id:
                    reply = f"工单已创建\n编号:{order_id}\n位置:{location}\n描述:{description}\n\n相关人员将尽快处理。"
                else:
                    reply = "工单创建失败,请联系管理员。"
            else:
                session.reset()
                reply = f"已记录上报信息\n位置:{location}\n描述:{description}\n\n(工单系统未启用,请联系管理员)"
        except Exception as e:
            logger.error(f"Agent创建工单失败: {e}", exc_info=True)
            session.reset()
            reply = "工单创建失败,请稍后重试或联系管理员。"

        session.add_history("assistant", reply)
        return reply

    async def _handle_confirm_reply(self, user_id: str, session, content: str) -> str:
        """Interpret *content* as a yes/no answer; re-prompt on anything else."""
        answer = content.strip().lower()
        if answer in self._CONFIRM_WORDS:
            session.reset()
            return "已确认,感谢反馈。"
        if answer in self._CANCEL_WORDS:
            session.reset()
            return "已取消。如需帮助请随时联系。"
        return "请回复「是」确认或「否」取消。"

    @staticmethod
    def _looks_like_location(text: str) -> bool:
        """Heuristically decide whether *text* is a location answer.

        Rejects texts under 2 chars and questions (ending in ASCII or full-width
        '?'). Accepts texts containing a location keyword. Otherwise rejects
        abandon phrases and accepts anything up to 30 chars.
        """
        text = text.strip()
        if len(text) < 2:
            return False
        if text.endswith(("?", "？")):
            return False
        if any(kw in text for kw in AgentDispatcher._LOCATION_KEYWORDS):
            return True
        if any(word in text for word in AgentDispatcher._ABANDON_WORDS):
            return False
        return len(text) <= 30

    async def _handle_close_photo(self, user_id: str, session, image_url: str, object_key: str) -> str:
        """Ask the VLM whether the issue in ``session.pending_analysis`` is resolved.

        If it is, the pending work order is auto-completed and the session reset.
        If not, the session stays as-is so the user can upload another photo.
        An analysis failure resets the session.
        """
        previous_issue = session.pending_analysis or "之前的异常"
        # CLOSE_ANALYZE_PROMPT contains literal JSON braces, so str.format() would
        # raise; substitute the single placeholder explicitly instead.
        system_prompt = CLOSE_ANALYZE_PROMPT.replace("{previous_issue}", previous_issue)
        try:
            resp = await self._client.chat.completions.create(
                model=settings.agent.vlm_model,
                messages=[
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": image_url}},
                        {"type": "text", "text": "请判断异常是否已消除"},
                    ]},
                ],
                timeout=settings.agent.vlm_timeout,
            )
            result = self._parse_json_reply(resp.choices[0].message.content)
        except Exception as e:
            logger.error(f"结单图片分析失败: {e}")
            session.reset()
            return "图片分析失败,请重新拍照上传,或联系管理员手动结单。"

        if not result.get("resolved"):
            desc = result.get("description", "仍有异常")
            return f"检测到仍有异常:{desc}\n请继续处理后再次拍照上传。"

        closed = await self._auto_complete_order(session.pending_order_id)
        session.reset()
        desc = result.get("description", "现场已恢复正常")
        if closed:
            return f"确认处理完成:{desc}\n工单已自动关闭。"
        return f"确认处理完成:{desc}\n工单未能自动关闭,请联系管理员手动结单。"

    async def _auto_complete_order(self, order_id) -> bool:
        """Auto-complete work order *order_id*; return True if the call did not raise.

        An empty id returns False without calling the client. The client's return
        value is not inspected.
        """
        if not order_id:
            return False
        try:
            from app.services.work_order_client import get_work_order_client
            await get_work_order_client().auto_complete_order(int(order_id), "处理完成-AI确认异常已消除")
        except Exception as e:
            logger.error(f"自动结单失败: {e}")
            return False
        return True

    # ==================== VLM calls ====================

    async def _chat(self, session) -> str:
        """Send SYSTEM_PROMPT plus the session history to the VLM.

        Returns the stripped reply, or a fixed apology string if the call fails.
        """
        try:
            messages = [{"role": "system", "content": SYSTEM_PROMPT}]
            messages.extend(session.get_history_for_vlm())
            resp = await self._client.chat.completions.create(
                model=settings.agent.vlm_model,
                messages=messages,
                timeout=settings.agent.vlm_timeout,
            )
            return resp.choices[0].message.content.strip()
        except Exception as e:
            logger.error(f"VLM对话失败: {e}")
            return "抱歉AI助手暂时无法响应请稍后重试。"

    async def _analyze_image(self, image_url: str) -> Dict:
        """Classify the image at *image_url* with IMAGE_ANALYZE_PROMPT.

        Returns the parsed JSON object; on any failure returns a "no anomaly"
        dict with empty description and alarm_type.
        """
        try:
            resp = await self._client.chat.completions.create(
                model=settings.agent.vlm_model,
                messages=[
                    {"role": "system", "content": IMAGE_ANALYZE_PROMPT},
                    {"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": image_url}},
                        {"type": "text", "text": "请分析这张图片"},
                    ]},
                ],
                timeout=settings.agent.vlm_timeout,
            )
            return self._parse_json_reply(resp.choices[0].message.content)
        except Exception as e:
            logger.error(f"VLM图片分析失败: {e}")
            return {"has_anomaly": False, "description": "", "alarm_type": ""}

    @staticmethod
    def _parse_json_reply(text) -> Dict:
        """Parse a JSON object from a VLM reply, tolerating a ```json fence.

        Raises ValueError (incl. json.JSONDecodeError) if the payload is not a
        JSON object.
        """
        text = (text or "").strip()
        if "```" in text:
            text = text.split("```")[1].strip()
            if text.startswith("json"):
                text = text[4:].strip()
        result = json.loads(text)
        if not isinstance(result, dict):
            raise ValueError(f"expected a JSON object, got {type(result).__name__}")
        return result

    # ==================== Intent execution ====================

    def _extract_intent(self, reply: str) -> Optional[Dict]:
        """Return the JSON object embedded after the intent marker, or None.

        None is returned when there is no marker, the payload is not valid JSON,
        or it is not a JSON object.
        """
        marker = self._INTENT_MARKER
        if marker not in reply:
            return None
        payload = reply.split(marker, 1)[1].split("-->", 1)[0]
        try:
            parsed = json.loads(payload)
        except ValueError:
            return None
        return parsed if isinstance(parsed, dict) else None

    @classmethod
    def _strip_intent(cls, reply: str) -> str:
        """Return *reply* with the intent marker, and everything after it, removed."""
        return reply.split(cls._INTENT_MARKER, 1)[0].strip()

    async def _execute_intent(self, user_id: str, intent_result: Dict) -> str:
        """Run the action named by *intent_result*; return "" for unknown intents."""
        intent = intent_result.get("intent", "")
        params = intent_result.get("params")
        if not isinstance(params, dict):
            params = {}

        if intent == "query_alarm":
            return await self._handle_query_alarm(user_id, params)
        if intent == "list_alarm":
            return await self._handle_list_alarm(user_id, params)
        if intent == "export_report":
            return await self._handle_export_report(user_id, params)
        if intent == "create_work_order":
            session = get_session_manager().get(user_id)
            session.pending_analysis = params.get("description", "")
            session.pending_alarm_type = params.get("title", "人工上报")
            location = str(params.get("location") or "")
            # A location already given in the request skips the follow-up question.
            if location and self._looks_like_location(location):
                return await self._handle_location_reply(user_id, session, location)
            session.state = "waiting_location"
            return "请提供具体位置A栋3层东侧走廊"
        return ""

    @staticmethod
    def _resolve_time_range(time_range, now) -> tuple:
        """Return ``(start, label)`` for a today/week/month range ending at *now*.

        Weeks start on Monday (``datetime.weekday()``). Unknown values fall back
        to today.
        """
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        if time_range == "week":
            return midnight - timedelta(days=now.weekday()), "本周"
        if time_range == "month":
            return midnight.replace(day=1), "本月"
        return midnight, "今日"

    @classmethod
    def _normalize_alarm_type(cls, value):
        """Map a Chinese or code alarm type to a code; "all"/empty means no filter (None)."""
        if not value or value == "all":
            return None
        return cls._ALARM_TYPE_CN_TO_CODE.get(value, value)

    @classmethod
    def _normalize_alarm_status(cls, value):
        """Map a Chinese or code alarm status to a code; "all"/empty means no filter (None)."""
        if not value or value == "all":
            return None
        return cls._ALARM_STATUS_CN_TO_CODE.get(value, value)

    async def _handle_query_alarm(self, user_id: str, params: Dict) -> str:
        """Summarise alarm counts by type and status for the requested range.

        Up to 10000 alarms are fetched in one page for the breakdown; ``total``
        comes from the service (presumably the full match count).
        """
        from app.services.alarm_event_service import get_alarm_event_service
        svc = get_alarm_event_service()

        now = beijing_now()
        start, range_label = self._resolve_time_range(params.get("time_range", "today"), now)

        alarms, total = svc.get_alarms(
            alarm_type=self._normalize_alarm_type(params.get("alarm_type")),
            start_time=start,
            end_time=now,
            page=1,
            page_size=10000,
        )

        type_count = {}
        status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0}
        for alarm in alarms:
            type_count[alarm.alarm_type] = type_count.get(alarm.alarm_type, 0) + 1
            if alarm.alarm_status in status_count:
                status_count[alarm.alarm_status] += 1

        lines = [f"{range_label}告警统计", f"总计: {total}"]
        lines.extend(
            f" {self._ALARM_TYPE_NAMES.get(code, code)}: {count}"
            for code, count in type_count.items()
        )
        lines.append(f"待处理: {status_count['NEW']}")
        lines.append(f"已处理: {status_count['CLOSED']}")
        lines.append(f"误报过滤: {status_count['FALSE']}")
        return "\n".join(lines)

    async def _handle_list_alarm(self, user_id: str, params: Dict) -> str:
        """List the most recent alarms (up to 10 shown of at most 20 fetched)."""
        from app.services.alarm_event_service import get_alarm_event_service
        from app.services.camera_name_service import get_camera_name_service
        svc = get_alarm_event_service()
        camera_service = get_camera_name_service()

        now = beijing_now()
        start, range_label = self._resolve_time_range(params.get("time_range", "today"), now)
        # Accept both Chinese names and codes for type and status.
        alarm_status_filter = self._normalize_alarm_status(params.get("alarm_status"))

        alarms, total = svc.get_alarms(
            alarm_type=self._normalize_alarm_type(params.get("alarm_type")),
            alarm_status=alarm_status_filter,
            start_time=start,
            end_time=now,
            page=1,
            page_size=20,
        )

        if not alarms:
            filter_desc = self._ALARM_STATUS_NAMES.get(alarm_status_filter, "")
            return f"{range_label}{filter_desc}告警0条"

        show_count = min(len(alarms), 10)
        lines = [f"{range_label}告警(共{total}条,展示最近{show_count}条):"]
        for i, alarm in enumerate(alarms[:show_count], 1):
            type_name = self._ALARM_TYPE_NAMES.get(alarm.alarm_type, alarm.alarm_type)
            status_name = self._ALARM_STATUS_NAMES.get(alarm.alarm_status, alarm.alarm_status)
            level_name = self._ALARM_LEVEL_NAMES.get(alarm.alarm_level, "普通")

            # Camera display name; falls back to the raw device id.
            cam_name = alarm.device_id
            try:
                cam_info = await camera_service.get_camera_info(alarm.device_id)
                cam_name = camera_service.format_display_name(alarm.device_id, cam_info)
            except Exception:
                pass

            event_time = ""
            if alarm.event_time:
                try:
                    event_time = alarm.event_time.strftime("%m-%d %H:%M")
                except Exception:
                    event_time = str(alarm.event_time)[:16]

            lines.append(f"{i}. [{level_name}]{type_name} | {cam_name} | {status_name} | {event_time}")

        if total > show_count:
            lines.append(f"\n还有{total - show_count}条未展示,可说「导出报表」获取完整数据。")
        return "\n".join(lines)

    async def _handle_export_report(self, user_id: str, params: Dict) -> str:
        """Generate the Excel alarm report, upload it, and return a download link.

        The link is presigned with ``expire=3600`` (presumably seconds).
        """
        from app.services.report_generator import generate_alarm_report
        from app.services.oss_storage import get_oss_storage

        time_range = params.get("time_range", "week")
        result = generate_alarm_report(time_range=time_range)
        if not result:
            range_names = {"today": "今日", "week": "本周", "month": "本月"}
            return f"{range_names.get(time_range, '今日')}暂无告警数据。"

        filename, file_bytes = result
        oss = get_oss_storage()
        try:
            object_key = oss.upload_file(
                file_bytes.read(),
                f"reports/{filename}",
                content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            )
            download_url = oss.get_presigned_url(object_key, expire=3600)
            return f"报表已生成\n文件:{filename}\n下载:{download_url}"
        except Exception as e:
            logger.warning(f"报表上传COS失败: {e}")
            return f"报表生成成功({filename}),但上传失败,请联系管理员。"
# Process-wide singleton instance, created lazily by get_agent_dispatcher().
_agent_dispatcher: Optional[AgentDispatcher] = None


def get_agent_dispatcher() -> AgentDispatcher:
    """Return the shared AgentDispatcher, constructing it on first use.

    The instance starts disabled; it must be configured via ``init(config)``
    (presumably once at application startup - not visible here).
    """
    global _agent_dispatcher
    if _agent_dispatcher is not None:
        return _agent_dispatcher
    _agent_dispatcher = AgentDispatcher()
    return _agent_dispatcher