Files
iot-device-management-service/app/services/notify_dispatch.py
16337 f7a69892f6 fix: 注释工单对接代码,修复API响应解析和类型问题
- 工单创建/自动结单代码全部注释,待本地测试后启用
- 修复create_order响应解析:data直接是orderId,非嵌套对象
- 修复areaId类型:int(文档要求),非str
- 修复auto-complete orderId类型:int
- 两步卡片状态机和先到先得逻辑保留生效

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-12 18:20:30 +08:00

294 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
通知调度服务
告警创建后的异步处理流水线:
1. VLM 复核截图 → 写入 alarm_llm_analysis
2. 查表获取通知人员 (camera → area → person)
3. 推送企微通知:
- 群聊image + news + @text 组合消息
- 个人button_interaction 模板卡片
全程异步执行,不阻塞告警接收主流程。
VLM 或企微不可用时自动降级,不影响系统运行。
"""
from datetime import datetime
from typing import Dict, List
from app.models import (
get_session,
AlarmEvent, AlarmLlmAnalysis, AlarmEventExt,
CameraAreaBinding, AreaPersonBinding, NotifyArea,
)
from app.config import settings
from app.services.vlm_service import get_vlm_service
from app.services.wechat_service import get_wechat_service, ALARM_TYPE_NAMES, ALARM_LEVEL_NAMES
from app.services.work_order_client import get_work_order_client
from app.utils.logger import logger
from app.utils.timezone import beijing_now
async def process_alarm_notification(alarm_data: Dict):
"""
告警通知处理流水线异步fire-and-forget
Args:
alarm_data: 告警字典,包含 alarm_id, alarm_type, device_id,
snapshot_url, alarm_level, event_time 等字段
"""
alarm_id = alarm_data.get("alarm_id", "")
alarm_type = alarm_data.get("alarm_type", "")
device_id = alarm_data.get("device_id", "")
snapshot_url = alarm_data.get("snapshot_url", "")
alarm_level = alarm_data.get("alarm_level", 2)
event_time = alarm_data.get("event_time", "")
logger.info(f"开始处理告警通知: {alarm_id}")
try:
# ========== 1. VLM 复核 ==========
vlm_service = get_vlm_service()
camera_name = alarm_data.get("camera_name", device_id)
scene_id = alarm_data.get("scene_id", "")
# 查找区域名称用于 VLM prompt比 UUID 更有意义)
area_name_for_vlm, _, _ = _get_notify_persons(device_id, alarm_level)
roi_name = area_name_for_vlm if area_name_for_vlm != "未知区域" else scene_id
# snapshot_url 可能是 COS object key需转为可访问的预签名URL
presigned_snapshot_url = _get_presigned_url(snapshot_url)
vlm_result = await vlm_service.verify_alarm(
snapshot_url=presigned_snapshot_url,
alarm_type=alarm_type,
camera_name=camera_name,
roi_name=roi_name,
)
# 写入 alarm_llm_analysis 表
_save_vlm_result(alarm_id, vlm_result)
# VLM 明确判定为误报(非降级跳过)→ 更新告警状态,不通知
if not vlm_result.get("confirmed", True) and not vlm_result.get("skipped"):
_mark_false_alarm(alarm_id)
logger.info(f"VLM 判定误报,跳过通知: {alarm_id}")
return
# VLM 跳过(降级)且 confirmed=False → 不改告警状态,仅跳过通知
if not vlm_result.get("confirmed", True) and vlm_result.get("skipped"):
logger.info(f"VLM 降级跳过,不推送但保留告警状态: {alarm_id}")
return
# ========== 2. 查表获取通知人员 ==========
description = vlm_result.get("description", "")
area_name = area_name_for_vlm
_, area_id_int, persons = _get_notify_persons(device_id, alarm_level)
# 演示模式:数据库无人员时,使用配置的测试 userid
if not persons and settings.wechat.test_uids:
test_uids = [uid.strip() for uid in settings.wechat.test_uids.split(",") if uid.strip()]
if test_uids:
persons = [{"person_name": "测试用户", "wechat_uid": uid, "role": "TEST"} for uid in test_uids]
area_name = "演示区域"
logger.info(f"演示模式: 使用测试用户 {test_uids}")
if not persons:
logger.warning(f"未找到通知人员: camera={device_id}, 跳过企微推送")
return
# ========== 3. 推送企微通知 ==========
wechat_service = get_wechat_service()
if not wechat_service.enabled:
logger.info(f"企微未启用,跳过通知: {alarm_id}")
return
event_time_str = (
event_time.strftime("%Y-%m-%d %H:%M:%S")
if isinstance(event_time, datetime) else str(event_time or "")
)
user_ids = [p["wechat_uid"] for p in persons]
group_chat_id = settings.wechat.group_chat_id
# ---- 3a. 群聊组合消息 ----
if group_chat_id:
await wechat_service.send_group_alarm_combo(
chat_id=group_chat_id,
alarm_id=alarm_id,
alarm_type=alarm_type,
area_name=area_name,
camera_name=camera_name,
description=description or f"{area_name} 检测到告警",
event_time=event_time_str,
alarm_level=alarm_level,
snapshot_url=presigned_snapshot_url,
mention_user_ids=user_ids,
)
else:
logger.debug("未配置群聊ID跳过群聊推送")
# ---- 3b. 个人按钮交互卡片 ----
sent = await wechat_service.send_alarm_card(
user_ids=user_ids,
alarm_id=alarm_id,
alarm_type=alarm_type,
area_name=area_name,
camera_name=camera_name,
description=description,
event_time=event_time_str,
alarm_level=alarm_level,
)
if sent:
logger.info(f"告警通知完成: {alarm_id}")
else:
logger.warning(f"个人卡片发送失败: {alarm_id}")
# ---- 4. 创建安保工单(暂未上线,待本地测试通过后启用) ----
# wo_client = get_work_order_client()
# if wo_client.enabled:
# type_name = ALARM_TYPE_NAMES.get(alarm_type, alarm_type)
# level_name = ALARM_LEVEL_NAMES.get(alarm_level, "一般")
# wo_title = f"【{level_name}】{type_name}告警 - {area_name}"
# order_id = await wo_client.create_order(
# title=wo_title,
# area_id=area_id_int,
# alarm_id=alarm_id,
# alarm_type=alarm_type,
# )
# if order_id:
# _save_order_id(alarm_id, order_id)
except Exception as e:
logger.error(f"告警通知处理失败: {alarm_id}, error={e}", exc_info=True)
def _get_presigned_url(snapshot_url: str) -> str:
"""将 COS object key 转为预签名 URL"""
if not snapshot_url:
return ""
if snapshot_url.startswith("http"):
return snapshot_url
try:
from app.services.oss_storage import get_oss_storage
return get_oss_storage().get_presigned_url(snapshot_url)
except Exception as e:
logger.warning(f"生成预签名URL失败: {e}")
return snapshot_url
def _save_vlm_result(alarm_id: str, vlm_result: Dict):
"""将 VLM 复核结果写入 alarm_llm_analysis 表"""
db = get_session()
try:
analysis = AlarmLlmAnalysis(
alarm_id=alarm_id,
llm_model="qwen3-vl-flash",
analysis_type="REVIEW",
summary=vlm_result.get("description", ""),
is_false_alarm=not vlm_result.get("confirmed", True),
confidence_score=None if vlm_result.get("skipped") else 0.9,
suggestion="VLM跳过" if vlm_result.get("skipped") else None,
)
db.add(analysis)
db.commit()
except Exception as e:
db.rollback()
logger.error(f"保存 VLM 结果失败: {e}")
finally:
db.close()
def _mark_false_alarm(alarm_id: str):
"""将告警标记为误报"""
db = get_session()
try:
alarm = db.query(AlarmEvent).filter(AlarmEvent.alarm_id == alarm_id).first()
if alarm:
now = beijing_now()
alarm.alarm_status = "FALSE"
alarm.handle_status = "IGNORED"
alarm.handle_remark = "VLM复核判定误报"
alarm.handled_at = now
if alarm.duration_ms is None and alarm.event_time:
try:
delta = now - alarm.event_time
alarm.duration_ms = int(delta.total_seconds() * 1000)
except Exception:
pass
if alarm.last_frame_time is None:
alarm.last_frame_time = now
db.commit()
logger.info(f"告警已标记误报: {alarm_id}")
except Exception as e:
db.rollback()
logger.error(f"标记误报失败: {e}")
finally:
db.close()
def _get_notify_persons(camera_id: str, alarm_level: int) -> tuple:
"""
根据摄像头ID查找通知人员
Returns:
(area_name, area_id, [{"person_name": ..., "wechat_uid": ..., "role": ...}])
"""
db = get_session()
try:
binding = db.query(CameraAreaBinding).filter(
CameraAreaBinding.camera_id == camera_id
).first()
if not binding:
return ("未知区域", 0, [])
area = db.query(NotifyArea).filter(
NotifyArea.area_id == binding.area_id,
NotifyArea.enabled == 1,
).first()
area_name = area.area_name if area else "未知区域"
area_id = binding.area_id or 0
persons = db.query(AreaPersonBinding).filter(
AreaPersonBinding.area_id == binding.area_id,
AreaPersonBinding.enabled == 1,
AreaPersonBinding.notify_level <= alarm_level,
).all()
result = [
{
"person_name": p.person_name,
"wechat_uid": p.wechat_uid,
"role": p.role,
}
for p in persons
]
return (area_name, area_id, result)
except Exception as e:
logger.error(f"查询通知人员失败: {e}")
return ("未知区域", 0, [])
finally:
db.close()
def _save_order_id(alarm_id: str, order_id: str):
"""将工单ID保存到 alarm_event_extext_type=WORK_ORDER"""
db = get_session()
try:
ext = AlarmEventExt(
alarm_id=alarm_id,
ext_type="WORK_ORDER",
ext_data={"order_id": order_id},
)
db.add(ext)
db.commit()
logger.info(f"工单ID已关联: alarm={alarm_id}, order={order_id}")
except Exception as e:
db.rollback()
logger.error(f"保存工单ID失败: {e}")
finally:
db.close()