Files
iot-device-management-service/app/services/agent_dispatcher.py
16337 ce5272413b fix: 修复通知日志不可见问题
- notify_dispatch/agent_dispatcher 改用主logger(alert_platform)
- edge_compat 异常不再静默吞掉,输出错误日志

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 11:22:06 +08:00

236 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交互Agent调度器
接收企微用户消息通过LLM识别意图路由到对应处理器。
支持意图:
- create_work_order: 创建工单("帮我创建XX工单"
- query_alarm: 查询告警("今天有多少告警"
- export_report: 导出报表("导出本周告警报表"
- general_chat: 兜底闲聊
"""
import json
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional
from openai import AsyncOpenAI
from app.config import settings
from app.utils.logger import logger
# System prompt sent as the "system" message of the intent-classification LLM
# call. It tells the model to answer with JSON only, shaped as
# {"intent": ..., "params": {...}}, and lists the four supported intents with
# the params expected for each. The text is runtime input to the model, so any
# edit to it changes classification behavior.
INTENT_SYSTEM_PROMPT = """你是物业安防AI助手。根据用户消息识别意图仅输出JSON。
可选意图:
- create_work_order: 用户要创建工单或上报问题
- query_alarm: 用户要查询告警数据或统计
- export_report: 用户要导出报表或Excel
- general_chat: 其他闲聊或无法识别
输出格式:{"intent":"...","params":{...}}
params说明
- create_work_order: {"title":"工单标题","description":"描述","priority":"low/medium/high/urgent"}
- query_alarm: {"time_range":"today/week/month","alarm_type":"leave_post/intrusion/all"}
- export_report: {"time_range":"today/week/month"}
- general_chat: {"message":"友好的回复内容"}"""
class AgentDispatcher:
    """Interactive agent dispatcher (used as a process-wide singleton).

    Classifies an incoming user message into an intent with an LLM and routes
    it to the matching handler: work-order creation, alarm statistics, report
    export, or a fallback chat reply. Every handler returns the reply text to
    send back to the user.
    """

    # Priority values allowed by INTENT_SYSTEM_PROMPT, mapped to display labels.
    _PRIORITY_LABELS = {"low": "低", "medium": "中", "high": "高", "urgent": "紧急"}
    _DEFAULT_PRIORITY = "medium"
    # Display labels for the supported time ranges.
    _RANGE_LABELS = {"today": "今日", "week": "本周", "month": "本月"}
    # Display labels for known alarm types; unknown types are shown verbatim.
    _ALARM_TYPE_LABELS = {"leave_post": "人员离岗", "intrusion": "周界入侵"}
    # Upper bound on the rows fetched for the per-type / per-status breakdown.
    _ALARM_PAGE_SIZE = 10000

    def __init__(self):
        self._client: Optional[AsyncOpenAI] = None
        self._enabled = False

    def init(self, config):
        """Initialize the agent from *config*.

        The agent is enabled only when ``config.enabled`` is truthy and an
        API key is present; in that case the LLM client is created from
        ``config.llm_api_key`` and ``config.llm_base_url``.
        """
        # bool(...) keeps `enabled` a real bool even if the config values are not.
        self._enabled = bool(config.enabled and config.llm_api_key)
        if self._enabled:
            self._client = AsyncOpenAI(
                api_key=config.llm_api_key,
                base_url=config.llm_base_url,
            )
            logger.info(f"交互Agent已启用: model={config.llm_model}")
        else:
            logger.info("交互Agent未启用AGENT_ENABLED=false 或缺少 API Key")

    @property
    def enabled(self) -> bool:
        """Whether the agent was enabled by the last ``init`` call."""
        return self._enabled

    async def handle_message(self, user_id: str, content: str) -> str:
        """Handle one user message and return the reply text.

        Returns a fixed notice when the agent is disabled. Otherwise the
        message is classified and dispatched; any exception raised by a
        handler is logged and converted into a generic error reply.
        """
        if not self._enabled:
            return "AI助手未启用请联系管理员配置。"

        # 1. Intent classification (never raises; falls back to general_chat).
        intent_result = await self._classify_intent(content)
        intent = intent_result.get("intent", "general_chat")
        params = intent_result.get("params", {})
        if not isinstance(params, dict):
            # Handlers call params.get(...); never hand them a non-dict.
            params = {}
        logger.info(f"Agent意图识别: user={user_id}, intent={intent}, params={params}")

        # 2. Route to the matching handler; unknown intents go to general chat.
        handlers = {
            "create_work_order": self._handle_create_work_order,
            "query_alarm": self._handle_query_alarm,
            "export_report": self._handle_export_report,
            "general_chat": self._handle_general_chat,
        }
        handler = handlers.get(intent, self._handle_general_chat)
        try:
            return await handler(user_id, params, content)
        except Exception as e:
            logger.error(f"Agent handler异常: intent={intent}, error={e}", exc_info=True)
            return "处理请求时出错,请稍后重试。"

    @staticmethod
    def _parse_intent_text(text: Optional[str]) -> Dict:
        """Parse the raw LLM reply into a dict with a str intent and dict params.

        A Markdown code fence (optionally tagged ``json``) around the payload
        is stripped first. Raises ``ValueError`` when the reply is empty, is
        not valid JSON, or is not a JSON object. A missing or non-string
        ``intent`` becomes ``"general_chat"``; a missing or non-object
        ``params`` becomes ``{}``.
        """
        if not text or not text.strip():
            raise ValueError("empty LLM response")
        text = text.strip()
        if "```" in text:
            # Keep only what sits between the first pair of fences.
            text = text.split("```")[1]
            if text.startswith("json"):
                text = text[4:]
            text = text.strip()
        data = json.loads(text)
        if not isinstance(data, dict):
            raise ValueError(f"expected a JSON object, got {type(data).__name__}")

        result = dict(data)
        intent = result.get("intent")
        result["intent"] = intent if isinstance(intent, str) and intent else "general_chat"
        params = result.get("params")
        result["params"] = params if isinstance(params, dict) else {}
        return result

    async def _classify_intent(self, content: str) -> Dict:
        """Classify *content* with the LLM.

        Returns a dict with ``intent`` and ``params`` keys. Any failure
        (request error, empty reply, malformed JSON) is logged and mapped to
        a ``general_chat`` intent carrying an apology message.
        """
        try:
            resp = await self._client.chat.completions.create(
                model=settings.agent.llm_model,
                messages=[
                    {"role": "system", "content": INTENT_SYSTEM_PROMPT},
                    {"role": "user", "content": content},
                ],
                timeout=settings.agent.llm_timeout,
            )
            return self._parse_intent_text(resp.choices[0].message.content)
        except Exception as e:
            logger.error(f"意图识别失败: {e}", exc_info=True)
            return {"intent": "general_chat", "params": {"message": "抱歉,我暂时无法理解您的请求。"}}

    @classmethod
    def _normalize_priority(cls, value) -> str:
        """Map an LLM-supplied priority to an allowed value, defaulting to medium."""
        if isinstance(value, str):
            key = value.strip().lower()
            if key in cls._PRIORITY_LABELS:
                return key
        return cls._DEFAULT_PRIORITY

    async def _handle_create_work_order(self, user_id: str, params: Dict, raw: str) -> str:
        """Create a work order assigned to *user_id* and return a confirmation.

        Falls back to the first 50 characters of the raw message when the
        LLM gave no title, and to the raw message when it gave no description.
        """
        from app.services.work_order_service import get_work_order_service

        svc = get_work_order_service()
        title = params.get("title") or raw[:50]
        description = params.get("description") or raw
        order = svc.create_work_order(
            title=title,
            description=description,
            priority=self._normalize_priority(params.get("priority")),
            assignee_uid=user_id,
        )
        if not order:
            return "工单创建失败,请稍后重试"

        # Tolerate both an enum-like priority (with .value) and a plain string,
        # so a formatting problem cannot mask an order that was already created.
        priority_value = getattr(order.priority, "value", order.priority)
        p_name = self._PRIORITY_LABELS.get(priority_value, str(priority_value))
        return (
            f"工单已创建\n"
            f"编号:{order.order_no}\n"
            f"标题:{order.title}\n"
            f"优先级:{p_name}\n"
            f"状态:待处理"
        )

    @staticmethod
    def _resolve_time_window(time_range, now: datetime) -> tuple:
        """Return ``(start, label)`` for *time_range* relative to *now*.

        ``"week"`` starts at Monday 00:00 of the current week, ``"month"`` at
        00:00 on the 1st; anything else is treated as today (00:00). The
        start keeps the timezone of *now*.
        """
        midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
        if time_range == "week":
            return midnight - timedelta(days=now.weekday()), "本周"
        if time_range == "month":
            return midnight.replace(day=1), "本月"
        return midnight, "今日"

    @classmethod
    def _format_alarm_summary(cls, range_label: str, total, type_count: Dict, status_count: Dict) -> str:
        """Render the alarm statistics reply.

        One line per alarm type is inserted between the total and the status
        lines; when there are no types no blank line is emitted.
        """
        lines = [f"{range_label}告警统计", f"总计: {total}"]
        lines.extend(
            f" {cls._ALARM_TYPE_LABELS.get(t, t)}: {c}" for t, c in type_count.items()
        )
        lines.append(f"待处理: {status_count['NEW']}")
        lines.append(f"已处理: {status_count['CLOSED']}")
        lines.append(f"误报过滤: {status_count['FALSE']}")
        return "\n".join(lines)

    async def _handle_query_alarm(self, user_id: str, params: Dict, raw: str) -> str:
        """Query alarms for the requested time range and return a text summary."""
        from app.services.alarm_event_service import get_alarm_event_service

        svc = get_alarm_event_service()

        # Resolve the time range.
        # NOTE(review): day/week/month boundaries are computed in UTC; if users
        # expect local-time boundaries, "today" starts at the wrong hour —
        # confirm against how alarm timestamps are stored.
        now = datetime.now(timezone.utc)
        start, range_label = self._resolve_time_window(params.get("time_range", "today"), now)

        alarm_type_filter = params.get("alarm_type")
        if not alarm_type_filter or alarm_type_filter == "all":
            alarm_type_filter = None

        alarms, total = svc.get_alarms(
            alarm_type=alarm_type_filter,
            start_time=start,
            end_time=now,
            page=1,
            page_size=self._ALARM_PAGE_SIZE,
        )

        # Tally by type and by status.
        type_count = {}
        status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0}
        seen = 0
        for a in alarms:
            seen += 1
            type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1
            if a.alarm_status in status_count:
                status_count[a.alarm_status] += 1

        if isinstance(total, int) and total > seen:
            # Only the first page is tallied, so the breakdown undercounts.
            logger.warning(
                f"告警统计结果被截断: total={total}, counted={seen}, page_size={self._ALARM_PAGE_SIZE}"
            )

        return self._format_alarm_summary(range_label, total, type_count, status_count)

    async def _handle_export_report(self, user_id: str, params: Dict, raw: str) -> str:
        """Generate the Excel alarm report, upload it, and return a download link.

        Returns a "no data" notice when the generator yields nothing, and a
        "generated but upload failed" notice when the upload raises.
        """
        from app.services.report_generator import generate_alarm_report
        from app.services.oss_storage import get_oss_storage

        time_range = params.get("time_range", "week")
        result = generate_alarm_report(time_range=time_range)
        if not result:
            return f"{self._RANGE_LABELS.get(time_range, '今日')}暂无告警数据,无法生成报表。"

        filename, file_bytes = result
        # Upload to object storage to obtain a download link.
        oss = get_oss_storage()
        try:
            object_key = oss.upload_file(
                file_bytes.read(),
                f"reports/{filename}",
                content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            )
            download_url = oss.get_presigned_url(object_key, expire=3600)
            return f"报表已生成\n文件:{filename}\n下载:{download_url}"
        except Exception as e:
            logger.warning(f"报表上传COS失败: {e}")
            return f"报表已生成({filename}),但上传失败,请联系管理员。"

    async def _handle_general_chat(self, user_id: str, params: Dict, raw: str) -> str:
        """Fallback reply: the LLM-provided message, or a short capability menu."""
        msg = params.get("message", "")
        # Only a non-empty string is a usable reply; anything else gets the menu.
        if isinstance(msg, str) and msg:
            return msg
        return "您好我是安防AI助手。可以帮您\n1. 创建工单\n2. 查询告警统计\n3. 导出告警报表\n\n请直接描述您的需求。"
# Module-level singleton, created lazily on first access.
_agent_dispatcher: Optional[AgentDispatcher] = None


def get_agent_dispatcher() -> AgentDispatcher:
    """Return the shared AgentDispatcher, constructing it on the first call."""
    global _agent_dispatcher
    if _agent_dispatcher is not None:
        return _agent_dispatcher
    _agent_dispatcher = AgentDispatcher()
    return _agent_dispatcher