优化:日报增加按边缘节点分组汇总(各节点告警数、类型分布、Top设备)

This commit is contained in:
2026-03-25 10:07:55 +08:00
parent 50f016e9fb
commit 5a7b098663

View File

@@ -2,11 +2,12 @@
每日告警日报定时推送服务
每天定时生成前一天的告警汇总,发送到企微群聊。
支持按边缘节点分组统计。
"""
import asyncio
from datetime import datetime, timedelta
from collections import Counter
from typing import Dict, List, Optional, Tuple
from collections import Counter, defaultdict
from datetime import timedelta
from typing import Dict, List, Optional
from app.utils.logger import logger
from app.config import settings
@@ -22,8 +23,29 @@ ALARM_TYPE_NAMES = {
WEEKDAY_NAMES = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
def _get_edge_name_map() -> Dict[str, str]:
"""从 EdgeDevice 表获取 edge_node_id → device_name 映射"""
from app.models import get_session, EdgeDevice
db = get_session()
try:
devices = db.query(EdgeDevice.device_id, EdgeDevice.device_name).all()
return {d.device_id: (d.device_name or d.device_id) for d in devices}
except Exception as e:
logger.warning(f"查询边缘设备名称失败: {e}")
return {}
finally:
db.close()
def _format_resp_time(minutes: float) -> str:
"""格式化响应时长"""
if minutes < 60:
return f"{minutes:.1f}分钟"
return f"{minutes / 60:.1f}小时"
async def generate_daily_report() -> Optional[str]:
"""生成昨日告警日报 Markdown 内容"""
"""生成昨日告警日报 Markdown 内容(含按边缘节点分组)"""
from app.services.alarm_event_service import get_alarm_event_service
from app.services.camera_name_service import get_camera_name_service
from app.utils.timezone import beijing_now
@@ -36,7 +58,7 @@ async def generate_daily_report() -> Optional[str]:
yesterday_start = today_start - timedelta(days=1)
day_before_start = today_start - timedelta(days=2)
# 查询昨日和前日全量告警page_size 设大,拿全量)
# 查询昨日和前日全量告警
yesterday_alarms, yesterday_total = svc.get_alarms(
start_time=yesterday_start, end_time=today_start, page=1, page_size=10000
)
@@ -44,17 +66,18 @@ async def generate_daily_report() -> Optional[str]:
start_time=day_before_start, end_time=yesterday_start, page=1, page_size=1
)
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
# 无告警时发送简短通知
if yesterday_total == 0:
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
return (
f"**AI安防日报 — {date_str}{weekday}**\n\n"
f">昨日告警总计:**0** 条\n"
f">系统运行正常,无异常事件"
)
# ---- 统计 ----
# ---- 全局统计 ----
type_counter: Counter = Counter()
device_counter: Counter = Counter()
handle_done = 0
@@ -63,11 +86,17 @@ async def generate_daily_report() -> Optional[str]:
false_alarm = 0
response_times: List[float] = []
# ---- 按边缘节点分组 ----
edge_groups: Dict[str, List] = defaultdict(list)
for a in yesterday_alarms:
type_counter[a.alarm_type] += 1
device_counter[a.device_id] += 1
# 处理状态统计
# 按 edge_node_id 分组(无 edge_node_id 归入 "unknown"
edge_key = a.edge_node_id or "unknown"
edge_groups[edge_key].append(a)
if a.handle_status == "DONE":
handle_done += 1
elif a.handle_status == "IGNORED":
@@ -75,14 +104,12 @@ async def generate_daily_report() -> Optional[str]:
else:
handle_unhandled += 1
# 误报统计
if a.alarm_status == "FALSE":
false_alarm += 1
# 响应时长(从 event_time 到 handled_at
if a.handled_at and a.event_time:
delta = (a.handled_at - a.event_time).total_seconds() / 60.0
if delta >= 0:
if 0 <= delta <= 360: # 排除 >6h 异常值
response_times.append(delta)
# 环比变化
@@ -98,28 +125,28 @@ async def generate_daily_report() -> Optional[str]:
change_str = "前日无告警"
# 平均响应时长
if response_times:
avg_resp = sum(response_times) / len(response_times)
if avg_resp < 60:
resp_str = f"{avg_resp:.1f}分钟"
else:
resp_str = f"{avg_resp / 60:.1f}小时"
else:
resp_str = "暂无数据"
resp_str = _format_resp_time(sum(response_times) / len(response_times)) if response_times else "暂无数据"
# 设备 Top5 — 批量获取摄像头名称
top5_devices = device_counter.most_common(5)
device_ids = [d[0] for d in top5_devices]
all_device_ids = list({d[0] for d in top5_devices})
# 也收集各边缘节点的 top 设备 ID
for edge_key, alarms in edge_groups.items():
edge_dev_counter = Counter(a.device_id for a in alarms)
for dev_id, _ in edge_dev_counter.most_common(3):
if dev_id not in all_device_ids:
all_device_ids.append(dev_id)
try:
name_map = await camera_svc.get_display_names_batch(device_ids)
name_map = await camera_svc.get_display_names_batch(all_device_ids)
except Exception as e:
logger.warning(f"日报获取摄像头名称失败: {e}")
name_map = {}
# ---- 组装 Markdown ----
date_str = yesterday_start.strftime("%m-%d")
weekday = WEEKDAY_NAMES[yesterday_start.weekday()]
# 边缘设备名称映射
edge_name_map = _get_edge_name_map()
# ==================== 组装 Markdown ====================
lines = [
f"**AI安防日报 — {date_str}{weekday}**",
"",
@@ -145,6 +172,46 @@ async def generate_daily_report() -> Optional[str]:
display_name = name_map.get(device_id, device_id)
lines.append(f">{i}. {display_name}{count}")
# ---- 按边缘节点分组汇总 ----
if len(edge_groups) > 1 or (len(edge_groups) == 1 and "unknown" not in edge_groups):
lines.append("")
lines.append("**各边缘节点汇总**")
# 按告警数降序排列
sorted_edges = sorted(edge_groups.items(), key=lambda x: len(x[1]), reverse=True)
for edge_key, alarms in sorted_edges:
edge_name = edge_name_map.get(edge_key, edge_key)
if edge_key == "unknown":
edge_name = "未知节点"
edge_total = len(alarms)
edge_type_counter = Counter(a.alarm_type for a in alarms)
edge_unhandled = sum(1 for a in alarms if a.handle_status not in ("DONE", "IGNORED"))
edge_false = sum(1 for a in alarms if a.alarm_status == "FALSE")
# 类型分布简写
type_parts = []
for at, ac in edge_type_counter.most_common():
type_parts.append(f"{ALARM_TYPE_NAMES.get(at, at)}{ac}")
type_summary = "".join(type_parts)
# 该节点的 Top3 摄像头
edge_dev_counter = Counter(a.device_id for a in alarms)
top3 = edge_dev_counter.most_common(3)
cam_parts = []
for dev_id, dev_count in top3:
cam_name = name_map.get(dev_id, dev_id)
cam_parts.append(f"{cam_name}({dev_count})")
cam_summary = "".join(cam_parts)
lines.append(f"")
lines.append(f">{edge_name}<font color=\"warning\">{edge_total}</font> 条")
lines.append(f"> 类型:{type_summary}")
if edge_unhandled > 0:
lines.append(f"> 待处理:<font color=\"warning\">{edge_unhandled}</font> 条 误报:{edge_false}")
lines.append(f"> Top设备{cam_summary}")
return "\n".join(lines)