Files
iot-device-management-service/app/services/notify_dispatch.py
16337 63a8d5a8f2 告警-工单解耦:企微交互+Agent全面切换到工单驱动
Part A: 数据层
- 新增 WechatCardState 模型(order_id ↔ alarm_id 映射 + response_code)
- 新建 models_iot.py(IoT 工单只读 ORM:ops_order + security_ext + clean_ext)
- config.py 新增 IOT_DATABASE_URL 配置

Part B: 企微解耦(alarm_id → order_id)
- wechat_service: response_code 存储迁移到 wechat_card_state,集中 helper
- 卡片发送/更新方法改用 order_id,按钮 key: confirm_{order_id}
- wechat_callback: 按钮解析改 order_id,反查 alarm_id(可空)
- wechat_notify_api: send-card/sync-status 以 orderId 为主键
- yudao_aiot_alarm: 卡片操作改用 order_id,删重复 helper

Part C: Agent 工具全面改为工单驱动
- 新建 order_query.py(查 IoT ops_order,支持安保+保洁工单)
- 新建 order_action.py(操作工单状态 + 提交处理结果)
- 更新 prompts.py 为工单助手
- 更新工具注册(__init__.py)

Part D: 日报改为工单驱动
- daily_report_service 从查 alarm_event 改为查 IoT ops_order + 扩展表
- 支持安保+保洁工单统计
2026-03-31 10:49:42 +08:00

