Part A: 数据层
- 新增 WechatCardState 模型(order_id ↔ alarm_id 映射 + response_code)
- 新建 models_iot.py(IoT 工单只读 ORM:ops_order + security_ext + clean_ext)
- config.py 新增 IOT_DATABASE_URL 配置
Part B: 企微解耦(alarm_id → order_id)
- wechat_service: response_code 存储迁移到 wechat_card_state,集中 helper
- 卡片发送/更新方法改用 order_id,按钮 key: confirm_{order_id}
- wechat_callback: 按钮解析改 order_id,反查 alarm_id(可空)
- wechat_notify_api: send-card/sync-status 以 orderId 为主键
- yudao_aiot_alarm: 卡片操作改用 order_id,删重复 helper
Part C: Agent 工具全面改为工单驱动
- 新建 order_query.py(查 IoT ops_order,支持安保+保洁工单)
- 新建 order_action.py(操作工单状态 + 提交处理结果)
- 更新 prompts.py 为工单助手
- 更新工具注册(__init__.py)
Part D: 日报改为工单驱动
- daily_report_service 从查 alarm_event 改为查 IoT ops_order + 扩展表
- 支持安保+保洁工单统计
449 lines
16 KiB
Python
449 lines
16 KiB
Python
"""
|
||
通知调度服务
|
||
|
||
告警创建后的异步处理流水线:
|
||
1. VLM 复核截图 → 写入 alarm_llm_analysis
|
||
2. 查表获取通知人员 (camera → area → person)
|
||
3. 创建 IoT 工单 → 告警状态改为 CONFIRMED
|
||
|
||
企微通知由 IoT 工单回调驱动,不在此处发送。
|
||
全程异步执行,不阻塞告警接收主流程。
|
||
VLM 不可用时自动降级,不影响系统运行。
|
||
"""
|
||
|
||
import os
|
||
from typing import Dict
|
||
|
||
from app.models import (
|
||
get_session,
|
||
AlarmEvent, AlarmLlmAnalysis, AlarmEventExt,
|
||
CameraAreaBinding, AreaPersonBinding, NotifyArea,
|
||
)
|
||
from app.config import settings
|
||
from app.services.vlm_service import get_vlm_service
|
||
from app.services.wechat_service import ALARM_TYPE_NAMES
|
||
from app.services.camera_name_service import get_camera_name_service
|
||
from app.services.work_order_client import get_work_order_client
|
||
from app.utils.logger import logger
|
||
from app.utils.timezone import beijing_now
|
||
|
||
|
||
async def process_alarm_notification(alarm_data: Dict):
|
||
"""
|
||
告警通知处理流水线(异步,fire-and-forget)
|
||
|
||
Args:
|
||
alarm_data: 告警字典,包含 alarm_id, alarm_type, device_id,
|
||
snapshot_url, alarm_level, event_time 等字段
|
||
"""
|
||
alarm_id = alarm_data.get("alarm_id", "")
|
||
alarm_type = alarm_data.get("alarm_type", "")
|
||
device_id = alarm_data.get("device_id", "")
|
||
snapshot_url = alarm_data.get("snapshot_url", "")
|
||
alarm_level = alarm_data.get("alarm_level", 2)
|
||
event_time = alarm_data.get("event_time", "")
|
||
area_id = alarm_data.get("area_id")
|
||
|
||
skip_vlm = alarm_data.get("skip_vlm", False)
|
||
logger.info(f"开始处理告警通知: {alarm_id}" + (" (跳过VLM)" if skip_vlm else ""))
|
||
|
||
try:
|
||
scene_id = alarm_data.get("scene_id", "")
|
||
|
||
# 查询摄像头名称(从 WVP 获取,优先 cameraName)
|
||
camera_name_service = get_camera_name_service()
|
||
camera_info = await camera_name_service.get_camera_info(device_id)
|
||
camera_name = camera_name_service.format_display_name(device_id, camera_info)
|
||
|
||
# 查找区域名称:优先从 IoT 平台查 area_id,降级到通知三表
|
||
area_name_for_vlm = await _get_area_name_from_iot(area_id) if area_id else ""
|
||
if not area_name_for_vlm:
|
||
area_name_for_vlm, _, _ = _get_notify_persons(device_id, alarm_level)
|
||
roi_name = area_name_for_vlm if area_name_for_vlm != "未知区域" else scene_id
|
||
|
||
# snapshot_url 可能是 COS object key,需转为可访问的预签名URL
|
||
presigned_snapshot_url = _get_presigned_url(snapshot_url)
|
||
|
||
description = ""
|
||
if not skip_vlm:
|
||
# ========== 1. VLM 复核 ==========
|
||
vlm_service = get_vlm_service()
|
||
|
||
vlm_result = await vlm_service.verify_alarm(
|
||
snapshot_url=presigned_snapshot_url,
|
||
alarm_type=alarm_type,
|
||
camera_name=camera_name,
|
||
roi_name=roi_name,
|
||
)
|
||
|
||
# 写入 alarm_llm_analysis 表
|
||
_save_vlm_result(alarm_id, vlm_result)
|
||
|
||
# VLM 明确判定为误报(非降级跳过)→ 更新告警状态,不通知
|
||
if not vlm_result.get("confirmed", True) and not vlm_result.get("skipped"):
|
||
_mark_false_alarm(alarm_id)
|
||
logger.info(f"VLM 判定误报,跳过通知: {alarm_id}")
|
||
return
|
||
|
||
# VLM 跳过(降级)且 confirmed=False → 不改告警状态,仅跳过通知
|
||
if not vlm_result.get("confirmed", True) and vlm_result.get("skipped"):
|
||
logger.info(f"VLM 降级跳过,不推送但保留告警状态: {alarm_id}")
|
||
return
|
||
|
||
description = vlm_result.get("description", "")
|
||
else:
|
||
logger.info(f"跳过 VLM 复核: {alarm_id}")
|
||
|
||
# ========== 2. 查表获取通知人员 ==========
|
||
area_name = area_name_for_vlm
|
||
_, area_id_int, persons = _get_notify_persons(device_id, alarm_level)
|
||
|
||
# 演示模式:数据库无人员时,使用配置的测试 userid
|
||
if not persons and settings.wechat.test_uids:
|
||
test_uids = [uid.strip() for uid in settings.wechat.test_uids.split(",") if uid.strip()]
|
||
if test_uids:
|
||
persons = [{"person_name": "测试用户", "wechat_uid": uid, "role": "TEST"} for uid in test_uids]
|
||
if not area_name or area_name == "未知区域":
|
||
area_name = "演示区域"
|
||
logger.info(f"演示模式: 使用测试用户 {test_uids}")
|
||
|
||
if not persons:
|
||
logger.warning(f"未找到通知人员: camera={device_id}, 跳过工单创建")
|
||
|
||
# ========== 3. 创建工单 ==========
|
||
wo_client = get_work_order_client()
|
||
if wo_client.enabled:
|
||
wo_area_id = _get_alarm_area_id(alarm_id) or area_id_int
|
||
if wo_area_id:
|
||
type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
|
||
wo_title = f"{type_name}告警"
|
||
trigger_source = _get_trigger_source(alarm_id)
|
||
permanent_url = _get_permanent_url(snapshot_url)
|
||
order_id = await wo_client.create_order(
|
||
title=wo_title,
|
||
area_id=wo_area_id,
|
||
alarm_id=alarm_id,
|
||
alarm_type=type_name,
|
||
description=description,
|
||
priority=alarm_level,
|
||
trigger_source=trigger_source,
|
||
camera_id=device_id,
|
||
image_url=permanent_url,
|
||
roi_id=scene_id or "",
|
||
source_type="ALARM",
|
||
)
|
||
if order_id:
|
||
_save_order_id(alarm_id, order_id)
|
||
_set_alarm_confirmed(alarm_id)
|
||
logger.info(f"工单已创建,告警已确认: alarm={alarm_id}, order={order_id}")
|
||
else:
|
||
logger.warning(f"告警无 area_id,跳过工单创建: {alarm_id}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True)
|
||
|
||
|
||
def _get_presigned_url(snapshot_url: str) -> str:
|
||
"""将 COS object key 或永久 URL 转为预签名 URL"""
|
||
if not snapshot_url:
|
||
return ""
|
||
# 如果是完整 COS 永久 URL,提取 object key 后重新生成预签名 URL
|
||
if snapshot_url.startswith("http"):
|
||
object_key = _extract_cos_object_key(snapshot_url)
|
||
if object_key:
|
||
try:
|
||
from app.services.oss_storage import get_oss_storage
|
||
return get_oss_storage().get_presigned_url(object_key)
|
||
except Exception as e:
|
||
logger.warning(f"从COS URL生成预签名失败: {e}")
|
||
# 无法提取 key,返回原 URL(可能是外部 URL)
|
||
return snapshot_url
|
||
try:
|
||
from app.services.oss_storage import get_oss_storage
|
||
return get_oss_storage().get_presigned_url(snapshot_url)
|
||
except Exception as e:
|
||
logger.warning(f"生成预签名URL失败: {e}")
|
||
return snapshot_url
|
||
|
||
|
||
def _extract_cos_object_key(url: str) -> str:
|
||
"""从 COS 永久 URL 中提取 object key
|
||
|
||
支持格式:
|
||
- https://{bucket}.cos.{region}.myqcloud.com/{key}
|
||
- https://cos.{region}.myqcloud.com/{bucket}/{key}
|
||
"""
|
||
try:
|
||
from urllib.parse import urlparse
|
||
parsed = urlparse(url)
|
||
host = parsed.hostname or ""
|
||
path = parsed.path.lstrip("/")
|
||
|
||
if not path or "myqcloud.com" not in host:
|
||
return ""
|
||
|
||
# 格式1: {bucket}.cos.{region}.myqcloud.com/{key}
|
||
if ".cos." in host and host.endswith(".myqcloud.com"):
|
||
return path
|
||
|
||
# 格式2: cos.{region}.myqcloud.com/{bucket}/{key}
|
||
if host.startswith("cos.") and host.endswith(".myqcloud.com"):
|
||
parts = path.split("/", 1)
|
||
if len(parts) == 2:
|
||
return parts[1]
|
||
|
||
return ""
|
||
except Exception:
|
||
return ""
|
||
|
||
|
||
def _save_vlm_result(alarm_id: str, vlm_result: Dict):
|
||
"""将 VLM 复核结果写入 alarm_llm_analysis 表"""
|
||
db = get_session()
|
||
try:
|
||
analysis = AlarmLlmAnalysis(
|
||
alarm_id=alarm_id,
|
||
llm_model=settings.vlm.model,
|
||
analysis_type="REVIEW",
|
||
summary=vlm_result.get("description", ""),
|
||
is_false_alarm=not vlm_result.get("confirmed", True),
|
||
confidence_score=None if vlm_result.get("skipped") else 0.9,
|
||
suggestion="VLM跳过" if vlm_result.get("skipped") else None,
|
||
)
|
||
db.add(analysis)
|
||
db.commit()
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"保存 VLM 结果失败: {e}")
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def _mark_false_alarm(alarm_id: str):
|
||
"""将告警标记为误报"""
|
||
db = get_session()
|
||
try:
|
||
alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
|
||
if alarm:
|
||
now = beijing_now()
|
||
alarm.alarm_status = "FALSE"
|
||
alarm.handle_status = "IGNORED"
|
||
alarm.handle_remark = "VLM复核判定误报"
|
||
alarm.handled_at = now
|
||
if alarm.duration_ms is None and alarm.event_time:
|
||
try:
|
||
delta = now - alarm.event_time
|
||
alarm.duration_ms = int(delta.total_seconds() * 1000)
|
||
except Exception:
|
||
pass
|
||
if alarm.last_frame_time is None:
|
||
alarm.last_frame_time = now
|
||
db.commit()
|
||
logger.info(f"告警已标记误报: {alarm_id}")
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"标记误报失败: {e}")
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple:
|
||
"""
|
||
根据摄像头ID查找通知人员
|
||
|
||
Returns:
|
||
(area_name, area_id, [{"person_name": ..., "wechat_uid": ..., "role": ...}])
|
||
"""
|
||
db = get_session()
|
||
try:
|
||
binding = db.query(CameraAreaBinding).filter(
|
||
CameraAreaBinding.camera_id == camera_id
|
||
).first()
|
||
|
||
if not binding:
|
||
return ("未知区域", 0, [])
|
||
|
||
area = db.query(NotifyArea).filter(
|
||
NotifyArea.area_id == binding.area_id,
|
||
NotifyArea.enabled == 1,
|
||
).first()
|
||
|
||
area_name = area.area_name if area else "未知区域"
|
||
area_id = binding.area_id or 0
|
||
|
||
persons = db.query(AreaPersonBinding).filter(
|
||
AreaPersonBinding.area_id == binding.area_id,
|
||
AreaPersonBinding.enabled == 1,
|
||
AreaPersonBinding.notify_level >= alarm_level,
|
||
).all()
|
||
|
||
result = [
|
||
{
|
||
"person_name": p.person_name,
|
||
"wechat_uid": p.wechat_uid,
|
||
"role": p.role,
|
||
}
|
||
for p in persons
|
||
]
|
||
|
||
return (area_name, area_id, result)
|
||
|
||
except Exception as e:
|
||
logger.error(f"查询通知人员失败: {e}")
|
||
return ("未知区域", 0, [])
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
# 区域名称缓存(area_id → (area_name, expire_time)),5 分钟 TTL
|
||
_area_name_cache: Dict[int, tuple] = {}
|
||
_iot_token_cache: Dict[str, str] = {"token": "", "expire": 0}
|
||
|
||
_AREA_CACHE_TTL = 300 # 5 分钟
|
||
|
||
|
||
async def _get_area_name_from_iot(area_id: int) -> str:
|
||
"""从 IoT 平台查询区域名称(带 TTL 缓存)"""
|
||
if not area_id:
|
||
return ""
|
||
cached = _area_name_cache.get(area_id)
|
||
if cached and cached[1] > __import__('time').time():
|
||
return cached[0]
|
||
try:
|
||
import httpx
|
||
import time
|
||
# IoT 平台地址(区域查询),优先级:IOT_PLATFORM_URL > WORK_ORDER_BASE_URL
|
||
base_url = (
|
||
os.getenv("IOT_PLATFORM_URL", "")
|
||
or settings.work_order.base_url
|
||
)
|
||
if not base_url:
|
||
return ""
|
||
|
||
# Token 缓存(有效期内复用)
|
||
now = time.time()
|
||
if not _iot_token_cache["token"] or now > _iot_token_cache["expire"]:
|
||
async with httpx.AsyncClient(timeout=5) as client:
|
||
login_resp = await client.post(
|
||
f"{base_url}/admin-api/system/auth/login",
|
||
json={"username": "admin", "password": "admin123", "tenantName": "默认"},
|
||
headers={"tenant-id": "1"},
|
||
)
|
||
login_data = login_resp.json().get("data", {})
|
||
_iot_token_cache["token"] = login_data.get("accessToken", "")
|
||
# token 有效期约 30 分钟,这里缓存 20 分钟
|
||
_iot_token_cache["expire"] = now + 1200
|
||
|
||
token = _iot_token_cache["token"]
|
||
if not token:
|
||
return ""
|
||
|
||
async with httpx.AsyncClient(timeout=5) as client:
|
||
resp = await client.get(
|
||
f"{base_url}/admin-api/ops/area/get",
|
||
params={"id": area_id},
|
||
headers={"tenant-id": "1", "Authorization": f"Bearer {token}"},
|
||
)
|
||
data = resp.json()
|
||
if data.get("code") == 0 and data.get("data"):
|
||
name = data["data"].get("areaName", "")
|
||
if name:
|
||
_area_name_cache[area_id] = (name, __import__('time').time() + _AREA_CACHE_TTL)
|
||
return name
|
||
except Exception as e:
|
||
logger.warning(f"查询IoT平台区域名失败: area_id={area_id}, error={e}")
|
||
return ""
|
||
|
||
|
||
def _get_alarm_area_id(alarm_id: str) -> int:
|
||
"""从 alarm_event 表获取 area_id"""
|
||
db = get_session()
|
||
try:
|
||
alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
|
||
return alarm.area_id if alarm and alarm.area_id else 0
|
||
except Exception:
|
||
return 0
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def _get_trigger_source(alarm_id: str) -> str:
|
||
"""从 alarm_event_ext 的 ext_type 判断告警来源"""
|
||
db = get_session()
|
||
try:
|
||
ext = db.query(AlarmEventExt).filter(
|
||
AlarmEventExt.alarm_id == alarm_id,
|
||
AlarmEventExt.ext_type.in_(["EDGE_HTTP", "EDGE", "POST", "MANUAL"]),
|
||
).first()
|
||
if ext:
|
||
return {
|
||
"EDGE_HTTP": "自动上报",
|
||
"EDGE": "自动上报",
|
||
"POST": "人工上报",
|
||
"MANUAL": "人工上报",
|
||
}.get(ext.ext_type, "自动上报")
|
||
return "自动上报"
|
||
except Exception:
|
||
return "自动上报"
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def _get_permanent_url(snapshot_url: str) -> str:
|
||
"""将 COS object key 转为永久访问 URL"""
|
||
if not snapshot_url:
|
||
return ""
|
||
if snapshot_url.startswith("http"):
|
||
return snapshot_url
|
||
try:
|
||
from app.services.oss_storage import get_oss_storage
|
||
return get_oss_storage().get_permanent_url(snapshot_url)
|
||
except Exception as e:
|
||
logger.warning(f"生成永久URL失败: {e}")
|
||
return ""
|
||
|
||
|
||
def _save_order_id(alarm_id: str, order_id: str):
|
||
"""将工单ID保存到 wechat_card_state + alarm_event_ext"""
|
||
db = get_session()
|
||
try:
|
||
# 写 wechat_card_state(order_id ↔ alarm_id 映射)
|
||
from app.models import WechatCardState
|
||
card_state = WechatCardState(
|
||
order_id=str(order_id),
|
||
alarm_id=alarm_id,
|
||
)
|
||
db.merge(card_state)
|
||
|
||
# 写 alarm_event_ext(保持兼容)
|
||
ext = AlarmEventExt(
|
||
alarm_id=alarm_id,
|
||
ext_type="WORK_ORDER",
|
||
ext_data={"order_id": str(order_id)},
|
||
)
|
||
db.add(ext)
|
||
db.commit()
|
||
logger.info(f"工单ID已关联: alarm={alarm_id}, order={order_id}")
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"保存工单ID失败: {e}")
|
||
finally:
|
||
db.close()
|
||
|
||
|
||
def _set_alarm_confirmed(alarm_id: str):
|
||
"""工单创建后,将告警状态改为处理中"""
|
||
db = get_session()
|
||
try:
|
||
alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
|
||
if alarm and alarm.alarm_status == "NEW":
|
||
alarm.alarm_status = "CONFIRMED"
|
||
alarm.handle_status = "HANDLING"
|
||
alarm.handle_remark = "已创建工单"
|
||
db.commit()
|
||
except Exception as e:
|
||
db.rollback()
|
||
logger.error(f"更新告警状态失败: {e}")
|
||
finally:
|
||
db.close()
|