Files
iot-device-management-service/app/services/agent_dispatcher.py
16337 0f27caf0b5 修复:摄像头名称显示全链路修复
- config.py: 简化 CameraNameConfig,删除 display_format/name_field_priority
- camera_name_service.py: 重写名称解析,固定优先级 cameraName→gbName→device_id,
  删除废弃的 app/stream 格式解析和 extract_name 方法
- yudao_aiot_alarm.py: 删除 stream→cameraId 的错误映射,cameraId 直接用 device_id
- agent_dispatcher.py: query_camera 删除技术字段返回,list_my_orders 添加摄像头名称解析
2026-03-24 13:38:45 +08:00

860 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
交互Agent调度器Function Calling 版)
使用 qwen3.5-plus 原生 Function Calling 处理用户意图,支持:
- 告警统计/列表/详情查询
- 告警状态更新(确认/忽略/完成/误报)
- 工单列表和结单提交(含图片)
- 摄像头信息查询
- 图片分析VLM
多轮对话上下文每用户独立10轮10分钟TTL
"""
import json
import time
from datetime import timedelta
from typing import Dict, List, Optional
from openai import AsyncOpenAI
from app.config import settings
from app.services.session_manager import get_session_manager
from app.utils.logger import logger
from app.utils.timezone import beijing_now
SYSTEM_PROMPT = """你是VSP安防AI助手通过企业微信协助安保人员处理告警和工单。
能力:
1. 查询告警统计和明细query_alarm_stats / list_alarms / get_alarm_detail
2. 处理告警update_alarm_status确认接单、忽略、完成、误报
3. 提交工单处理结果submit_order_result含文字描述和处理后照片
4. 查询待处理工单list_my_orders
5. 查询摄像头信息query_camera
交互规则:
- 用户发图片时,如果有待处理工单,询问是否作为处理结果上传
- 用户说"处理完了"并附带描述,自动提交结单
- 回复简洁,适合手机阅读
- 重要信息用【】标注
- 禁止使用markdown语法如![](url)、**加粗**、# 标题),企微聊天不支持
- 告警截图会自动发送图片消息,文字回复中不要包含图片链接"""
IMAGE_ANALYZE_PROMPT = """你是物业安防图片分析员。分析这张图片,判断是否存在安全隐患或需要上报的情况。
需要关注的异常包括:
- 岗位无人值守(前台、监控室、门岗等应有人但没人)
- 人员入侵(非授权区域出现人员)
- 车辆违停(禁停区域有车辆)
- 消防隐患(灭火器缺失、通道堵塞、线路杂乱)
- 设施损坏(门窗破损、设备故障)
- 物品遗留(可疑包裹、危险物品)
请用JSON格式回复
{"has_anomaly": true/false, "description": "异常描述", "alarm_type": "告警类型(leave_post/intrusion/illegal_parking/fire/damage/other/none)"}
只输出JSON不要其他内容。"""
# ==================== 7 个工具定义OpenAI tools 格式)====================
TOOLS = [
{
"type": "function",
"function": {
"name": "query_alarm_stats",
"description": "查询告警统计数据(总数、按类型分布、按状态分布)",
"parameters": {
"type": "object",
"properties": {
"time_range": {
"type": "string",
"enum": ["today", "week", "month"],
"description": "时间范围today=今日week=本周month=本月",
},
"alarm_type": {
"type": "string",
"enum": ["leave_post", "intrusion", "illegal_parking", "vehicle_congestion", "all"],
"description": "告警类型筛选all=全部",
},
},
"required": [],
},
},
},
{
"type": "function",
"function": {
"name": "list_alarms",
"description": "查询告警列表返回最近的告警记录含ID、类型、摄像头、状态、时间",
"parameters": {
"type": "object",
"properties": {
"time_range": {
"type": "string",
"enum": ["today", "week", "month"],
"description": "时间范围",
},
"alarm_type": {
"type": "string",
"enum": ["leave_post", "intrusion", "illegal_parking", "vehicle_congestion", "all"],
"description": "告警类型筛选",
},
"alarm_status": {
"type": "string",
"enum": ["NEW", "CONFIRMED", "FALSE", "CLOSED"],
"description": "告警状态筛选NEW=待处理CONFIRMED=处理中FALSE=误报CLOSED=已关闭",
},
"limit": {
"type": "integer",
"description": "返回条数默认10最多20",
},
},
"required": [],
},
},
},
{
"type": "function",
"function": {
"name": "get_alarm_detail",
"description": "查询单条告警的详细信息含扩展信息和AI分析结果",
"parameters": {
"type": "object",
"properties": {
"alarm_id": {
"type": "string",
"description": "告警ID如 edge_xxx 或 ALM_xxx",
},
},
"required": ["alarm_id"],
},
},
},
{
"type": "function",
"function": {
"name": "update_alarm_status",
"description": "更新告警状态:确认接单(confirm)、忽略误报(ignore)、处理完成(complete)、标记误报(false)",
"parameters": {
"type": "object",
"properties": {
"alarm_id": {
"type": "string",
"description": "告警ID",
},
"action": {
"type": "string",
"enum": ["confirm", "ignore", "complete", "false"],
"description": "操作confirm=确认接单ignore=忽略complete=处理完成false=标记误报",
},
},
"required": ["alarm_id", "action"],
},
},
},
{
"type": "function",
"function": {
"name": "list_my_orders",
"description": "查询我的待处理工单列表",
"parameters": {
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["HANDLING", "ALL"],
"description": "工单状态筛选,默认 HANDLING处理中",
},
},
"required": [],
},
},
},
{
"type": "function",
"function": {
"name": "submit_order_result",
"description": "提交工单处理结果(文字描述+处理后照片URL",
"parameters": {
"type": "object",
"properties": {
"alarm_id": {
"type": "string",
"description": "关联的告警ID",
},
"result_text": {
"type": "string",
"description": "处理结果描述",
},
"image_urls": {
"type": "array",
"items": {"type": "string"},
"description": "处理后照片URL列表COS永久URL",
},
},
"required": ["alarm_id", "result_text"],
},
},
},
{
"type": "function",
"function": {
"name": "query_camera",
"description": "查询摄像头信息(名称、位置、状态等)",
"parameters": {
"type": "object",
"properties": {
"camera_id": {
"type": "string",
"description": "摄像头IDdevice_id",
},
},
"required": ["camera_id"],
},
},
},
]
# 告警类型中文映射
ALARM_TYPE_NAMES = {
"leave_post": "人员离岗", "intrusion": "周界入侵",
"illegal_parking": "车辆违停", "vehicle_congestion": "车辆拥堵",
}
ALARM_LEVEL_NAMES = {0: "紧急", 1: "重要", 2: "普通", 3: "轻微"}
ALARM_STATUS_NAMES = {
"NEW": "待处理", "CONFIRMED": "处理中",
"FALSE": "误报", "CLOSED": "已关闭",
}
class AgentDispatcher:
"""交互Agent调度器Function Calling单例"""
def __init__(self):
self._client: Optional[AsyncOpenAI] = None
self._vlm_client: Optional[AsyncOpenAI] = None
self._enabled = False
# 临时存储:工具执行期间需要发送的图片 URL
self._pending_images: Dict[str, List[str]] = {} # user_id -> [image_urls]
def init(self, config):
"""初始化Agent"""
self._enabled = config.enabled and bool(config.vlm_api_key)
if self._enabled:
# FC 文本模型客户端
self._client = AsyncOpenAI(
api_key=config.vlm_api_key,
base_url=config.vlm_base_url,
)
# VLM 视觉模型客户端(共享 base_url模型不同
self._vlm_client = self._client
logger.info(f"交互Agent已启用: fc_model={config.model}, vlm_model={config.vlm_model}")
else:
logger.info("交互Agent未启用AGENT_ENABLED=false 或缺少 API Key")
@property
def enabled(self) -> bool:
return self._enabled
# ==================== 消息入口 ====================
async def handle_message(self, user_id: str, content: str) -> str:
"""处理文字消息"""
if not self._enabled:
return "AI助手未启用请联系管理员配置。"
session = get_session_manager().get(user_id)
session.add_history("user", content)
# 清空待发图片队列
self._pending_images[user_id] = []
try:
reply = await self._chat_with_tools(session, user_id)
except Exception as e:
logger.error(f"Agent FC 对话失败: {e}", exc_info=True)
reply = "抱歉AI助手暂时无法响应请稍后重试。"
# 发送工具执行期间收集的图片(如告警截图)
pending = self._pending_images.pop(user_id, [])
if pending:
await self._send_images_to_user(user_id, pending)
session.add_history("assistant", reply)
return reply
async def handle_image(self, user_id: str, media_id: str) -> str:
"""处理图片消息"""
if not self._enabled:
return "AI助手未启用请联系管理员配置。"
session = get_session_manager().get(user_id)
# 1. 下载图片
from app.services.wechat_service import get_wechat_service
wechat = get_wechat_service()
image_data = await wechat.download_media(media_id)
if not image_data:
return "图片下载失败,请重新发送。"
# 2. 上传 COS 持久化
from app.services.oss_storage import get_oss_storage
oss = get_oss_storage()
object_key = f"agent/{user_id}/{int(time.time())}.jpg"
try:
oss.upload_file(image_data, object_key, content_type="image/jpeg")
except Exception as e:
logger.error(f"Agent图片上传COS失败: {e}")
return "图片保存失败,请重新发送。"
permanent_url = oss.get_permanent_url(object_key)
presign_url = oss.get_presigned_url(object_key)
# 3. 检查用户是否有待处理工单
handling_alarm_id = self._find_handling_alarm(user_id)
if handling_alarm_id:
# 有待处理工单 → 暂存图片,提示用户确认
session.pending_images.append(permanent_url)
session.add_history("user", "[用户上传了一张图片]")
reply = f"收到图片,是否作为【告警 {handling_alarm_id[:20]}...】的处理结果提交?\n回复「是」确认提交,或继续发送更多图片。"
session.add_history("assistant", reply)
session.pending_alarm_id = handling_alarm_id
return reply
# 4. 无待处理工单 → VLM 分析图片内容
session.add_history("user", "[用户上传了一张图片]")
analysis = await self._analyze_image(presign_url)
if analysis.get("has_anomaly"):
desc = analysis.get("description", "异常情况")
reply = f"检测到异常:{desc}\n\n如需上报,请描述具体位置和情况。"
else:
reply = "未检测到明显安全隐患。如有疑问请描述情况。"
session.add_history("assistant", reply)
return reply
# ==================== Function Calling 核心循环 ====================
async def _chat_with_tools(self, session, user_id: str) -> str:
"""带工具调用的多轮对话"""
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
messages.extend(session.get_history_for_vlm())
max_rounds = 5
for _ in range(max_rounds):
resp = await self._client.chat.completions.create(
model=settings.agent.model,
messages=messages,
tools=TOOLS,
timeout=settings.agent.timeout,
)
choice = resp.choices[0]
if choice.finish_reason == "stop":
return (choice.message.content or "").strip()
if choice.message.tool_calls:
# 追加 assistant 消息(含 tool_calls
messages.append(choice.message)
for tc in choice.message.tool_calls:
try:
args = json.loads(tc.function.arguments) if tc.function.arguments else {}
except json.JSONDecodeError:
args = {}
result = await self._execute_tool(tc.function.name, args, user_id)
messages.append({
"role": "tool",
"tool_call_id": tc.id,
"content": json.dumps(result, ensure_ascii=False),
})
else:
# 无 tool_calls 且非 stop可能是 length 等),返回已有内容
return (choice.message.content or "处理超时,请重试").strip()
return "处理超时,请重试"
async def _execute_tool(self, name: str, args: dict, user_id: str) -> dict:
"""分发执行工具"""
try:
if name == "query_alarm_stats":
return await self._tool_query_alarm_stats(args)
elif name == "list_alarms":
return await self._tool_list_alarms(args)
elif name == "get_alarm_detail":
return await self._tool_get_alarm_detail(args, user_id)
elif name == "update_alarm_status":
return await self._tool_update_alarm_status(args, user_id)
elif name == "list_my_orders":
return await self._tool_list_my_orders(args, user_id)
elif name == "submit_order_result":
return await self._tool_submit_order_result(args, user_id)
elif name == "query_camera":
return await self._tool_query_camera(args)
else:
return {"error": f"未知工具: {name}"}
except Exception as e:
logger.error(f"工具执行失败: {name}, error={e}", exc_info=True)
return {"error": f"执行失败: {str(e)}"}
# ==================== 工具实现 ====================
async def _tool_query_alarm_stats(self, args: dict) -> dict:
"""告警统计"""
from app.services.alarm_event_service import get_alarm_event_service
svc = get_alarm_event_service()
time_range = args.get("time_range", "today")
start, range_label = self._parse_time_range(time_range)
now = beijing_now()
alarm_type_filter = args.get("alarm_type")
if alarm_type_filter == "all":
alarm_type_filter = None
alarms, total = svc.get_alarms(
alarm_type=alarm_type_filter,
start_time=start,
end_time=now,
page=1,
page_size=10000,
)
type_count = {}
status_count = {"NEW": 0, "CONFIRMED": 0, "FALSE": 0, "CLOSED": 0}
for a in alarms:
type_count[a.alarm_type] = type_count.get(a.alarm_type, 0) + 1
if a.alarm_status in status_count:
status_count[a.alarm_status] += 1
return {
"range": range_label,
"total": total,
"by_type": {ALARM_TYPE_NAMES.get(t, t): c for t, c in type_count.items()},
"by_status": {ALARM_STATUS_NAMES.get(s, s): c for s, c in status_count.items()},
}
async def _tool_list_alarms(self, args: dict) -> dict:
"""告警列表"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.camera_name_service import get_camera_name_service
svc = get_alarm_event_service()
camera_service = get_camera_name_service()
time_range = args.get("time_range", "today")
start, range_label = self._parse_time_range(time_range)
now = beijing_now()
alarm_type_filter = args.get("alarm_type")
if alarm_type_filter == "all":
alarm_type_filter = None
alarm_status_filter = args.get("alarm_status")
limit = min(args.get("limit", 10), 20)
alarms, total = svc.get_alarms(
alarm_type=alarm_type_filter,
alarm_status=alarm_status_filter,
start_time=start,
end_time=now,
page=1,
page_size=limit,
)
items = []
for a in alarms:
cam_name = a.device_id
try:
cam_info = await camera_service.get_camera_info(a.device_id)
cam_name = camera_service.format_display_name(a.device_id, cam_info)
except Exception:
pass
event_time = ""
if a.event_time:
try:
event_time = a.event_time.strftime("%m-%d %H:%M")
except Exception:
event_time = str(a.event_time)[:16]
items.append({
"alarm_id": a.alarm_id,
"type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type),
"camera": cam_name,
"status": ALARM_STATUS_NAMES.get(a.alarm_status, a.alarm_status),
"level": ALARM_LEVEL_NAMES.get(a.alarm_level, "普通"),
"time": event_time,
})
return {"range": range_label, "total": total, "items": items}
async def _tool_get_alarm_detail(self, args: dict, user_id: str) -> dict:
"""告警详情"""
from app.services.alarm_event_service import get_alarm_event_service
svc = get_alarm_event_service()
alarm_id = args.get("alarm_id", "")
detail = svc.get_alarm(alarm_id)
if not detail:
return {"error": f"未找到告警: {alarm_id}"}
# 截图:加入待发图片队列,由 handle_message 统一发送
snapshot_url = detail.get("snapshot_url", "")
if snapshot_url:
if user_id not in self._pending_images:
self._pending_images[user_id] = []
self._pending_images[user_id].append(snapshot_url)
result = {
"alarm_id": detail.get("alarm_id"),
"alarm_type": ALARM_TYPE_NAMES.get(detail.get("alarm_type", ""), detail.get("alarm_type", "")),
"device_id": detail.get("device_id"),
"alarm_status": ALARM_STATUS_NAMES.get(detail.get("alarm_status", ""), detail.get("alarm_status", "")),
"alarm_level": ALARM_LEVEL_NAMES.get(detail.get("alarm_level"), "普通"),
"event_time": str(detail.get("event_time", ""))[:19],
"handle_status": detail.get("handle_status"),
"handler": detail.get("handler"),
"has_snapshot": bool(snapshot_url),
}
# 摄像头名称
try:
from app.services.camera_name_service import get_camera_name_service
camera_service = get_camera_name_service()
cam_info = await camera_service.get_camera_info(detail.get("device_id", ""))
result["camera_name"] = camera_service.format_display_name(detail.get("device_id", ""), cam_info)
except Exception:
result["camera_name"] = detail.get("device_id", "")
# LLM 分析
analyses = detail.get("llm_analyses", [])
if analyses:
latest = analyses[-1]
result["ai_analysis"] = latest.get("summary", "")
return result
async def _tool_update_alarm_status(self, args: dict, user_id: str) -> dict:
"""更新告警状态(复用卡片按钮的处理逻辑)"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.wechat_service import get_wechat_service
alarm_id = args.get("alarm_id", "")
action = args.get("action", "")
svc = get_alarm_event_service()
wechat = get_wechat_service()
# 查告警是否存在
detail = svc.get_alarm(alarm_id)
if not detail:
return {"error": f"未找到告警: {alarm_id}"}
# 获取关联工单ID
order_id = self._get_order_id_for_alarm(alarm_id)
# 执行操作
from app.services.work_order_client import get_work_order_client
wo_client = get_work_order_client()
if action == "confirm":
if order_id and wo_client.enabled:
success = await wo_client.confirm_order(order_id)
if not success:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED",
handle_status="HANDLING", handler=user_id, remark="企微Agent确认(IoT降级)")
else:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="CONFIRMED",
handle_status="HANDLING", handler=user_id, remark="企微Agent确认接单")
# 更新卡片到 step2
response_code = wechat.get_response_code(alarm_id)
if response_code:
await wechat.update_alarm_card_step2(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, operator_name=user_id,
)
return {"success": True, "message": f"已确认接单: {alarm_id}"}
elif action == "ignore":
if order_id and wo_client.enabled:
success = await wo_client.false_alarm(order_id)
if not success:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微Agent忽略(IoT降级)")
else:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微Agent忽略")
response_code = wechat.get_response_code(alarm_id)
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, action="ignore", operator_name=user_id,
)
return {"success": True, "message": f"已忽略: {alarm_id}"}
elif action == "complete":
if order_id and wo_client.enabled:
success = await wo_client.submit_order(order_id, result=f"已处理 by {user_id}")
if not success:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
handle_status="DONE", handler=user_id, remark="企微Agent完成(IoT降级)")
else:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
handle_status="DONE", handler=user_id, remark="企微Agent已处理")
response_code = wechat.get_response_code(alarm_id)
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, action="complete", operator_name=user_id,
)
return {"success": True, "message": f"已处理完成: {alarm_id}"}
elif action == "false":
if order_id and wo_client.enabled:
success = await wo_client.false_alarm(order_id)
if not success:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微Agent误报(IoT降级)")
else:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="FALSE",
handle_status="IGNORED", handler=user_id, remark="企微Agent标记误报")
response_code = wechat.get_response_code(alarm_id)
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, action="false", operator_name=user_id,
)
return {"success": True, "message": f"已标记误报: {alarm_id}"}
return {"error": f"未知操作: {action}"}
async def _tool_list_my_orders(self, args: dict, user_id: str) -> dict:
"""查询我的待处理工单"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.camera_name_service import get_camera_name_service
svc = get_alarm_event_service()
camera_service = get_camera_name_service()
# 查询 handler=user_id & handle_status=HANDLING 的告警
alarms, total = svc.get_alarms(
alarm_status="CONFIRMED",
page=1,
page_size=20,
)
# 过滤属于该用户的
my_alarms = [a for a in alarms if a.handler == user_id]
if not my_alarms:
return {"total": 0, "items": [], "message": "当前没有待处理的工单"}
items = []
for a in my_alarms:
cam_name = a.device_id
try:
cam_info = await camera_service.get_camera_info(a.device_id)
cam_name = camera_service.format_display_name(a.device_id, cam_info)
except Exception:
pass
event_time = ""
if a.event_time:
try:
event_time = a.event_time.strftime("%m-%d %H:%M")
except Exception:
event_time = str(a.event_time)[:16]
items.append({
"alarm_id": a.alarm_id,
"type": ALARM_TYPE_NAMES.get(a.alarm_type, a.alarm_type),
"camera": cam_name,
"time": event_time,
})
return {"total": len(my_alarms), "items": items}
async def _tool_submit_order_result(self, args: dict, user_id: str) -> dict:
"""提交工单处理结果"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.wechat_service import get_wechat_service
alarm_id = args.get("alarm_id", "")
result_text = args.get("result_text", "已处理")
image_urls = args.get("image_urls", [])
svc = get_alarm_event_service()
wechat = get_wechat_service()
# 合并 session 中暂存的图片
session = get_session_manager().get(user_id)
if session.pending_images:
image_urls = session.pending_images + image_urls
session.pending_images = []
# 获取关联工单ID
order_id = self._get_order_id_for_alarm(alarm_id)
from app.services.work_order_client import get_work_order_client
wo_client = get_work_order_client()
if order_id and wo_client.enabled:
success = await wo_client.submit_order(
order_id,
result=f"{result_text} by {user_id}",
result_img_urls=image_urls or None,
)
if not success:
# IoT 降级
svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
handle_status="DONE", handler=user_id,
remark=f"企微Agent结单(IoT降级): {result_text}")
else:
svc.handle_alarm(alarm_id=alarm_id, alarm_status="CLOSED",
handle_status="DONE", handler=user_id,
remark=f"企微Agent结单: {result_text}")
# 更新卡片到终态
response_code = wechat.get_response_code(alarm_id)
if response_code:
await wechat.update_alarm_card_terminal(
response_code=response_code, user_ids=[user_id],
alarm_id=alarm_id, action="complete", operator_name=user_id,
)
return {
"success": True,
"message": f"工单已提交: {alarm_id}",
"result": result_text,
"images_count": len(image_urls),
}
async def _tool_query_camera(self, args: dict) -> dict:
"""查询摄像头信息"""
from app.services.camera_name_service import get_camera_name_service
camera_service = get_camera_name_service()
camera_id = args.get("camera_id", "")
try:
cam_info = await camera_service.get_camera_info(camera_id)
if cam_info:
return {
"camera_id": camera_id,
"name": camera_service.format_display_name(camera_id, cam_info),
}
return {"error": f"未找到摄像头: {camera_id}"}
except Exception as e:
return {"error": f"查询失败: {str(e)}"}
# ==================== VLM 图片分析(保留) ====================
async def _analyze_image(self, image_url: str) -> Dict:
"""VLM 分析图片内容"""
try:
resp = await self._vlm_client.chat.completions.create(
model=settings.agent.vlm_model,
messages=[
{"role": "system", "content": IMAGE_ANALYZE_PROMPT},
{"role": "user", "content": [
{"type": "image_url", "image_url": {"url": image_url}},
{"type": "text", "text": "请分析这张图片"},
]},
],
timeout=settings.agent.vlm_timeout,
)
text = resp.choices[0].message.content.strip()
if "```" in text:
text = text.split("```")[1].strip()
if text.startswith("json"):
text = text[4:].strip()
return json.loads(text)
except Exception as e:
logger.error(f"VLM图片分析失败: {e}")
return {"has_anomaly": False, "description": "", "alarm_type": ""}
# ==================== 辅助方法 ====================
async def _send_images_to_user(self, user_id: str, image_urls: List[str]):
"""通过企微发送图片消息给用户"""
from app.services.wechat_service import get_wechat_service
wechat = get_wechat_service()
if not wechat.enabled:
return
for url in image_urls:
try:
media_id = await wechat.upload_media_from_url(url)
if media_id:
# 发送图片消息(复用企微发送图片的能力)
access_token = await wechat._get_access_token()
import httpx
msg = {
"touser": user_id,
"msgtype": "image",
"agentid": wechat.agent_id_int,
"image": {"media_id": media_id},
}
api_url = f"https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={access_token}"
async with httpx.AsyncClient(timeout=10) as client:
await client.post(api_url, json=msg)
except Exception as e:
logger.error(f"发送告警截图失败: user={user_id}, error={e}")
@staticmethod
def _parse_time_range(time_range: str):
"""解析时间范围,返回 (start_time, label)"""
now = beijing_now()
if time_range == "week":
start = now - timedelta(days=now.weekday())
start = start.replace(hour=0, minute=0, second=0, microsecond=0)
return start, "本周"
elif time_range == "month":
start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
return start, "本月"
else:
start = now.replace(hour=0, minute=0, second=0, microsecond=0)
return start, "今日"
@staticmethod
def _get_order_id_for_alarm(alarm_id: str) -> str:
"""从 alarm_event_ext 中获取关联的工单ID"""
from app.models import get_session, AlarmEventExt
db = get_session()
try:
ext = db.query(AlarmEventExt).filter(
AlarmEventExt.alarm_id == alarm_id,
AlarmEventExt.ext_type == "WORK_ORDER",
).first()
if ext and ext.ext_data:
return ext.ext_data.get("order_id", "")
return ""
except Exception as e:
logger.error(f"查询工单ID失败: alarm={alarm_id}, error={e}")
return ""
finally:
db.close()
@staticmethod
def _find_handling_alarm(user_id: str) -> str:
"""查找用户正在处理的告警IDhandle_status=HANDLING & handler=user_id"""
from app.models import get_session, AlarmEvent
db = get_session()
try:
alarm = db.query(AlarmEvent).filter(
AlarmEvent.handler == user_id,
AlarmEvent.handle_status == "HANDLING",
).order_by(AlarmEvent.event_time.desc()).first()
return alarm.alarm_id if alarm else ""
except Exception as e:
logger.error(f"查询待处理告警失败: user={user_id}, error={e}")
return ""
finally:
db.close()
# 全局单例
_agent_dispatcher: Optional[AgentDispatcher] = None
def get_agent_dispatcher() -> AgentDispatcher:
global _agent_dispatcher
if _agent_dispatcher is None:
_agent_dispatcher = AgentDispatcher()
return _agent_dispatcher