449 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
通知调度服务
告警创建后的异步处理流水线:
1. VLM 复核截图 → 写入 alarm_llm_analysis
2. 查表获取通知人员 (camera → area → person)
3. 创建 IoT 工单 → 告警状态改为 CONFIRMED
企微通知由 IoT 工单回调驱动,不在此处发送。
全程异步执行,不阻塞告警接收主流程。
VLM 不可用时自动降级,不影响系统运行。
"""
import os
from typing import Dict
from app.models import (
get_session,
AlarmEvent, AlarmLlmAnalysis, AlarmEventExt,
CameraAreaBinding, AreaPersonBinding, NotifyArea,
)
from app.config import settings
from app.services.vlm_service import get_vlm_service
from app.services.wechat_service import ALARM_TYPE_NAMES
from app.services.camera_name_service import get_camera_name_service
from app.services.work_order_client import get_work_order_client
from app.utils.logger import logger
from app.utils.timezone import beijing_now
async def process_alarm_notification(alarm_data: Dict) -> None:
    """
    Alarm notification pipeline (async, fire-and-forget).

    Steps performed, in order:
      1. Unless ``skip_vlm`` is set, ask the VLM service to review the
         snapshot and persist the result; stop early when the review does
         not confirm the alarm.
      2. Look up the people bound to the camera's area.
      3. If the work-order client is enabled and an area id is known, create
         a work order, store the order id and mark the alarm as confirmed.

    Every exception raised inside the pipeline is caught and logged here, so
    nothing propagates to the caller.

    Args:
        alarm_data: Alarm dict. Keys read here: alarm_id, alarm_type,
            device_id, snapshot_url, alarm_level, event_time, area_id,
            skip_vlm and scene_id.
    """
    alarm_id = alarm_data.get("alarm_id", "")
    alarm_type = alarm_data.get("alarm_type", "")
    device_id = alarm_data.get("device_id", "")
    snapshot_url = alarm_data.get("snapshot_url", "")
    alarm_level = alarm_data.get("alarm_level", 2)
    # NOTE(review): event_time is read but not used anywhere below.
    event_time = alarm_data.get("event_time", "")
    area_id = alarm_data.get("area_id")
    skip_vlm = alarm_data.get("skip_vlm", False)
    logger.info(f"开始处理告警通知: {alarm_id}" + (" (跳过VLM)" if skip_vlm else ""))
    try:
        scene_id = alarm_data.get("scene_id", "")
        # Look up the camera name (original note: fetched from WVP, preferring cameraName)
        camera_name_service = get_camera_name_service()
        camera_info = await camera_name_service.get_camera_info(device_id)
        camera_name = camera_name_service.format_display_name(device_id, camera_info)
        # Resolve the area name: prefer the IoT platform lookup by area_id,
        # fall back to the notification binding tables
        area_name_for_vlm = await _get_area_name_from_iot(area_id) if area_id else ""
        if not area_name_for_vlm:
            area_name_for_vlm, _, _ = _get_notify_persons(device_id, alarm_level)
        # "未知区域" is the placeholder _get_notify_persons returns when no area
        # is found; in that case the scene id is used as the ROI name instead
        roi_name = area_name_for_vlm if area_name_for_vlm != "未知区域" else scene_id
        # snapshot_url may be a COS object key; convert it to an accessible presigned URL
        presigned_snapshot_url = _get_presigned_url(snapshot_url)
        description = ""
        if not skip_vlm:
            # ========== 1. VLM review ==========
            vlm_service = get_vlm_service()
            vlm_result = await vlm_service.verify_alarm(
                snapshot_url=presigned_snapshot_url,
                alarm_type=alarm_type,
                camera_name=camera_name,
                roi_name=roi_name,
            )
            # Persist the review into the alarm_llm_analysis table
            _save_vlm_result(alarm_id, vlm_result)
            # VLM explicitly judged a false alarm (not a degraded skip)
            # -> update the alarm status and do not notify
            if not vlm_result.get("confirmed", True) and not vlm_result.get("skipped"):
                _mark_false_alarm(alarm_id)
                logger.info(f"VLM 判定误报,跳过通知: {alarm_id}")
                return
            # VLM skipped (degraded) with confirmed=False
            # -> leave the alarm status untouched and only skip the notification
            if not vlm_result.get("confirmed", True) and vlm_result.get("skipped"):
                logger.info(f"VLM 降级跳过,不推送但保留告警状态: {alarm_id}")
                return
            description = vlm_result.get("description", "")
        else:
            logger.info(f"跳过 VLM 复核: {alarm_id}")
        # ========== 2. Look up the people to notify ==========
        # NOTE(review): area_name is only reassigned below and never read afterwards.
        area_name = area_name_for_vlm
        _, area_id_int, persons = _get_notify_persons(device_id, alarm_level)
        # Demo mode: when the database has no recipients, use the configured test user ids
        if not persons and settings.wechat.test_uids:
            test_uids = [uid.strip() for uid in settings.wechat.test_uids.split(",") if uid.strip()]
            if test_uids:
                persons = [{"person_name": "测试用户", "wechat_uid": uid, "role": "TEST"} for uid in test_uids]
                if not area_name or area_name == "未知区域":
                    area_name = "演示区域"
                logger.info(f"演示模式: 使用测试用户 {test_uids}")
        if not persons:
            # NOTE(review): the message says work-order creation is skipped, but
            # there is no return here, so execution continues to step 3 — confirm intent.
            logger.warning(f"未找到通知人员: camera={device_id}, 跳过工单创建")
        # ========== 3. Create the work order ==========
        wo_client = get_work_order_client()
        if wo_client.enabled:
            # Prefer the area id stored on the alarm row; fall back to the camera binding
            wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int
            if wo_area_id:
                type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
                wo_title = f"{type_name}告警"
                trigger_source = _get_trigger_source(alarm_id)
                permanent_url = _get_permanent_url(snapshot_url)
                order_id = await wo_client.create_order(
                    title=wo_title,
                    area_id=wo_area_id,
                    alarm_id=alarm_id,
                    alarm_type=type_name,
                    description=description,
                    priority=alarm_level,
                    trigger_source=trigger_source,
                    camera_id=device_id,
                    image_url=permanent_url,
                    roi_id=scene_id or "",
                    source_type="ALARM",
                )
                # A falsy order_id leaves the alarm status unchanged
                if order_id:
                    _save_order_id(alarm_id, order_id)
                    _set_alarm_confirmed(alarm_id)
                    logger.info(f"工单已创建,告警已确认: alarm={alarm_id}, order={order_id}")
            else:
                logger.warning(f"告警无 area_id跳过工单创建: {alarm_id}")
    except Exception as e:
        # Top-level boundary of the fire-and-forget task: log with traceback and swallow
        logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True)
def _get_presigned_url(snapshot_url: str) -> str:
    """Turn a COS object key or a permanent COS URL into a presigned URL.

    Returns "" for an empty input. When signing fails, or when the input is
    an http(s) URL from which no COS object key can be extracted, the input
    is returned unchanged.
    """
    if not snapshot_url:
        return ""

    def _sign(key: str) -> str:
        # Imported lazily, exactly where the storage backend is needed
        from app.services.oss_storage import get_oss_storage
        return get_oss_storage().get_presigned_url(key)

    # Plain object key: sign it directly
    if not snapshot_url.startswith("http"):
        try:
            return _sign(snapshot_url)
        except Exception as exc:
            logger.warning(f"生成预签名URL失败: {exc}")
            return snapshot_url

    # Full URL: re-sign only if it is a COS URL we can extract a key from
    key = _extract_cos_object_key(snapshot_url)
    if key:
        try:
            return _sign(key)
        except Exception as exc:
            logger.warning(f"从COS URL生成预签名失败: {exc}")
    # No key could be extracted (possibly an external URL) or signing failed
    return snapshot_url
def _extract_cos_object_key(url: str) -> str:
    """Extract the object key from a permanent COS URL.

    Supported formats:
    - https://{bucket}.cos.{region}.myqcloud.com/{key}
    - https://cos.{region}.myqcloud.com/{bucket}/{key}

    The key is percent-decoded, so a URL path such as ``a%20b.jpg`` yields the
    key ``a b.jpg``. Returning the still-encoded path would make a later
    signing step encode it a second time.

    Args:
        url: Candidate URL; the query string and fragment are ignored.

    Returns:
        The object key, or "" when the URL is empty, malformed, not a
        ``*.myqcloud.com`` COS URL, or carries no key.
    """
    from urllib.parse import unquote, urlparse

    if not url or not isinstance(url, str):
        return ""
    try:
        parsed = urlparse(url)
        host = parsed.hostname or ""
    except ValueError:
        # urlparse rejects malformed netlocs (e.g. unbalanced IPv6 brackets)
        return ""

    path = parsed.path.lstrip("/")
    if not path or not host.endswith(".myqcloud.com"):
        return ""

    # Format 1: {bucket}.cos.{region}.myqcloud.com/{key}
    if ".cos." in host:
        return unquote(path)

    # Format 2: cos.{region}.myqcloud.com/{bucket}/{key}
    if host.startswith("cos."):
        _bucket, sep, key = path.partition("/")
        return unquote(key) if sep else ""

    return ""
def _save_vlm_result(alarm_id: str, vlm_result: Dict):
    """Persist a VLM review result as one row of alarm_llm_analysis.

    A skipped review is stored without a confidence score and with the
    suggestion "VLM跳过"; otherwise the score is the fixed value 0.9.
    Failures are rolled back and logged, never raised.
    """
    session = get_session()
    try:
        if vlm_result.get("skipped"):
            score, note = None, "VLM跳过"
        else:
            score, note = 0.9, None

        row_fields = {
            "alarm_id": alarm_id,
            "llm_model": settings.vlm.model,
            "analysis_type": "REVIEW",
            "summary": vlm_result.get("description", ""),
            "is_false_alarm": not vlm_result.get("confirmed", True),
            "confidence_score": score,
            "suggestion": note,
        }
        session.add(AlarmLlmAnalysis(**row_fields))
        session.commit()
    except Exception as exc:
        session.rollback()
        logger.error(f"保存 VLM 结果失败: {exc}")
    finally:
        session.close()
def _mark_false_alarm(alarm_id: str):
    """Mark an alarm as a false alarm after the VLM review rejected it.

    Sets alarm_status="FALSE", handle_status="IGNORED", a fixed remark and
    handled_at. When missing, duration_ms (now - event_time, in ms) and
    last_frame_time are filled in as well. Database failures are rolled back
    and logged, never raised.

    Args:
        alarm_id: Identifier of the alarm_event row to update.
    """
    db = get_session()
    try:
        alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        if not alarm:
            # Nothing to update; make the miss visible instead of silently ignoring it
            logger.warning(f"标记误报时未找到告警: {alarm_id}")
            return

        now = beijing_now()
        alarm.alarm_status = "FALSE"
        alarm.handle_status = "IGNORED"
        alarm.handle_remark = "VLM复核判定误报"
        alarm.handled_at = now
        if alarm.duration_ms is None and alarm.event_time:
            try:
                delta = now - alarm.event_time
                alarm.duration_ms = int(delta.total_seconds() * 1000)
            except Exception as e:
                # Best-effort field: a bad event_time must not block the status
                # update, but the failure is logged rather than swallowed.
                logger.warning(f"计算告警持续时长失败: alarm={alarm_id}, error={e}")
        if alarm.last_frame_time is None:
            alarm.last_frame_time = now
        db.commit()
        logger.info(f"告警已标记误报: {alarm_id}")
    except Exception as e:
        db.rollback()
        logger.error(f"标记误报失败: {e}")
    finally:
        db.close()
def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple:
    """
    Find the people to notify for a camera.

    The camera is mapped to an area through CameraAreaBinding; recipients are
    the enabled AreaPersonBinding rows of that area whose notify_level is at
    least ``alarm_level``.

    Returns:
        (area_name, area_id, [{"person_name": ..., "wechat_uid": ..., "role": ...}]).
        ("未知区域", 0, []) when the camera has no binding or the query fails.
    """
    session = get_session()
    try:
        camera_binding = (
            session.query(CameraAreaBinding)
            .filter(CameraAreaBinding.camera_id == camera_id)
            .first()
        )
        if not camera_binding:
            return ("未知区域", 0, [])

        # Only an enabled area contributes its name
        enabled_area = (
            session.query(NotifyArea)
            .filter(
                NotifyArea.area_id == camera_binding.area_id,
                NotifyArea.enabled == 1,
            )
            .first()
        )
        resolved_name = enabled_area.area_name if enabled_area else "未知区域"
        resolved_id = camera_binding.area_id or 0

        bound_people = (
            session.query(AreaPersonBinding)
            .filter(
                AreaPersonBinding.area_id == camera_binding.area_id,
                AreaPersonBinding.enabled == 1,
                AreaPersonBinding.notify_level >= alarm_level,
            )
            .all()
        )
        recipients = []
        for person in bound_people:
            recipients.append(
                {
                    "person_name": person.person_name,
                    "wechat_uid": person.wechat_uid,
                    "role": person.role,
                }
            )
        return (resolved_name, resolved_id, recipients)
    except Exception as exc:
        logger.error(f"查询通知人员失败: {exc}")
        return ("未知区域", 0, [])
    finally:
        session.close()
# Area-name cache: area_id -> (area_name, expire_time), with a 5-minute TTL
_area_name_cache: Dict[int, tuple] = {}
# Cached IoT platform login: "token" is the access token, "expire" is the
# time.time() value after which a new login is performed.
# NOTE(review): "expire" holds a number although the annotation says Dict[str, str].
_iot_token_cache: Dict[str, str] = {"token": "", "expire": 0}
_AREA_CACHE_TTL = 300  # 5 minutes, in seconds
async def _get_area_name_from_iot(area_id: int) -> str:
    """Look up an area name on the IoT platform, with a TTL cache.

    The platform base URL comes from the ``IOT_PLATFORM_URL`` environment
    variable, falling back to ``settings.work_order.base_url``. The login
    token is cached for 20 minutes and resolved names for ``_AREA_CACHE_TTL``
    seconds.

    Login credentials and tenant can be overridden through the environment
    variables ``IOT_PLATFORM_USERNAME``, ``IOT_PLATFORM_PASSWORD``,
    ``IOT_PLATFORM_TENANT_NAME`` and ``IOT_PLATFORM_TENANT_ID``; the defaults
    are the values that were previously hard-coded.

    Args:
        area_id: IoT platform area id; a falsy value short-circuits to "".

    Returns:
        The area name, or "" when it cannot be resolved. Errors are logged
        as warnings and never raised.
    """
    import time

    if not area_id:
        return ""

    cached = _area_name_cache.get(area_id)
    if cached and cached[1] > time.time():
        return cached[0]

    try:
        import httpx

        # Base URL priority: IOT_PLATFORM_URL > WORK_ORDER_BASE_URL
        base_url = (
            os.getenv("IOT_PLATFORM_URL", "")
            or settings.work_order.base_url
        )
        if not base_url:
            return ""
        base_url = base_url.rstrip("/")  # avoid "//admin-api" when configured with a trailing slash

        tenant_headers = {"tenant-id": os.getenv("IOT_PLATFORM_TENANT_ID", "1")}

        # One client serves both the optional login and the area query
        async with httpx.AsyncClient(timeout=5) as client:
            # Reuse the cached token while it is still considered valid
            now = time.time()
            if not _iot_token_cache["token"] or now > _iot_token_cache["expire"]:
                # SECURITY NOTE: the fallback credentials are the former
                # hard-coded ones; set the env vars in any real deployment.
                login_resp = await client.post(
                    f"{base_url}/admin-api/system/auth/login",
                    json={
                        "username": os.getenv("IOT_PLATFORM_USERNAME", "admin"),
                        "password": os.getenv("IOT_PLATFORM_PASSWORD", "admin123"),
                        "tenantName": os.getenv("IOT_PLATFORM_TENANT_NAME", "默认"),
                    },
                    headers=tenant_headers,
                )
                # "data" may be null on a failed login
                login_data = login_resp.json().get("data") or {}
                _iot_token_cache["token"] = login_data.get("accessToken", "")
                # Token lifetime is about 30 minutes; cache it for 20
                _iot_token_cache["expire"] = now + 1200

            token = _iot_token_cache["token"]
            if not token:
                return ""

            resp = await client.get(
                f"{base_url}/admin-api/ops/area/get",
                params={"id": area_id},
                headers={**tenant_headers, "Authorization": f"Bearer {token}"},
            )

        data = resp.json()
        if resp.status_code == 401 or data.get("code") == 401:
            # The platform rejected the cached token: drop it so the next
            # call logs in again instead of failing until the cache expires.
            _iot_token_cache["token"] = ""
            return ""
        if data.get("code") == 0 and data.get("data"):
            name = data["data"].get("areaName", "")
            if name:
                _area_name_cache[area_id] = (name, time.time() + _AREA_CACHE_TTL)
                return name
    except Exception as e:
        logger.warning(f"查询IoT平台区域名失败: area_id={area_id}, error={e}")
    return ""
def _get_alarm_area_id(alarm_id: str) -> int:
    """Read area_id from the alarm_event row of the given alarm.

    Args:
        alarm_id: Identifier of the alarm_event row.

    Returns:
        The row's area_id, or 0 when the row is missing, has no area_id, or
        the query fails. A failure is logged so it can be told apart from an
        alarm that genuinely has no area.
    """
    db = get_session()
    try:
        alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        return alarm.area_id if alarm and alarm.area_id else 0
    except Exception as e:
        # Still best-effort (returns 0), but no longer silent
        logger.error(f"查询告警 area_id 失败: alarm={alarm_id}, error={e}")
        return 0
    finally:
        db.close()
def _get_trigger_source(alarm_id: str) -> str:
    """Derive the alarm's trigger-source label from alarm_event_ext.ext_type.

    Args:
        alarm_id: Identifier of the alarm whose ext rows are inspected.

    Returns:
        "人工上报" for ext types POST / MANUAL, "自动上报" for EDGE_HTTP / EDGE,
        and "自动上报" as the default when no matching row exists or the query
        fails (the failure is logged).
    """
    # Single source of truth: the keys drive the query filter, the values the label
    source_labels = {
        "EDGE_HTTP": "自动上报",
        "EDGE": "自动上报",
        "POST": "人工上报",
        "MANUAL": "人工上报",
    }
    default_label = "自动上报"

    db = get_session()
    try:
        ext = db.query(AlarmEventExt).filter(
            AlarmEventExt.alarm_id == alarm_id,
            AlarmEventExt.ext_type.in_(list(source_labels)),
        ).first()
        if ext:
            return source_labels.get(ext.ext_type, default_label)
        return default_label
    except Exception as e:
        # Still best-effort (falls back to the default label), but no longer silent
        logger.warning(f"查询告警来源失败: alarm={alarm_id}, error={e}")
        return default_label
    finally:
        db.close()
def _get_permanent_url(snapshot_url: str) -> str:
    """Turn a COS object key into a permanent access URL.

    An empty input yields "", an input that already starts with "http" is
    returned unchanged, and a failed conversion is logged and yields "".
    """
    if snapshot_url and not snapshot_url.startswith("http"):
        # Plain object key: ask the storage backend for its permanent URL
        try:
            from app.services.oss_storage import get_oss_storage
            storage = get_oss_storage()
            return storage.get_permanent_url(snapshot_url)
        except Exception as exc:
            logger.warning(f"生成永久URL失败: {exc}")
            return ""
    # Either empty (-> "") or already a full URL (-> unchanged)
    return snapshot_url or ""
def _save_order_id(alarm_id: str, order_id: str):
    """Store the work-order id in wechat_card_state and alarm_event_ext.

    Both writes share one transaction; on any failure the transaction is
    rolled back and the error is logged, never raised.
    """
    session = get_session()
    try:
        from app.models import WechatCardState

        order_key = str(order_id)

        # wechat_card_state: order_id <-> alarm_id mapping
        session.merge(WechatCardState(order_id=order_key, alarm_id=alarm_id))

        # alarm_event_ext: kept for backward compatibility
        session.add(
            AlarmEventExt(
                alarm_id=alarm_id,
                ext_type="WORK_ORDER",
                ext_data={"order_id": order_key},
            )
        )

        session.commit()
        logger.info(f"工单ID已关联: alarm={alarm_id}, order={order_id}")
    except Exception as exc:
        session.rollback()
        logger.error(f"保存工单ID失败: {exc}")
    finally:
        session.close()
def _set_alarm_confirmed(alarm_id: str):
    """Move an alarm to the confirmed / handling state once its work order exists.

    Only an alarm whose alarm_status is still "NEW" is touched. Failures are
    rolled back and logged, never raised.
    """
    session = get_session()
    try:
        record = session.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
        # Missing alarm, or one that already left the NEW state: nothing to do
        if not record or record.alarm_status != "NEW":
            return

        field_updates = (
            ("alarm_status", "CONFIRMED"),
            ("handle_status", "HANDLING"),
            ("handle_remark", "已创建工单"),
        )
        for field_name, new_value in field_updates:
            setattr(record, field_name, new_value)
        session.commit()
    except Exception as exc:
        session.rollback()
        logger.error(f"更新告警状态失败: {exc}")
    finally:
        session.close()