""" 交互Agent调度器(多模态版) 统一使用 VLM 模型处理文字+图片,支持: - 文字意图识别(创建工单/查询告警/导出报表/闲聊) - 图片分析上报(VLM 分析 → 追问位置 → 创建工单) - 结单图片分析(VLM 确认异常消除 → 自动结单) - 多轮对话上下文(每用户独立,10轮,10分钟TTL) """ import json import time from datetime import timedelta from typing import Dict, List, Optional from openai import AsyncOpenAI from app.config import settings from app.services.session_manager import get_session_manager from app.utils.logger import logger from app.utils.timezone import beijing_now SYSTEM_PROMPT = """你是物业安防AI助手。你可以: 1. 分析用户上传的现场图片,识别安全隐患或异常情况 2. 帮助创建安保工单(需要位置信息) 3. 查询告警统计数据 4. 导出告警报表 5. 分析处理结果图片,确认异常是否消除 规则: - 当用户发送图片时,分析图片内容,判断是否有安全隐患 - 如果识别到异常,描述异常并询问具体位置 - 回复简洁专业,不超过100字 - 不要编造不存在的信息 当需要识别意图时,在回复末尾附加JSON标记: 可选意图: - create_work_order: 用户要创建工单或上报问题 - query_alarm: 用户要查询告警数据(params: time_range=today/week/month, alarm_type=leave_post/intrusion/all) - export_report: 用户要导出报表(params: time_range=today/week/month) - general_chat: 其他对话(无需附加标记)""" IMAGE_ANALYZE_PROMPT = """分析这张图片,判断是否存在安全隐患或异常情况。 请用JSON格式回复: {"has_anomaly": true/false, "description": "异常描述", "alarm_type": "告警类型(fire/intrusion/damage/leak/other/none)"} 只输出JSON,不要其他内容。""" CLOSE_ANALYZE_PROMPT = """这是一张处理后的现场照片。请判断之前的异常是否已经消除。 之前的异常是:{previous_issue} 请用JSON格式回复: {"resolved": true/false, "description": "当前状态描述"} 只输出JSON,不要其他内容。""" class AgentDispatcher: """交互Agent调度器(多模态,单例)""" def __init__(self): self._client: Optional[AsyncOpenAI] = None self._enabled = False def init(self, config): """初始化Agent""" self._enabled = config.enabled and bool(config.vlm_api_key) if self._enabled: self._client = AsyncOpenAI( api_key=config.vlm_api_key, base_url=config.vlm_base_url, ) logger.info(f"交互Agent已启用: model={config.vlm_model}") else: logger.info("交互Agent未启用(AGENT_ENABLED=false 或缺少 API Key)") @property def enabled(self) -> bool: return self._enabled # ==================== 消息入口 ==================== async def handle_message(self, user_id: str, content: str) -> str: """处理文字消息""" if not self._enabled: return "AI助手未启用,请联系管理员配置。" session = get_session_manager().get(user_id) # 状态机:等待位置信息 if session.state == "waiting_location": return await self._handle_location_reply(user_id, session, content) # 状态机:等待确认 if session.state == "waiting_confirm": return await self._handle_confirm_reply(user_id, session, content) # 正常对话:带上下文调用 VLM session.add_history("user", content) reply = await self._chat(session) session.add_history("assistant", reply) # 检查是否有嵌入的意图标记 intent_result = self._extract_intent(reply) if intent_result: clean_reply = reply.split("")[0] return json.loads(json_str) except Exception: return None async def _execute_intent(self, user_id: str, intent_result: Dict) -> str: """执行识别到的意图""" intent = intent_result.get("intent", "") params = intent_result.get("params", {}) if intent == "query_alarm": return await self._handle_query_alarm(user_id, params) elif intent == "export_report": return await self._handle_export_report(user_id, params) elif intent == "create_work_order": # 文字创建工单 → 进入等待位置状态 session = get_session_manager().get(user_id) session.pending_analysis = params.get("description", "") session.pending_alarm_type = params.get("title", "人工上报") session.state = "waiting_location" return "请提供具体位置(如:A栋3层东侧走廊)" return "" async def _handle_query_alarm(self, user_id: str, params: Dict) -> str: """查询告警统计""" from app.services.alarm_event_service import get_alarm_event_service svc = get_alarm_event_service() time_range = params.get("time_range", "today") now = beijing_now() if time_range == "week": start = now - timedelta(days=now.weekday()) start = start.replace(hour=0, minute=0, second=0, microsecond=0) range_label = "本周" elif time_range == "month": start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0) range_label = "本月" else: start = now.replace(hour=0, minute=0, second=0, microsecond=0) range_label = "今日" alarm_type_filter = params.get("alarm_type") if alarm_type_filter == "all": alarm_type_filter = None alarms, total = svc.get_alarms( alarm_type=alarm_type_filter, start_time=start, end_time=now, page=1, page_size=10000, ) type_count = {} status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0} for a in alarms: type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1 if a.alarm_status in status_count: status_count[a.alarm_status] += 1 type_names = { "leave_post": "人员离岗", "intrusion": "周界入侵", "illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵", } type_lines = [f" {type_names.get(t, t)}: {c}条" for t, c in type_count.items()] return ( f"{range_label}告警统计\n" f"总计: {total}条\n" + "\n".join(type_lines) + "\n" f"待处理: {status_count['NEW']}条\n" f"已处理: {status_count['CLOSED']}条\n" f"误报过滤: {status_count['FALSE']}条" ) async def _handle_export_report(self, user_id: str, params: Dict) -> str: """导出Excel报表""" from app.services.report_generator import generate_alarm_report from app.services.oss_storage import get_oss_storage time_range = params.get("time_range", "week") result = generate_alarm_report(time_range=time_range) if not result: range_names = {"today": "今日", "week": "本周", "month": "本月"} return f"{range_names.get(time_range, '今日')}暂无告警数据。" filename, file_bytes = result oss = get_oss_storage() try: object_key = oss.upload_file( file_bytes.read(), f"reports/{filename}", content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ) download_url = oss.get_presigned_url(object_key, expire=3600) return f"报表已生成\n文件:{filename}\n下载:{download_url}" except Exception as e: logger.warning(f"报表上传COS失败: {e}") return f"报表生成成功({filename}),但上传失败,请联系管理员。" # 全局单例 _agent_dispatcher: Optional[AgentDispatcher] = None def get_agent_dispatcher() -> AgentDispatcher: global _agent_dispatcher if _agent_dispatcher is None: _agent_dispatcher = AgentDispatcher() return _agent_dispatcher